risingwave_stream/executor/
hash_join.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::{BTreeMap, HashSet};
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use anyhow::Context;
21use either::Either;
22use itertools::Itertools;
23use multimap::MultiMap;
24use risingwave_common::array::Op;
25use risingwave_common::hash::{HashKey, NullBitmap};
26use risingwave_common::metrics::LabelGuardedHistogram;
27use risingwave_common::row::RowExt;
28use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
29use risingwave_common::util::epoch::EpochPair;
30use risingwave_common::util::iter_util::ZipEqDebug;
31use risingwave_expr::expr::NonStrictExpression;
32use risingwave_pb::stream_plan::InequalityType;
33use tokio::time::Instant;
34
35use self::builder::JoinChunkBuilder;
36use super::barrier_align::*;
37use super::join::hash_join::*;
38use super::join::row::{JoinEncoding, JoinRow};
39use super::join::*;
40use super::watermark::*;
41use crate::executor::CachedJoinRow;
42use crate::executor::join::builder::JoinStreamChunkBuilder;
43use crate::executor::join::hash_join::CacheResult;
44use crate::executor::prelude::*;
45
/// Returns `true` when every index in `vec1` also occurs in `vec2`.
///
/// Both inputs are compared as sets, so duplicates do not matter and an empty `vec1` is a
/// subset of anything (including an empty `vec2`).
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.iter().all(|idx| superset.contains(idx))
}
49
/// Information about an inequality condition in the join.
/// Represents: `left_col <op> right_col` where op is one of `<`, `<=`, `>`, `>=`.
///
/// `left_idx` and `right_idx` are column indices into the left and right *input* respectively
/// (not into the join output).
#[derive(Debug, Clone)]
pub struct InequalityPairInfo {
    /// Index of the left side column (from left input).
    pub left_idx: usize,
    /// Index of the right side column (from right input).
    pub right_idx: usize,
    /// Whether this condition is used to clean left state table.
    pub clean_left_state: bool,
    /// Whether this condition is used to clean right state table.
    pub clean_right_state: bool,
    /// Comparison operator.
    pub op: InequalityType,
}
65
66impl InequalityPairInfo {
67    /// Returns true if left side has larger values (`>` or `>=`).
68    pub fn left_side_is_larger(&self) -> bool {
69        matches!(
70            self.op,
71            InequalityType::GreaterThan | InequalityType::GreaterThanOrEqual
72        )
73    }
74}
75
/// Per-side key layout handed to the hash join executor constructors.
pub struct JoinParams {
    /// Column indices of the join key within this side's input.
    pub join_key_indices: Vec<usize>,
    /// Column indices of the input pk after dedup.
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Bundles the join-key indices and the deduplicated pk indices of one join side.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        JoinParams {
            join_key_indices,
            deduped_pk_indices,
        }
    }
}
91
/// State and column-mapping metadata for one input side (left or right) of the hash join.
struct JoinSide<K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Store all data from a one side stream
    ht: JoinHashMap<K, S, E>,
    /// Indices of the join key columns
    join_key_indices: Vec<usize>,
    /// The data type of all columns without degree.
    all_data_types: Vec<DataType>,
    /// The start position for the side in output new columns
    /// (0 for the left side, the left input's column count for the right side).
    start_pos: usize,
    /// The mapping from input indices of a side to output columns.
    i2o_mapping: Vec<(usize, usize)>,
    /// The same pairs as `i2o_mapping`, indexed by input column so that all output columns of
    /// one input column can be looked up at once.
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// The first field of the ith element indicates that when a watermark at the ith column of
    /// this side comes, what band join conditions should be updated in order to possibly
    /// generate a new watermark at that column or the corresponding column in the counterpart
    /// join side.
    ///
    /// The second field indicates whether the column is required to be less than the
    /// corresponding column in the counterpart join side in the band join condition.
    input2inequality_index: Vec<Vec<(usize, bool)>>,
    /// Some fields which are required non null to match due to inequalities.
    non_null_fields: Vec<usize>,
    /// (i, j) in this `Vec` means that state data in this join side can be cleaned if the value of
    /// its ith column is less than the synthetic watermark of the jth band join condition.
    state_clean_columns: Vec<(usize, usize)>,
    /// Whether degree table is needed for this side.
    need_degree_table: bool,
    /// Zero-sized marker carrying the `E` type parameter.
    _marker: std::marker::PhantomData<E>,
}
121
122impl<K: HashKey, S: StateStore, E: JoinEncoding> std::fmt::Debug for JoinSide<K, S, E> {
123    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124        f.debug_struct("JoinSide")
125            .field("join_key_indices", &self.join_key_indices)
126            .field("col_types", &self.all_data_types)
127            .field("start_pos", &self.start_pos)
128            .field("i2o_mapping", &self.i2o_mapping)
129            .field("need_degree_table", &self.need_degree_table)
130            .finish()
131    }
132}
133
134impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinSide<K, S, E> {
135    // WARNING: Please do not call this until we implement it.
136    fn is_dirty(&self) -> bool {
137        unimplemented!()
138    }
139
140    #[expect(dead_code)]
141    fn clear_cache(&mut self) {
142        assert!(
143            !self.is_dirty(),
144            "cannot clear cache while states of hash join are dirty"
145        );
146
147        // TODO: not working with rearranged chain
148        // self.ht.clear();
149    }
150
151    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
152        self.ht.init(epoch).await
153    }
154}
155
/// `HashJoinExecutor` takes two input streams and runs equal hash join on them.
/// The output columns are the concatenation of left and right columns.
///
/// Type parameters: `K` is the hash key type of the join key, `S` the state store backing the
/// state tables, `T` the join type (as a const primitive) and `E` the join row encoding.
pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
{
    /// Actor context; supplies the actor id, fragment id and config read by this executor.
    ctx: ActorContextRef,
    /// Executor info; its `schema` and `stream_key` are printed by the `Debug` impl.
    info: ExecutorInfo,

    /// Left input executor
    /// (`Some` until the stream is started, at which point it is taken out).
    input_l: Option<Executor>,
    /// Right input executor
    /// (`Some` until the stream is started, at which point it is taken out).
    input_r: Option<Executor>,
    /// The data types of the formed new columns
    actual_output_data_types: Vec<DataType>,
    /// The parameters of the left join executor
    side_l: JoinSide<K, S, E>,
    /// The parameters of the right join executor
    side_r: JoinSide<K, S, E>,
    /// Optional non-equi join conditions
    cond: Option<NonStrictExpression>,
    /// Inequality pairs with output column indices.
    /// Each entry contains: (`output_indices`, `InequalityPairInfo`).
    inequality_pairs: Vec<(Vec<usize>, InequalityPairInfo)>,
    /// The output watermark of each inequality condition and its value is the minimum of the
    /// watermarks from both sides. It will be used to generate watermark into downstream
    /// and do state cleaning based on `clean_left_state`/`clean_right_state` fields.
    inequality_watermarks: Vec<Option<Watermark>>,
    /// Join-key positions and cleaning flags for watermark handling.
    /// Each entry: (join-key position, `do_state_cleaning`).
    watermark_indices_in_jk: Vec<(usize, bool)>,

    /// Whether the logic can be optimized for append-only stream
    append_only_optimize: bool,

    /// Streaming metrics registry from which the per-actor join metrics are derived.
    metrics: Arc<StreamingMetrics>,
    /// The maximum size of the chunk produced by executor at a time
    chunk_size: usize,
    /// Count the messages received, clear to 0 when counted to `EVICT_EVERY_N_MESSAGES`
    ///
    /// NOTE(review): `EVICT_EVERY_N_MESSAGES` is not visible in this part of the file; the
    /// interval presumably comes from `join_cache_evict_interval_rows` now. Confirm and update.
    cnt_rows_received: u32,

    /// watermark column index -> `BufferedWatermarks`
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// When to alert high join amplification
    high_join_amplification_threshold: usize,

    /// Max number of rows that will be cached in the entry state.
    entry_state_max_rows: usize,
    /// Number of processed rows between periodic manual evictions of the join cache.
    join_cache_evict_interval_rows: u32,
}
206
207impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> std::fmt::Debug
208    for HashJoinExecutor<K, S, T, E>
209{
210    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211        f.debug_struct("HashJoinExecutor")
212            .field("join_type", &T)
213            .field("input_left", &self.input_l.as_ref().unwrap().identity())
214            .field("input_right", &self.input_r.as_ref().unwrap().identity())
215            .field("side_l", &self.side_l)
216            .field("side_r", &self.side_r)
217            .field("stream_key", &self.info.stream_key)
218            .field("schema", &self.info.schema)
219            .field("actual_output_data_types", &self.actual_output_data_types)
220            .field(
221                "join_cache_evict_interval_rows",
222                &self.join_cache_evict_interval_rows,
223            )
224            .finish()
225    }
226}
227
228impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> Execute
229    for HashJoinExecutor<K, S, T, E>
230{
231    fn execute(self: Box<Self>) -> BoxedMessageStream {
232        self.into_stream().boxed()
233    }
234}
235
/// Arguments bundled for processing one input chunk in the equi-join routines.
///
/// Most fields are borrows or copies of the corresponding `HashJoinExecutor` fields, taken at
/// the call site for each incoming chunk.
struct EqJoinArgs<'a, K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Actor context of the executor.
    ctx: &'a ActorContextRef,
    /// Left join side state.
    side_l: &'a mut JoinSide<K, S, E>,
    /// Right join side state.
    side_r: &'a mut JoinSide<K, S, E>,
    /// Data types of the executor's output columns.
    actual_output_data_types: &'a [DataType],
    /// Optional non-equi join condition.
    cond: &'a mut Option<NonStrictExpression>,
    /// Current output watermark of each inequality condition (`None` if not yet known).
    inequality_watermarks: &'a [Option<Watermark>],
    /// The input chunk to be joined.
    chunk: StreamChunk,
    /// Whether the append-only optimization is enabled.
    append_only_optimize: bool,
    /// The maximum size of an output chunk.
    chunk_size: usize,
    /// The executor's received-row counter, shared across chunks.
    cnt_rows_received: &'a mut u32,
    /// When to alert high join amplification.
    high_join_amplification_threshold: usize,
    /// Max number of rows that will be cached in the entry state.
    entry_state_max_rows: usize,
    /// Number of processed rows between periodic manual evictions of the join cache.
    join_cache_evict_interval_rows: u32,
    /// Histogram for the `join_matched_join_keys` metric, labelled for the side being processed.
    join_matched_join_keys: &'a LabelGuardedHistogram,
}
252
253impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
254    HashJoinExecutor<K, S, T, E>
255{
    /// Creates a hash join executor with the default entry-state cache limit.
    ///
    /// This forwards every argument unchanged to [`Self::new_with_cache_size`] and passes
    /// `None` for `entry_state_max_rows`, so the limit is taken from the actor config.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<InequalityPairInfo>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        watermark_indices_in_jk: Vec<(usize, bool)>,
    ) -> Self {
        Self::new_with_cache_size(
            ctx,
            info,
            input_l,
            input_r,
            params_l,
            params_r,
            null_safe,
            output_indices,
            cond,
            inequality_pairs,
            state_table_l,
            degree_state_table_l,
            state_table_r,
            degree_state_table_r,
            watermark_epoch,
            is_append_only,
            metrics,
            chunk_size,
            high_join_amplification_threshold,
            // `entry_state_max_rows`: fall back to the configured default.
            None,
            watermark_indices_in_jk,
        )
    }
303
    /// Creates a hash join executor, optionally overriding the entry-state cache limit.
    ///
    /// When `entry_state_max_rows` is `None`, the limit is read from
    /// `ctx.config.developer.hash_join_entry_state_max_rows`. The cache eviction interval is
    /// read from `ctx.config.developer.join_hash_map_evict_interval_rows` and raised to at
    /// least 1.
    ///
    /// The append-only optimization is enabled only when `is_append_only` is set and the
    /// stream key of each input is contained in that side's join key. In that case the
    /// inequality-based state cleaning columns and non-null fields of both sides are cleared.
    ///
    /// # Panics
    ///
    /// - If the join key data types of the two sides differ.
    /// - If an entry of `output_indices` is out of range for the join's output schema.
    /// - If `left_idx`/`right_idx` of an inequality pair is out of range for its input.
    #[expect(clippy::too_many_arguments)]
    pub fn new_with_cache_size(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<InequalityPairInfo>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        entry_state_max_rows: Option<usize>,
        watermark_indices_in_jk: Vec<(usize, bool)>,
    ) -> Self {
        let entry_state_max_rows = match entry_state_max_rows {
            None => ctx.config.developer.hash_join_entry_state_max_rows,
            Some(entry_state_max_rows) => entry_state_max_rows,
        };
        // Clamp to at least 1 so that the interval is never zero.
        let join_cache_evict_interval_rows = ctx
            .config
            .developer
            .join_hash_map_evict_interval_rows
            .max(1);
        let side_l_column_n = input_l.schema().len();

        // Semi/anti joins only output the columns of one side; all other join types output the
        // left columns followed by the right columns.
        let schema_fields = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
            JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
            _ => [
                input_l.schema().fields.clone(),
                input_r.schema().fields.clone(),
            ]
            .concat(),
        };

        let original_output_data_types = schema_fields
            .iter()
            .map(|field| field.data_type())
            .collect_vec();
        let actual_output_data_types = output_indices
            .iter()
            .map(|&idx| original_output_data_types[idx].clone())
            .collect_vec();

        // Data types of hash join state.
        let state_all_data_types_l = input_l.schema().data_types();
        let state_all_data_types_r = input_r.schema().data_types();

        let state_pk_indices_l = input_l.stream_key().to_vec();
        let state_pk_indices_r = input_r.stream_key().to_vec();

        let state_join_key_indices_l = params_l.join_key_indices;
        let state_join_key_indices_r = params_r.join_key_indices;

        // In the degree table the join key columns come first, followed by the deduped pk
        // columns.
        let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
        let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();

        let degree_pk_indices_l = (state_join_key_indices_l.len()
            ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
            .collect_vec();
        let degree_pk_indices_r = (state_join_key_indices_r.len()
            ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
            .collect_vec();

        // If pk is contained in join key.
        let pk_contained_in_jk_l = is_subset(state_pk_indices_l, state_join_key_indices_l.clone());
        let pk_contained_in_jk_r = is_subset(state_pk_indices_r, state_join_key_indices_r.clone());

        // check whether join key contains pk in both side
        let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;

        let join_key_data_types_l = state_join_key_indices_l
            .iter()
            .map(|idx| state_all_data_types_l[*idx].clone())
            .collect_vec();

        let join_key_data_types_r = state_join_key_indices_r
            .iter()
            .map(|idx| state_all_data_types_r[*idx].clone())
            .collect_vec();

        // Both sides must agree on the join key types.
        assert_eq!(join_key_data_types_l, join_key_data_types_r);

        let null_matched = K::Bitmap::from_bool_vec(null_safe);

        // A side needs no degree table when the other side's pk is contained in its join key.
        let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
        let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;

        let (left_to_output, right_to_output) = {
            let (left_len, right_len) = if is_left_semi_or_anti(T) {
                (state_all_data_types_l.len(), 0usize)
            } else if is_right_semi_or_anti(T) {
                (0usize, state_all_data_types_r.len())
            } else {
                (state_all_data_types_l.len(), state_all_data_types_r.len())
            };
            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
        };

        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());

        let left_input_len = input_l.schema().len();
        let right_input_len = input_r.schema().len();
        let mut l2inequality_index = vec![vec![]; left_input_len];
        let mut r2inequality_index = vec![vec![]; right_input_len];
        let mut l_inequal_state_clean_columns = vec![];
        let mut r_inequal_state_clean_columns = vec![];
        // The closure below fills the four vectors above as a side effect while pairing each
        // inequality condition with its output column indices.
        let inequality_pairs = inequality_pairs
            .into_iter()
            .enumerate()
            .map(|(index, pair)| {
                // Map input columns to inequality index
                // The second field indicates whether this column is required to be less than
                // the corresponding column on the other side.
                // For `left >= right`: left is NOT less than right, right IS less than left
                // For `left <= right`: left IS less than right, right is NOT less than left
                let left_is_larger = pair.left_side_is_larger();
                l2inequality_index[pair.left_idx].push((index, !left_is_larger));
                r2inequality_index[pair.right_idx].push((index, left_is_larger));

                // Determine state cleanup columns
                if pair.clean_left_state {
                    l_inequal_state_clean_columns.push((pair.left_idx, index));
                }
                if pair.clean_right_state {
                    r_inequal_state_clean_columns.push((pair.right_idx, index));
                }

                // Get output indices for watermark emission
                // We only emit watermarks for the LARGER side's output columns
                let output_indices = if pair.left_side_is_larger() {
                    // Left is larger, emit for left output columns
                    l2o_indexed
                        .get_vec(&pair.left_idx)
                        .cloned()
                        .unwrap_or_default()
                } else {
                    // Right is larger, emit for right output columns
                    r2o_indexed
                        .get_vec(&pair.right_idx)
                        .cloned()
                        .unwrap_or_default()
                };

                (output_indices, pair)
            })
            .collect_vec();

        // Every input column that takes part in at least one inequality condition.
        let mut l_non_null_fields = l2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();
        let mut r_non_null_fields = r2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();

        // With the append-only optimization, inequality-based state cleaning and the non-null
        // requirements are dropped for both sides.
        if append_only_optimize {
            l_inequal_state_clean_columns.clear();
            r_inequal_state_clean_columns.clear();
            l_non_null_fields.clear();
            r_non_null_fields.clear();
        }

        // Create degree tables with inequality column index for watermark-based cleaning.
        // The degree_inequality_idx is the column index in the input row that should be
        // stored in the degree table for inequality-based watermark cleaning.
        // Only the first state-clean column of each side is used.
        let l_inequality_idx = l_inequal_state_clean_columns
            .first()
            .map(|(col_idx, _)| *col_idx);
        let r_inequality_idx = r_inequal_state_clean_columns
            .first()
            .map(|(col_idx, _)| *col_idx);

        let degree_state_l = need_degree_table_l.then(|| {
            TableInner::new(
                degree_pk_indices_l,
                degree_join_key_indices_l,
                degree_state_table_l,
                l_inequality_idx,
            )
        });
        let degree_state_r = need_degree_table_r.then(|| {
            TableInner::new(
                degree_pk_indices_r,
                degree_join_key_indices_r,
                degree_state_table_r,
                r_inequality_idx,
            )
        });

        // No inequality watermark is known until watermarks arrive.
        let inequality_watermarks = vec![None; inequality_pairs.len()];
        let watermark_buffers = BTreeMap::new();
        Self {
            ctx: ctx.clone(),
            info,
            input_l: Some(input_l),
            input_r: Some(input_r),
            actual_output_data_types,
            side_l: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch.clone(),
                    join_key_data_types_l,
                    state_join_key_indices_l.clone(),
                    state_all_data_types_l.clone(),
                    state_table_l,
                    params_l.deduped_pk_indices,
                    degree_state_l,
                    null_matched.clone(),
                    pk_contained_in_jk_l,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "left",
                ),
                join_key_indices: state_join_key_indices_l,
                all_data_types: state_all_data_types_l,
                i2o_mapping: left_to_output,
                i2o_mapping_indexed: l2o_indexed,
                input2inequality_index: l2inequality_index,
                non_null_fields: l_non_null_fields,
                state_clean_columns: l_inequal_state_clean_columns,
                start_pos: 0,
                need_degree_table: need_degree_table_l,
                _marker: PhantomData,
            },
            side_r: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch,
                    join_key_data_types_r,
                    state_join_key_indices_r.clone(),
                    state_all_data_types_r.clone(),
                    state_table_r,
                    params_r.deduped_pk_indices,
                    degree_state_r,
                    null_matched,
                    pk_contained_in_jk_r,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "right",
                ),
                join_key_indices: state_join_key_indices_r,
                all_data_types: state_all_data_types_r,
                // Right columns start after all left input columns.
                start_pos: side_l_column_n,
                i2o_mapping: right_to_output,
                i2o_mapping_indexed: r2o_indexed,
                input2inequality_index: r2inequality_index,
                non_null_fields: r_non_null_fields,
                state_clean_columns: r_inequal_state_clean_columns,
                need_degree_table: need_degree_table_r,
                _marker: PhantomData,
            },
            cond,
            inequality_pairs,
            inequality_watermarks,
            watermark_indices_in_jk,
            append_only_optimize,
            metrics,
            chunk_size,
            cnt_rows_received: 0,
            watermark_buffers,
            high_join_amplification_threshold,
            entry_state_max_rows,
            join_cache_evict_interval_rows,
        }
    }
582
583    #[try_stream(ok = Message, error = StreamExecutorError)]
584    async fn into_stream(mut self) {
585        let input_l = self.input_l.take().unwrap();
586        let input_r = self.input_r.take().unwrap();
587        let aligned_stream = barrier_align(
588            input_l.execute(),
589            input_r.execute(),
590            self.ctx.id,
591            self.ctx.fragment_id,
592            self.metrics.clone(),
593            "Join",
594        );
595        pin_mut!(aligned_stream);
596
597        let actor_id = self.ctx.id;
598
599        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
600        let first_epoch = barrier.epoch;
601        // The first barrier message should be propagated.
602        yield Message::Barrier(barrier);
603        self.side_l.init(first_epoch).await?;
604        self.side_r.init(first_epoch).await?;
605
606        let actor_id_str = self.ctx.id.to_string();
607        let fragment_id_str = self.ctx.fragment_id.to_string();
608
609        // initialized some metrics
610        let join_actor_input_waiting_duration_ns = self
611            .metrics
612            .join_actor_input_waiting_duration_ns
613            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
614        let left_join_match_duration_ns = self
615            .metrics
616            .join_match_duration_ns
617            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
618        let right_join_match_duration_ns = self
619            .metrics
620            .join_match_duration_ns
621            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);
622
623        let barrier_join_match_duration_ns = self
624            .metrics
625            .join_match_duration_ns
626            .with_guarded_label_values(&[
627                actor_id_str.as_str(),
628                fragment_id_str.as_str(),
629                "barrier",
630            ]);
631
632        let left_join_cached_entry_count = self
633            .metrics
634            .join_cached_entry_count
635            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
636
637        let right_join_cached_entry_count = self
638            .metrics
639            .join_cached_entry_count
640            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);
641
642        // Bind at executor scope: a per-chunk guard would be dropped between chunks and reset the series.
643        let left_table_id_str = self.side_l.ht.table_id().to_string();
644        let right_table_id_str = self.side_r.ht.table_id().to_string();
645        let left_join_matched_join_keys = self
646            .metrics
647            .join_matched_join_keys
648            .with_guarded_label_values(&[
649                actor_id_str.as_str(),
650                fragment_id_str.as_str(),
651                left_table_id_str.as_str(),
652            ]);
653        let right_join_matched_join_keys = self
654            .metrics
655            .join_matched_join_keys
656            .with_guarded_label_values(&[
657                actor_id_str.as_str(),
658                fragment_id_str.as_str(),
659                right_table_id_str.as_str(),
660            ]);
661
662        let mut start_time = Instant::now();
663
664        while let Some(msg) = aligned_stream
665            .next()
666            .instrument_await("hash_join_barrier_align")
667            .await
668        {
669            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
670            match msg? {
671                AlignedMessage::WatermarkLeft(watermark) => {
672                    for watermark_to_emit in self.handle_watermark(SideType::Left, watermark)? {
673                        yield Message::Watermark(watermark_to_emit);
674                    }
675                }
676                AlignedMessage::WatermarkRight(watermark) => {
677                    for watermark_to_emit in self.handle_watermark(SideType::Right, watermark)? {
678                        yield Message::Watermark(watermark_to_emit);
679                    }
680                }
681                AlignedMessage::Left(chunk) => {
682                    let mut left_time = Duration::from_nanos(0);
683                    let mut left_start_time = Instant::now();
684                    #[for_await]
685                    for chunk in Self::eq_join_left(EqJoinArgs {
686                        ctx: &self.ctx,
687                        side_l: &mut self.side_l,
688                        side_r: &mut self.side_r,
689                        actual_output_data_types: &self.actual_output_data_types,
690                        cond: &mut self.cond,
691                        inequality_watermarks: &self.inequality_watermarks,
692                        chunk,
693                        append_only_optimize: self.append_only_optimize,
694                        chunk_size: self.chunk_size,
695                        cnt_rows_received: &mut self.cnt_rows_received,
696                        high_join_amplification_threshold: self.high_join_amplification_threshold,
697                        entry_state_max_rows: self.entry_state_max_rows,
698                        join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
699                        join_matched_join_keys: &left_join_matched_join_keys,
700                    }) {
701                        left_time += left_start_time.elapsed();
702                        yield Message::Chunk(chunk?);
703                        left_start_time = Instant::now();
704                    }
705                    left_time += left_start_time.elapsed();
706                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
707                    self.try_flush_data().await?;
708                }
709                AlignedMessage::Right(chunk) => {
710                    let mut right_time = Duration::from_nanos(0);
711                    let mut right_start_time = Instant::now();
712                    #[for_await]
713                    for chunk in Self::eq_join_right(EqJoinArgs {
714                        ctx: &self.ctx,
715                        side_l: &mut self.side_l,
716                        side_r: &mut self.side_r,
717                        actual_output_data_types: &self.actual_output_data_types,
718                        cond: &mut self.cond,
719                        inequality_watermarks: &self.inequality_watermarks,
720                        chunk,
721                        append_only_optimize: self.append_only_optimize,
722                        chunk_size: self.chunk_size,
723                        cnt_rows_received: &mut self.cnt_rows_received,
724                        high_join_amplification_threshold: self.high_join_amplification_threshold,
725                        entry_state_max_rows: self.entry_state_max_rows,
726                        join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
727                        join_matched_join_keys: &right_join_matched_join_keys,
728                    }) {
729                        right_time += right_start_time.elapsed();
730                        yield Message::Chunk(chunk?);
731                        right_start_time = Instant::now();
732                    }
733                    right_time += right_start_time.elapsed();
734                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
735                    self.try_flush_data().await?;
736                }
737                AlignedMessage::Barrier(barrier) => {
738                    let barrier_start_time = Instant::now();
739                    let (left_post_commit, right_post_commit) =
740                        self.flush_data(barrier.epoch).await?;
741
742                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
743
744                    // We don't include the time post yielding barrier because the vnode update
745                    // is a one-off and rare operation.
746                    barrier_join_match_duration_ns
747                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
748                    yield Message::Barrier(barrier);
749
750                    // Update the vnode bitmap for state tables of both sides if asked.
751                    right_post_commit
752                        .post_yield_barrier(update_vnode_bitmap.clone())
753                        .await?;
754                    if left_post_commit
755                        .post_yield_barrier(update_vnode_bitmap)
756                        .await?
757                        .unwrap_or(false)
758                    {
759                        self.watermark_buffers
760                            .values_mut()
761                            .for_each(|buffers| buffers.clear());
762                        self.inequality_watermarks.fill(None);
763                    }
764
765                    // Report metrics of cached join rows/entries
766                    for (join_cached_entry_count, ht) in [
767                        (&left_join_cached_entry_count, &self.side_l.ht),
768                        (&right_join_cached_entry_count, &self.side_r.ht),
769                    ] {
770                        join_cached_entry_count.set(ht.entry_count() as i64);
771                    }
772                }
773            }
774            start_time = Instant::now();
775        }
776    }
777
778    async fn flush_data(
779        &mut self,
780        epoch: EpochPair,
781    ) -> StreamExecutorResult<(
782        JoinHashMapPostCommit<'_, K, S, E>,
783        JoinHashMapPostCommit<'_, K, S, E>,
784    )> {
785        // All changes to the state has been buffered in the mem-table of the state table. Just
786        // `commit` them here.
787        let left = self.side_l.ht.flush(epoch).await?;
788        let right = self.side_r.ht.flush(epoch).await?;
789        Ok((left, right))
790    }
791
792    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
793        // All changes to the state has been buffered in the mem-table of the state table. Just
794        // `commit` them here.
795        self.side_l.ht.try_flush().await?;
796        self.side_r.ht.try_flush().await?;
797        Ok(())
798    }
799
800    // We need to manually evict the cache.
801    fn evict_cache(
802        side_update: &mut JoinSide<K, S, E>,
803        side_match: &mut JoinSide<K, S, E>,
804        cnt_rows_received: &mut u32,
805        join_cache_evict_interval_rows: u32,
806    ) {
807        *cnt_rows_received += 1;
808        if *cnt_rows_received >= join_cache_evict_interval_rows {
809            side_update.ht.evict();
810            side_match.ht.evict();
811            *cnt_rows_received = 0;
812        }
813    }
814
    /// Handles a `watermark` received from the input on `side` and returns the watermarks that
    /// should be emitted downstream as a result.
    ///
    /// Two kinds of columns are considered:
    ///
    /// 1. **Join-key columns.** For every join-key position of the receiving side whose input
    ///    column equals `watermark.col_idx`, the watermark is fed into the buffer for that
    ///    position. When the buffer selects a watermark, it is re-indexed to every output column
    ///    mapped from that join-key column on either side. If the position is flagged for
    ///    cleaning in `watermark_indices_in_jk`, the hash tables of both sides are also told
    ///    about the new watermark value.
    /// 2. **Inequality columns.** For every inequality pair the column takes part in, the
    ///    watermark is fed into the buffer for that pair. When the buffer selects a watermark, it
    ///    is emitted for the pair's output indices, recorded in `inequality_watermarks`, and --
    ///    if the pair's clean flag for the larger side is set -- forwarded to that side's hash
    ///    table.
    ///
    /// The body contains no fallible operation, so the result is currently always `Ok`.
    fn handle_watermark(
        &mut self,
        side: SideTypePrimitive,
        watermark: Watermark,
    ) -> StreamExecutorResult<Vec<Watermark>> {
        // `side_update` is the side the watermark arrived on, `side_match` the opposite one.
        let (side_update, side_match) = if side == SideType::Left {
            (&mut self.side_l, &mut self.side_r)
        } else {
            (&mut self.side_r, &mut self.side_l)
        };

        // Process join-key watermarks
        // `wm_in_jk` yields the positions *within the join key* (not input column indices) at
        // which the watermark column appears; a column may appear more than once.
        let wm_in_jk = side_update
            .join_key_indices
            .iter()
            .positions(|idx| *idx == watermark.col_idx);
        let mut watermarks_to_emit = vec![];
        for idx in wm_in_jk {
            // Join-key buffers are keyed by the join-key position and track both sides.
            let buffers = self
                .watermark_buffers
                .entry(idx)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            // The buffer yields `Some` only when it selects a watermark to act on.
            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
                // Only join-key positions explicitly flagged with `do_clean` drive state cleaning.
                if self
                    .watermark_indices_in_jk
                    .iter()
                    .any(|(jk_pos, do_clean)| *jk_pos == idx && *do_clean)
                {
                    side_match
                        .ht
                        .update_watermark(selected_watermark.val.clone());
                    side_update
                        .ht
                        .update_watermark(selected_watermark.val.clone());
                }

                // Collect every output column that the join-key column at position `idx` maps to,
                // on the receiving side first and then on the opposite side. A column that is not
                // part of the output contributes nothing.
                let empty_indices = vec![];
                let output_indices = side_update
                    .i2o_mapping_indexed
                    .get_vec(&side_update.join_key_indices[idx])
                    .unwrap_or(&empty_indices)
                    .iter()
                    .chain(
                        side_match
                            .i2o_mapping_indexed
                            .get_vec(&side_match.join_key_indices[idx])
                            .unwrap_or(&empty_indices),
                    );
                for output_idx in output_indices {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
            };
        }

        // Process inequality watermarks
        // We can only yield the LARGER side's watermark downstream.
        // For `left >= right`: left is larger, emit for left output columns
        // For `left <= right`: right is larger, emit for right output columns
        let mut update_left_watermark = None;
        let mut update_right_watermark = None;
        if let Some(watermark_indices) = side_update.input2inequality_index.get(watermark.col_idx) {
            for (inequality_index, _) in watermark_indices {
                // Inequality buffers share the map with the join-key buffers; their keys are
                // offset by the number of join keys, so they start right after the join-key
                // positions.
                let buffers = self
                    .watermark_buffers
                    .entry(side_update.join_key_indices.len() + inequality_index)
                    .or_insert_with(|| {
                        BufferedWatermarks::with_ids([SideType::Left, SideType::Right])
                    });
                if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone())
                {
                    let (output_indices, pair_info) = &self.inequality_pairs[*inequality_index];
                    let left_is_larger = pair_info.left_side_is_larger();

                    // Emit watermark for the larger side's output columns only
                    for output_idx in output_indices {
                        watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                    }
                    // Store watermark for state cleaning (via useful_state_clean_columns)
                    self.inequality_watermarks[*inequality_index] =
                        Some(selected_watermark.clone());

                    // Mark which side needs state cleaning (only if the clean flag is set)
                    // If several pairs select a watermark for the same side, the last one wins.
                    if left_is_larger && pair_info.clean_left_state {
                        update_left_watermark = Some(selected_watermark.val.clone());
                    } else if !left_is_larger && pair_info.clean_right_state {
                        update_right_watermark = Some(selected_watermark.val.clone());
                    }
                }
            }
            // Do state cleaning on the larger side.
            // Now that degree tables include the inequality column, we can clean both
            // state and degree tables using `update_watermark`.
            if let Some(val) = update_left_watermark {
                self.side_l.ht.update_watermark(val);
            }
            if let Some(val) = update_right_watermark {
                self.side_r.ht.update_watermark(val);
            }
        }
        Ok(watermarks_to_emit)
    }
916
917    fn row_concat(
918        row_update: impl Row,
919        update_start_pos: usize,
920        row_matched: impl Row,
921        matched_start_pos: usize,
922    ) -> OwnedRow {
923        let mut new_row = vec![None; row_update.len() + row_matched.len()];
924
925        for (i, datum_ref) in row_update.iter().enumerate() {
926            new_row[i + update_start_pos] = datum_ref.to_owned_datum();
927        }
928        for (i, datum_ref) in row_matched.iter().enumerate() {
929            new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
930        }
931        OwnedRow::new(new_row)
932    }
933
    /// Runs `eq_join_oneside` with `SIDE` fixed to `SideType::Left`.
    ///
    /// This forwarding function adds no logic of its own; it exists so that the join side shows
    /// up by name in the stack.
    fn eq_join_left(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Left }>(args)
    }
940
    /// Runs `eq_join_oneside` with `SIDE` fixed to `SideType::Right`.
    ///
    /// This forwarding function adds no logic of its own; it exists so that the join side shows
    /// up by name in the stack.
    fn eq_join_right(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Right }>(args)
    }
947
    /// Joins one input `chunk` that arrived on side `SIDE` against the state of the opposite
    /// side and streams out the resulting chunks.
    ///
    /// For every visible row of the chunk this:
    /// 1. bumps the received-row counter and possibly evicts the caches (`evict_cache`);
    /// 2. looks up the matching entry of the opposite side, unless the non-null requirements
    ///    already rule out any match;
    /// 3. delegates to `handle_match_rows` with `JoinOp::Insert` for `Insert`/`UpdateInsert` rows
    ///    and `JoinOp::Delete` for `Delete`/`UpdateDelete` rows, yielding every chunk it produces;
    /// 4. records the number of output rows yielded for the row and logs a warning when that
    ///    number exceeds `high_join_amplification_threshold`.
    ///
    /// After the last row, whatever is still buffered in the chunk builder is yielded as a final
    /// chunk.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S, E>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            actual_output_data_types,
            cond,
            inequality_watermarks,
            chunk,
            append_only_optimize,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
            entry_state_max_rows,
            join_cache_evict_interval_rows,
            join_matched_join_keys,
            ..
        } = args;

        // `side_update` is the side the chunk arrived on, `side_match` the side it is joined
        // against.
        let (side_update, side_match) = if SIDE == SideType::Left {
            (side_l, side_r)
        } else {
            (side_r, side_l)
        };

        // Of the match side's state-clean columns, keep only those whose inequality already has
        // a watermark; each entry pairs the column index with that watermark.
        let useful_state_clean_columns = side_match
            .state_clean_columns
            .iter()
            .filter_map(|(column_idx, inequality_index)| {
                inequality_watermarks[*inequality_index]
                    .as_ref()
                    .map(|watermark| (*column_idx, watermark))
            })
            .collect_vec();

        let mut hashjoin_chunk_builder =
            JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        // Join keys are built for the whole data chunk up front and then zipped with the rows;
        // `rows_with_holes` yields `None` for the rows that are skipped below.
        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(
                side_update,
                side_match,
                cnt_rows_received,
                join_cache_evict_interval_rows,
            );

            let cache_lookup_result = {
                // NOTE(review): `datum_at_unchecked` is unsafe; this presumably relies on every
                // index in `non_null_fields` being a valid column of `row` -- confirm.
                let probe_non_null_requirement_satisfied = side_update
                    .non_null_fields
                    .iter()
                    .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
                // The key's null bitmap must be a subset of the match side's `null_matched`
                // bitmap, otherwise the key is treated as never matching.
                let build_non_null_requirement_satisfied =
                    key.null_bitmap().is_subset(side_match.ht.null_matched());
                if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
                    side_match.ht.take_state_opt(key)
                } else {
                    CacheResult::NeverMatch
                }
            };
            // Sum of the cardinalities of all chunks yielded while processing this row.
            let mut total_matches = 0;

            // Expands to the `handle_match_rows` call for the given join operation; a macro is
            // used because the operation is a const generic argument.
            macro_rules! match_rows {
                ($op:ident) => {
                    Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
                        cache_lookup_result,
                        row,
                        key,
                        &mut hashjoin_chunk_builder,
                        side_match,
                        side_update,
                        &useful_state_clean_columns,
                        cond,
                        append_only_optimize,
                        entry_state_max_rows,
                    )
                };
            }

            // The insert half of an update is handled like an insert and the delete half like a
            // delete.
            match op {
                Op::Insert | Op::UpdateInsert =>
                {
                    #[for_await]
                    for chunk in match_rows!(Insert) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
                Op::Delete | Op::UpdateDelete =>
                {
                    #[for_await]
                    for chunk in match_rows!(Delete) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
            };

            join_matched_join_keys.observe(total_matches as _);
            if total_matches > high_join_amplification_threshold {
                // The key is deserialized only on this path, for the log message.
                let join_key_data_types = side_update.ht.join_key_data_types();
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = total_matches,
                    update_table_id = %side_update.ht.table_id(),
                    match_table_id = %side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = %ctx.id,
                    fragment_id = %ctx.fragment_id,
                    "large rows matched for join key"
                );
            }
        }
        // NOTE(kwannoel): We don't track metrics for this last chunk.
        if let Some(chunk) = hashjoin_chunk_builder.take() {
            yield chunk;
        }
    }
1077
    /// Processes one probe-side `row` against all build-side rows stored under `key`, yielding
    /// every output chunk that becomes full along the way.
    ///
    /// For the probe-side row, we need to check if it has values in cache, if not, we need to
    /// fetch the matched rows from the state table.
    ///
    /// Every matched build-side row being processed needs to go through the following phases:
    /// 1. Handle join condition evaluation.
    /// 2. Always do cache refill, if the state count is good.
    /// 3. Handle state cleaning.
    /// 4. Handle degree table update.
    ///
    /// `cached_lookup_result` decides where the build-side rows come from:
    /// - `NeverMatch`: no build-side row can match; the probe row is only forwarded through
    ///   `forward_if_not_matched` and the function returns without touching any state.
    /// - `Hit`: the rows are taken from the cached entry.
    /// - `Miss`: the rows are fetched from the state table, and a fresh cache entry is built
    ///   from them as long as it stays within `entry_state_max_rows`.
    ///
    /// After all build-side rows have been visited, the probe row is forwarded according to its
    /// degree, the cache entry is put back, rows marked for watermark cleaning are removed from
    /// memory, and finally the probe row itself is inserted into or deleted from the update
    /// side's state -- except on the append-only path, where the single matched build-side row is
    /// deleted instead and the probe row is not stored.
    #[expect(clippy::too_many_arguments)]
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn handle_match_rows<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
    >(
        cached_lookup_result: CacheResult<E>,
        row: RowRef<'a>,
        key: &'a K,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        side_match: &'a mut JoinSide<K, S, E>,
        side_update: &'a mut JoinSide<K, S, E>,
        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
        cond: &'a mut Option<NonStrictExpression>,
        append_only_optimize: bool,
        entry_state_max_rows: usize,
    ) {
        // Remembered up front because `cached_lookup_result` is consumed by the `match` below.
        let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
        // Scratch entry that is only filled on a cache miss, together with its row count.
        let mut entry_state: JoinEntryState<E> = JoinEntryState::default();
        let mut entry_state_count = 0;

        // Number of build-side rows that satisfied the join condition for this probe row.
        let mut degree = 0;
        let mut append_only_matched_row = None;
        let mut matched_rows_to_clean = vec![];

        // Expands to the `handle_match_row` call with all the arguments that are identical for
        // the cache-hit and cache-miss paths filled in.
        macro_rules! match_row {
            (
                $match_order_key_indices:expr,
                $degree_table:expr,
                $matched_row:expr,
                $matched_row_ref:expr,
                $from_cache:literal,
                $map_output:expr,
            ) => {
                Self::handle_match_row::<_, _, SIDE, { JOIN_OP }, { $from_cache }>(
                    row,
                    $matched_row,
                    $matched_row_ref,
                    hashjoin_chunk_builder,
                    $match_order_key_indices,
                    $degree_table,
                    side_update.start_pos,
                    side_match.start_pos,
                    cond,
                    &mut degree,
                    useful_state_clean_columns,
                    append_only_optimize,
                    &mut append_only_matched_row,
                    &mut matched_rows_to_clean,
                    $map_output,
                )
            };
        }

        // This binding shadows the scratch `entry_state` above with the entry that will be
        // handed back to the cache.
        let entry_state = match cached_lookup_result {
            CacheResult::NeverMatch => {
                let op = match JOIN_OP {
                    JoinOp::Insert => Op::Insert,
                    JoinOp::Delete => Op::Delete,
                };
                if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                    yield chunk;
                }
                // Early exit: neither the cache nor any state table is modified on this path.
                return Ok(());
            }
            CacheResult::Hit(mut cached_rows) => {
                let (match_order_key_indices, match_degree_state) =
                    side_match.ht.get_degree_state_mut_ref();
                // Handle cached rows which match the probe-side row.
                for (matched_row_ref, matched_row) in
                    cached_rows.values_mut(&side_match.all_data_types)
                {
                    let matched_row = matched_row?;
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        match_degree_state,
                        matched_row,
                        Some(matched_row_ref),
                        true,
                        Either::Left,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }

                cached_rows
            }
            CacheResult::Miss => {
                // Handle rows which are not in cache.
                let (matched_rows, match_order_key_indices, degree_table) = side_match
                    .ht
                    .fetch_matched_rows_and_get_degree_table_ref(key)
                    .await?;

                #[for_await]
                for matched_row in matched_rows {
                    let (encoded_pk, matched_row) = matched_row?;

                    let mut matched_row_ref = None;

                    // cache refill
                    // The check is inclusive, so the counter can end up at
                    // `entry_state_max_rows + 1`; that value is what makes the refill check after
                    // the loop fail for oversized entries.
                    if entry_state_count <= entry_state_max_rows {
                        let row_ref = entry_state
                            .insert(encoded_pk, E::encode(&matched_row))
                            .with_context(|| format!("row: {}", row.display(),))?;
                        matched_row_ref = Some(row_ref);
                        entry_state_count += 1;
                    }
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        degree_table,
                        matched_row,
                        matched_row_ref,
                        false,
                        Either::Right,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }
                Box::new(entry_state)
            }
        };

        // forward rows depending on join types
        let op = match JOIN_OP {
            JoinOp::Insert => Op::Insert,
            JoinOp::Delete => Op::Delete,
        };
        if degree == 0 {
            if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                yield chunk;
            }
        } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
        {
            yield chunk;
        }

        // cache refill
        // A cache hit always puts its entry back; a miss only installs the rebuilt entry when it
        // stayed within the row limit.
        if cache_hit || entry_state_count <= entry_state_max_rows {
            side_match.ht.update_state(key, entry_state);
        }

        // watermark state cleaning
        for matched_row in matched_rows_to_clean {
            // The committed table watermark is responsible for reclaiming rows in the state table.
            side_match.ht.delete_row_in_mem(key, &matched_row.row)?;
        }

        // apply append_only optimization to clean matched_rows which have been persisted
        if append_only_optimize && let Some(row) = append_only_matched_row {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            side_match.ht.delete_handle_degree(key, row)?;
            // The probe row is intentionally not written to the update side on this path.
            return Ok(());
        }

        // no append_only optimization, update state table(s).
        match JOIN_OP {
            JoinOp::Insert => {
                side_update
                    .ht
                    .insert_handle_degree(key, JoinRow::new(row, degree))?;
            }
            JoinOp::Delete => {
                side_update
                    .ht
                    .delete_handle_degree(key, JoinRow::new(row, degree))?;
            }
        }
    }
1260
    /// Processes a single build-side `matched_row` for the probe-side `update_row` and returns
    /// an output chunk if adding the match filled one up.
    ///
    /// When the join condition holds for the pair:
    /// - `update_row_degree` is incremented;
    /// - unless `forward_exactly_once(T, SIDE)` is true, the pair is handed to the chunk builder
    ///   (before the degree update for inserts, after it for deletes);
    /// - if a degree table is present, the degree of the matched row is updated in the table
    ///   and, when a cache reference is available, in the cached row as well.
    ///
    /// When the join condition does not hold, the matched row is checked against
    /// `useful_state_clean_columns`: it is marked for cleaning if any of those columns is
    /// non-null and strictly below the corresponding watermark value.
    ///
    /// Finally, with `append_only_optimize` the matched row is recorded in
    /// `append_only_matched_row` (regardless of the join condition result); otherwise a row
    /// marked for cleaning is pushed onto `matched_rows_to_clean`. In both cases the row is
    /// converted with `map_output` first.
    ///
    /// # Panics
    ///
    /// - If `append_only_optimize` is set and `JOIN_OP` is not `JoinOp::Insert`, or a matched
    ///   row has already been recorded in `append_only_matched_row`.
    /// - If `MATCHED_ROWS_FROM_CACHE` is true, a degree table is present, the join condition
    ///   holds and `matched_row_cache_ref` is `None`.
    #[expect(clippy::too_many_arguments)]
    #[inline]
    async fn handle_match_row<
        'a,
        R: Row,  // input row type
        RO: Row, // output row type
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
        const MATCHED_ROWS_FROM_CACHE: bool,
    >(
        update_row: RowRef<'a>,
        mut matched_row: JoinRow<R>,
        mut matched_row_cache_ref: Option<&mut E::EncodedRow>,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        match_order_key_indices: &[usize],
        match_degree_table: &mut Option<TableInner<S>>,
        side_update_start_pos: usize,
        side_match_start_pos: usize,
        cond: &Option<NonStrictExpression>,
        update_row_degree: &mut u64,
        useful_state_clean_columns: &[(usize, &'a Watermark)],
        append_only_optimize: bool,
        append_only_matched_row: &mut Option<JoinRow<RO>>,
        matched_rows_to_clean: &mut Vec<JoinRow<RO>>,
        map_output: impl Fn(R) -> RO,
    ) -> Option<StreamChunk> {
        let mut need_state_clean = false;
        let mut chunk_opt = None;
        // TODO(kwannoel): Instead of evaluating this every loop,
        // we can call this only if there's a non-equi expression.
        // check join cond
        let join_condition_satisfied = Self::check_join_condition(
            update_row,
            side_update_start_pos,
            &matched_row.row,
            side_match_start_pos,
            cond,
        )
        .await;

        if join_condition_satisfied {
            // update degree
            *update_row_degree += 1;
            // send matched row downstream

            // Inserts must happen before degrees are updated,
            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
            if matches!(JOIN_OP, JoinOp::Insert)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
            // TODO(kwannoel): We can actually statically decide this, using join side + join type.
            if let Some(degree_table) = match_degree_table {
                update_degree::<S, { JOIN_OP }>(
                    match_order_key_indices,
                    degree_table,
                    &mut matched_row,
                );
                // Rows coming from the cache always carry a reference; rows fetched from the
                // state table only do when they were added to the rebuilt cache entry.
                if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
                    // update matched row in cache
                    match JOIN_OP {
                        JoinOp::Insert => matched_row_cache_ref.as_mut().unwrap().increase_degree(),
                        JoinOp::Delete => matched_row_cache_ref.as_mut().unwrap().decrease_degree(),
                    }
                }
            }

            // Deletes must happen after degree table is updated.
            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
            if matches!(JOIN_OP, JoinOp::Delete)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
        } else {
            // check if need state cleaning
            // A null datum never triggers cleaning; a single column strictly below its watermark
            // is enough.
            for (column_idx, watermark) in useful_state_clean_columns {
                if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
                    scalar
                        .default_cmp(&watermark.val.as_scalar_ref_impl())
                        .is_lt()
                }) {
                    need_state_clean = true;
                    break;
                }
            }
        }
        // If the stream is append-only and the join key covers pk in both side,
        // then we can remove matched rows since pk is unique and will not be
        // inserted again
        if append_only_optimize {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            // Since join key contains pk and pk is unique, there should be only
            // one row if matched.
            assert!(append_only_matched_row.is_none());
            *append_only_matched_row = Some(matched_row.map(map_output));
        } else if need_state_clean {
            // This assertion restates the `else` of the branch above and can never fire here.
            debug_assert!(
                !append_only_optimize,
                "`append_only_optimize` and `need_state_clean` must not both be true"
            );
            matched_rows_to_clean.push(matched_row.map(map_output));
        }

        chunk_opt
    }
1372
1373    // TODO(yuhao-su): We should find a better way to eval the expression
1374    // without concat two rows.
1375    // if there are non-equi expressions
1376    // NOTE(kwannoel): We can probably let `eval` use `impl Row` instead of `OwnedRow`.
1377    #[inline]
1378    async fn check_join_condition(
1379        row: impl Row,
1380        side_update_start_pos: usize,
1381        matched_row: impl Row,
1382        side_match_start_pos: usize,
1383        join_condition: &Option<NonStrictExpression>,
1384    ) -> bool {
1385        if let Some(join_condition) = join_condition {
1386            let new_row = Self::row_concat(
1387                row,
1388                side_update_start_pos,
1389                matched_row,
1390                side_match_start_pos,
1391            );
1392            join_condition
1393                .eval_row_infallible(&new_row)
1394                .await
1395                .map(|s| *s.as_bool())
1396                .unwrap_or(false)
1397        } else {
1398            true
1399        }
1400    }
1401}
1402
1403#[cfg(test)]
1404mod tests {
1405    use std::sync::atomic::AtomicU64;
1406
1407    use pretty_assertions::assert_eq;
1408    use risingwave_common::array::*;
1409    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1410    use risingwave_common::config::StreamingConfig;
1411    use risingwave_common::hash::{Key64, Key128};
1412    use risingwave_common::util::epoch::test_epoch;
1413    use risingwave_common::util::sort_util::OrderType;
1414    use risingwave_storage::memory::MemoryStateStore;
1415
1416    use super::*;
1417    use crate::common::table::test_utils::gen_pbtable;
1418    use crate::executor::MemoryEncoding;
1419    use crate::executor::test_utils::expr::build_from_pretty;
1420    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1421
    /// Test helper returning the table pair produced by
    /// `create_in_memory_state_table_with_inequality` when `None` is passed for
    /// `degree_inequality_type`; all other arguments are forwarded unchanged.
    async fn create_in_memory_state_table(
        mem_state: MemoryStateStore,
        data_types: &[DataType],
        order_types: &[OrderType],
        pk_indices: &[usize],
        table_id: u32,
    ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
        create_in_memory_state_table_with_inequality(
            mem_state,
            data_types,
            order_types,
            pk_indices,
            table_id,
            None,
        )
        .await
    }
1439
    /// Test helper returning the table pair produced by
    /// `create_in_memory_state_table_with_watermark` when both clean-watermark index lists are
    /// empty; all other arguments, including `degree_inequality_type`, are forwarded unchanged.
    async fn create_in_memory_state_table_with_inequality(
        mem_state: MemoryStateStore,
        data_types: &[DataType],
        order_types: &[OrderType],
        pk_indices: &[usize],
        table_id: u32,
        degree_inequality_type: Option<DataType>,
    ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
        create_in_memory_state_table_with_watermark(
            mem_state,
            data_types,
            order_types,
            pk_indices,
            table_id,
            degree_inequality_type,
            // No clean-watermark indices for the state table ...
            vec![],
            // ... nor for the degree table.
            vec![],
        )
        .await
    }
1460
1461    #[expect(clippy::too_many_arguments)]
1462    async fn create_in_memory_state_table_with_watermark(
1463        mem_state: MemoryStateStore,
1464        data_types: &[DataType],
1465        order_types: &[OrderType],
1466        pk_indices: &[usize],
1467        table_id: u32,
1468        degree_inequality_type: Option<DataType>,
1469        state_clean_watermark_indices: Vec<usize>,
1470        degree_clean_watermark_indices: Vec<usize>,
1471    ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1472        let column_descs = data_types
1473            .iter()
1474            .enumerate()
1475            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1476            .collect_vec();
1477        let mut state_table_catalog = gen_pbtable(
1478            TableId::new(table_id),
1479            column_descs,
1480            order_types.to_vec(),
1481            pk_indices.to_vec(),
1482            0,
1483        );
1484        state_table_catalog.clean_watermark_indices = state_clean_watermark_indices
1485            .into_iter()
1486            .map(|idx| idx as u32)
1487            .collect();
1488        let state_table =
1489            StateTable::from_table_catalog(&state_table_catalog, mem_state.clone(), None).await;
1490
1491        // Create degree table with schema: [pk..., _degree, inequality?]
1492        let mut degree_table_column_descs = vec![];
1493        pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1494            degree_table_column_descs.push(ColumnDesc::unnamed(
1495                ColumnId::new(pk_id as i32),
1496                data_types[*idx].clone(),
1497            ))
1498        });
1499        // Add _degree column
1500        degree_table_column_descs.push(ColumnDesc::unnamed(
1501            ColumnId::new(pk_indices.len() as i32),
1502            DataType::Int64,
1503        ));
1504        // Add inequality column if present
1505        if let Some(ineq_type) = degree_inequality_type {
1506            degree_table_column_descs.push(ColumnDesc::unnamed(
1507                ColumnId::new((pk_indices.len() + 1) as i32),
1508                ineq_type,
1509            ));
1510        }
1511        let mut degree_table_catalog = gen_pbtable(
1512            TableId::new(table_id + 1),
1513            degree_table_column_descs,
1514            order_types.to_vec(),
1515            pk_indices.to_vec(),
1516            0,
1517        );
1518        degree_table_catalog.clean_watermark_indices = degree_clean_watermark_indices
1519            .into_iter()
1520            .map(|idx| idx as u32)
1521            .collect();
1522        let degree_state_table =
1523            StateTable::from_table_catalog(&degree_table_catalog, mem_state, None).await;
1524        (state_table, degree_state_table)
1525    }
1526
1527    fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1528        build_from_pretty(
1529            condition_text
1530                .as_deref()
1531                .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1532        )
1533    }
1534
1535    async fn create_executor<const T: JoinTypePrimitive>(
1536        with_condition: bool,
1537        null_safe: bool,
1538        condition_text: Option<String>,
1539        inequality_pairs: Vec<InequalityPairInfo>,
1540    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1541        let schema = Schema {
1542            fields: vec![
1543                Field::unnamed(DataType::Int64), // join key
1544                Field::unnamed(DataType::Int64),
1545            ],
1546        };
1547        let (tx_l, source_l) = MockSource::channel();
1548        let source_l = source_l.into_executor(schema.clone(), vec![1]);
1549        let (tx_r, source_r) = MockSource::channel();
1550        let source_r = source_r.into_executor(schema, vec![1]);
1551        let params_l = JoinParams::new(vec![0], vec![1]);
1552        let params_r = JoinParams::new(vec![0], vec![1]);
1553        let cond = with_condition.then(|| create_cond(condition_text));
1554
1555        let mem_state = MemoryStateStore::new();
1556
1557        // Determine inequality column types for degree tables based on inequality_pairs
1558        let l_degree_ineq_type = inequality_pairs
1559            .iter()
1560            .find(|pair| pair.clean_left_state)
1561            .map(|_| DataType::Int64); // Column 1 is Int64
1562        let r_degree_ineq_type = inequality_pairs
1563            .iter()
1564            .find(|pair| pair.clean_right_state)
1565            .map(|_| DataType::Int64); // Column 1 is Int64
1566        let l_clean_watermark_indices = inequality_pairs
1567            .iter()
1568            .find(|pair| pair.clean_left_state)
1569            .map(|pair| vec![pair.left_idx])
1570            .unwrap_or_default();
1571        let r_clean_watermark_indices = inequality_pairs
1572            .iter()
1573            .find(|pair| pair.clean_right_state)
1574            .map(|pair| vec![pair.right_idx])
1575            .unwrap_or_default();
1576        let degree_inequality_column_idx = 3;
1577        let l_degree_clean_watermark_indices = l_degree_ineq_type
1578            .as_ref()
1579            .map(|_| vec![degree_inequality_column_idx])
1580            .unwrap_or_default();
1581        let r_degree_clean_watermark_indices = r_degree_ineq_type
1582            .as_ref()
1583            .map(|_| vec![degree_inequality_column_idx])
1584            .unwrap_or_default();
1585
1586        let (state_l, degree_state_l) = create_in_memory_state_table_with_watermark(
1587            mem_state.clone(),
1588            &[DataType::Int64, DataType::Int64],
1589            &[OrderType::ascending(), OrderType::ascending()],
1590            &[0, 1],
1591            0,
1592            l_degree_ineq_type,
1593            l_clean_watermark_indices,
1594            l_degree_clean_watermark_indices,
1595        )
1596        .await;
1597
1598        let (state_r, degree_state_r) = create_in_memory_state_table_with_watermark(
1599            mem_state,
1600            &[DataType::Int64, DataType::Int64],
1601            &[OrderType::ascending(), OrderType::ascending()],
1602            &[0, 1],
1603            2,
1604            r_degree_ineq_type,
1605            r_clean_watermark_indices,
1606            r_degree_clean_watermark_indices,
1607        )
1608        .await;
1609
1610        let schema = match T {
1611            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1612            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1613            _ => [source_l.schema().fields(), source_r.schema().fields()]
1614                .concat()
1615                .into_iter()
1616                .collect(),
1617        };
1618        let schema_len = schema.len();
1619        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1620
1621        let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
1622            ActorContext::for_test(123),
1623            info,
1624            source_l,
1625            source_r,
1626            params_l,
1627            params_r,
1628            vec![null_safe],
1629            (0..schema_len).collect_vec(),
1630            cond,
1631            inequality_pairs,
1632            state_l,
1633            degree_state_l,
1634            state_r,
1635            degree_state_r,
1636            Arc::new(AtomicU64::new(0)),
1637            false,
1638            Arc::new(StreamingMetrics::unused()),
1639            1024,
1640            2048,
1641            vec![(0, true)],
1642        );
1643        (tx_l, tx_r, executor.boxed().execute())
1644    }
1645
1646    async fn create_classical_executor<const T: JoinTypePrimitive>(
1647        with_condition: bool,
1648        null_safe: bool,
1649        condition_text: Option<String>,
1650    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1651        create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
1652    }
1653
1654    async fn create_append_only_executor<const T: JoinTypePrimitive>(
1655        with_condition: bool,
1656    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1657        let schema = Schema {
1658            fields: vec![
1659                Field::unnamed(DataType::Int64),
1660                Field::unnamed(DataType::Int64),
1661                Field::unnamed(DataType::Int64),
1662            ],
1663        };
1664        let (tx_l, source_l) = MockSource::channel();
1665        let source_l = source_l.into_executor(schema.clone(), vec![0]);
1666        let (tx_r, source_r) = MockSource::channel();
1667        let source_r = source_r.into_executor(schema, vec![0]);
1668        let params_l = JoinParams::new(vec![0, 1], vec![]);
1669        let params_r = JoinParams::new(vec![0, 1], vec![]);
1670        let cond = with_condition.then(|| create_cond(None));
1671
1672        let mem_state = MemoryStateStore::new();
1673
1674        let (state_l, degree_state_l) = create_in_memory_state_table(
1675            mem_state.clone(),
1676            &[DataType::Int64, DataType::Int64, DataType::Int64],
1677            &[
1678                OrderType::ascending(),
1679                OrderType::ascending(),
1680                OrderType::ascending(),
1681            ],
1682            &[0, 1, 0],
1683            0,
1684        )
1685        .await;
1686
1687        let (state_r, degree_state_r) = create_in_memory_state_table(
1688            mem_state,
1689            &[DataType::Int64, DataType::Int64, DataType::Int64],
1690            &[
1691                OrderType::ascending(),
1692                OrderType::ascending(),
1693                OrderType::ascending(),
1694            ],
1695            &[0, 1, 1],
1696            1,
1697        )
1698        .await;
1699
1700        let schema = match T {
1701            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1702            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1703            _ => [source_l.schema().fields(), source_r.schema().fields()]
1704                .concat()
1705                .into_iter()
1706                .collect(),
1707        };
1708        let schema_len = schema.len();
1709        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1710
1711        let executor = HashJoinExecutor::<Key128, MemoryStateStore, T, MemoryEncoding>::new(
1712            ActorContext::for_test(123),
1713            info,
1714            source_l,
1715            source_r,
1716            params_l,
1717            params_r,
1718            vec![false],
1719            (0..schema_len).collect_vec(),
1720            cond,
1721            vec![],
1722            state_l,
1723            degree_state_l,
1724            state_r,
1725            degree_state_r,
1726            Arc::new(AtomicU64::new(0)),
1727            true,
1728            Arc::new(StreamingMetrics::unused()),
1729            1024,
1730            2048,
1731            vec![(0, true)],
1732        );
1733        (tx_l, tx_r, executor.boxed().execute())
1734    }
1735
1736    #[tokio::test]
1737    async fn test_inequality_join_watermark() -> StreamExecutorResult<()> {
1738        // Test inequality join with watermark-based state cleanup
1739        // Condition: left.col1 >= right.col1 (left side is larger, clean left state)
1740        // Left state rows with col1 < watermark will be cleaned
1741        let chunk_l1 = StreamChunk::from_pretty(
1742            "  I I
1743             + 2 4
1744             + 2 7
1745             + 3 8",
1746        );
1747        let chunk_r1 = StreamChunk::from_pretty(
1748            "  I I
1749             + 2 6",
1750        );
1751        let chunk_r2 = StreamChunk::from_pretty(
1752            "  I I
1753             + 2 3",
1754        );
1755        // Test with condition: left.col1 >= right.col1
1756        let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
1757            true,
1758            false,
1759            Some(String::from(
1760                "(greater_than_or_equal:boolean $1:int8 $3:int8)",
1761            )),
1762            vec![InequalityPairInfo {
1763                left_idx: 1,
1764                right_idx: 1,
1765                clean_left_state: true, // left >= right, left is larger
1766                clean_right_state: false,
1767                op: InequalityType::GreaterThanOrEqual,
1768            }],
1769        )
1770        .await;
1771
1772        // push the init barrier for left and right
1773        tx_l.push_barrier(test_epoch(1), false);
1774        tx_r.push_barrier(test_epoch(1), false);
1775        hash_join.next_unwrap_ready_barrier()?;
1776
1777        // push the left chunk: (2, 4), (2, 7), (3, 8)
1778        tx_l.push_chunk(chunk_l1);
1779        hash_join.next_unwrap_pending();
1780
1781        // push watermarks: left=10, right=6
1782        // Output watermark is min(10, 6) = 6 for both output columns
1783        tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
1784        hash_join.next_unwrap_pending();
1785
1786        tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
1787        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1788        assert_eq!(
1789            output_watermark,
1790            Watermark::new(1, DataType::Int64, ScalarImpl::Int64(6))
1791        );
1792
1793        // After watermark, left state rows with col1 < 6 are cleaned
1794        // Row (2, 4) should be cleaned (4 < 6)
1795        // Row (2, 7) should remain (7 >= 6)
1796        // Row (3, 8) should remain (8 >= 6)
1797
1798        // push right chunk (2, 6)
1799        // Only (2, 7) can match: 7 >= 6 is TRUE
1800        // (2, 4) was cleaned, so it won't match
1801        tx_r.push_chunk(chunk_r1);
1802        let chunk = hash_join.next_unwrap_ready_chunk()?;
1803        assert_eq!(
1804            chunk,
1805            StreamChunk::from_pretty(
1806                " I I I I
1807                + 2 7 2 6"
1808            )
1809        );
1810
1811        // push right chunk (2, 3)
1812        // (2, 7) can match: 7 >= 3 is TRUE
1813        // (2, 4) was cleaned, won't match
1814        tx_r.push_chunk(chunk_r2);
1815        let chunk = hash_join.next_unwrap_ready_chunk()?;
1816        assert_eq!(
1817            chunk,
1818            StreamChunk::from_pretty(
1819                " I I I I
1820                + 2 7 2 3"
1821            )
1822        );
1823
1824        Ok(())
1825    }
1826
1827    #[tokio::test]
1828    async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
1829        let chunk_l1 = StreamChunk::from_pretty(
1830            "  I I
1831             + 1 4
1832             + 2 5
1833             + 3 6",
1834        );
1835        let chunk_l2 = StreamChunk::from_pretty(
1836            "  I I
1837             + 3 8
1838             - 3 8",
1839        );
1840        let chunk_r1 = StreamChunk::from_pretty(
1841            "  I I
1842             + 2 7
1843             + 4 8
1844             + 6 9",
1845        );
1846        let chunk_r2 = StreamChunk::from_pretty(
1847            "  I  I
1848             + 3 10
1849             + 6 11",
1850        );
1851        let (mut tx_l, mut tx_r, mut hash_join) =
1852            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
1853
1854        // push the init barrier for left and right
1855        tx_l.push_barrier(test_epoch(1), false);
1856        tx_r.push_barrier(test_epoch(1), false);
1857        hash_join.next_unwrap_ready_barrier()?;
1858
1859        // push the 1st left chunk
1860        tx_l.push_chunk(chunk_l1);
1861        hash_join.next_unwrap_pending();
1862
1863        // push the init barrier for left and right
1864        tx_l.push_barrier(test_epoch(2), false);
1865        tx_r.push_barrier(test_epoch(2), false);
1866        hash_join.next_unwrap_ready_barrier()?;
1867
1868        // push the 2nd left chunk
1869        tx_l.push_chunk(chunk_l2);
1870        hash_join.next_unwrap_pending();
1871
1872        // push the 1st right chunk
1873        tx_r.push_chunk(chunk_r1);
1874        let chunk = hash_join.next_unwrap_ready_chunk()?;
1875        assert_eq!(
1876            chunk,
1877            StreamChunk::from_pretty(
1878                " I I I I
1879                + 2 5 2 7"
1880            )
1881        );
1882
1883        // push the 2nd right chunk
1884        tx_r.push_chunk(chunk_r2);
1885        let chunk = hash_join.next_unwrap_ready_chunk()?;
1886        assert_eq!(
1887            chunk,
1888            StreamChunk::from_pretty(
1889                " I I I I
1890                + 3 6 3 10"
1891            )
1892        );
1893
1894        Ok(())
1895    }
1896
1897    #[tokio::test]
1898    async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
1899        let chunk_l1 = StreamChunk::from_pretty(
1900            "  I I
1901             + 1 4
1902             + 2 5
1903             + . 6",
1904        );
1905        let chunk_l2 = StreamChunk::from_pretty(
1906            "  I I
1907             + . 8
1908             - . 8",
1909        );
1910        let chunk_r1 = StreamChunk::from_pretty(
1911            "  I I
1912             + 2 7
1913             + 4 8
1914             + 6 9",
1915        );
1916        let chunk_r2 = StreamChunk::from_pretty(
1917            "  I  I
1918             + . 10
1919             + 6 11",
1920        );
1921        let (mut tx_l, mut tx_r, mut hash_join) =
1922            create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;
1923
1924        // push the init barrier for left and right
1925        tx_l.push_barrier(test_epoch(1), false);
1926        tx_r.push_barrier(test_epoch(1), false);
1927        hash_join.next_unwrap_ready_barrier()?;
1928
1929        // push the 1st left chunk
1930        tx_l.push_chunk(chunk_l1);
1931        hash_join.next_unwrap_pending();
1932
1933        // push the init barrier for left and right
1934        tx_l.push_barrier(test_epoch(2), false);
1935        tx_r.push_barrier(test_epoch(2), false);
1936        hash_join.next_unwrap_ready_barrier()?;
1937
1938        // push the 2nd left chunk
1939        tx_l.push_chunk(chunk_l2);
1940        hash_join.next_unwrap_pending();
1941
1942        // push the 1st right chunk
1943        tx_r.push_chunk(chunk_r1);
1944        let chunk = hash_join.next_unwrap_ready_chunk()?;
1945        assert_eq!(
1946            chunk,
1947            StreamChunk::from_pretty(
1948                " I I I I
1949                + 2 5 2 7"
1950            )
1951        );
1952
1953        // push the 2nd right chunk
1954        tx_r.push_chunk(chunk_r2);
1955        let chunk = hash_join.next_unwrap_ready_chunk()?;
1956        assert_eq!(
1957            chunk,
1958            StreamChunk::from_pretty(
1959                " I I I I
1960                + . 6 . 10"
1961            )
1962        );
1963
1964        Ok(())
1965    }
1966
1967    #[tokio::test]
1968    async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
1969        let chunk_l1 = StreamChunk::from_pretty(
1970            "  I I
1971             + 1 4
1972             + 2 5
1973             + 3 6",
1974        );
1975        let chunk_l2 = StreamChunk::from_pretty(
1976            "  I I
1977             + 3 8
1978             - 3 8",
1979        );
1980        let chunk_r1 = StreamChunk::from_pretty(
1981            "  I I
1982             + 2 7
1983             + 4 8
1984             + 6 9",
1985        );
1986        let chunk_r2 = StreamChunk::from_pretty(
1987            "  I  I
1988             + 3 10
1989             + 6 11",
1990        );
1991        let chunk_l3 = StreamChunk::from_pretty(
1992            "  I I
1993             + 6 10",
1994        );
1995        let chunk_r3 = StreamChunk::from_pretty(
1996            "  I  I
1997             - 6 11",
1998        );
1999        let chunk_r4 = StreamChunk::from_pretty(
2000            "  I  I
2001             - 6 9",
2002        );
2003        let (mut tx_l, mut tx_r, mut hash_join) =
2004            create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;
2005
2006        // push the init barrier for left and right
2007        tx_l.push_barrier(test_epoch(1), false);
2008        tx_r.push_barrier(test_epoch(1), false);
2009        hash_join.next_unwrap_ready_barrier()?;
2010
2011        // push the 1st left chunk
2012        tx_l.push_chunk(chunk_l1);
2013        hash_join.next_unwrap_pending();
2014
2015        // push the init barrier for left and right
2016        tx_l.push_barrier(test_epoch(2), false);
2017        tx_r.push_barrier(test_epoch(2), false);
2018        hash_join.next_unwrap_ready_barrier()?;
2019
2020        // push the 2nd left chunk
2021        tx_l.push_chunk(chunk_l2);
2022        hash_join.next_unwrap_pending();
2023
2024        // push the 1st right chunk
2025        tx_r.push_chunk(chunk_r1);
2026        let chunk = hash_join.next_unwrap_ready_chunk()?;
2027        assert_eq!(
2028            chunk,
2029            StreamChunk::from_pretty(
2030                " I I
2031                + 2 5"
2032            )
2033        );
2034
2035        // push the 2nd right chunk
2036        tx_r.push_chunk(chunk_r2);
2037        let chunk = hash_join.next_unwrap_ready_chunk()?;
2038        assert_eq!(
2039            chunk,
2040            StreamChunk::from_pretty(
2041                " I I
2042                + 3 6"
2043            )
2044        );
2045
2046        // push the 3rd left chunk (tests forward_exactly_once)
2047        tx_l.push_chunk(chunk_l3);
2048        let chunk = hash_join.next_unwrap_ready_chunk()?;
2049        assert_eq!(
2050            chunk,
2051            StreamChunk::from_pretty(
2052                " I I
2053                + 6 10"
2054            )
2055        );
2056
2057        // push the 3rd right chunk
2058        // (tests that no change if there are still matches)
2059        tx_r.push_chunk(chunk_r3);
2060        hash_join.next_unwrap_pending();
2061
2062        // push the 3rd left chunk
2063        // (tests that deletion occurs when there are no more matches)
2064        tx_r.push_chunk(chunk_r4);
2065        let chunk = hash_join.next_unwrap_ready_chunk()?;
2066        assert_eq!(
2067            chunk,
2068            StreamChunk::from_pretty(
2069                " I I
2070                - 6 10"
2071            )
2072        );
2073
2074        Ok(())
2075    }
2076
2077    #[tokio::test]
2078    async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
2079        let chunk_l1 = StreamChunk::from_pretty(
2080            "  I I
2081             + 1 4
2082             + 2 5
2083             + . 6",
2084        );
2085        let chunk_l2 = StreamChunk::from_pretty(
2086            "  I I
2087             + . 8
2088             - . 8",
2089        );
2090        let chunk_r1 = StreamChunk::from_pretty(
2091            "  I I
2092             + 2 7
2093             + 4 8
2094             + 6 9",
2095        );
2096        let chunk_r2 = StreamChunk::from_pretty(
2097            "  I  I
2098             + . 10
2099             + 6 11",
2100        );
2101        let chunk_l3 = StreamChunk::from_pretty(
2102            "  I I
2103             + 6 10",
2104        );
2105        let chunk_r3 = StreamChunk::from_pretty(
2106            "  I  I
2107             - 6 11",
2108        );
2109        let chunk_r4 = StreamChunk::from_pretty(
2110            "  I  I
2111             - 6 9",
2112        );
2113        let (mut tx_l, mut tx_r, mut hash_join) =
2114            create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;
2115
2116        // push the init barrier for left and right
2117        tx_l.push_barrier(test_epoch(1), false);
2118        tx_r.push_barrier(test_epoch(1), false);
2119        hash_join.next_unwrap_ready_barrier()?;
2120
2121        // push the 1st left chunk
2122        tx_l.push_chunk(chunk_l1);
2123        hash_join.next_unwrap_pending();
2124
2125        // push the init barrier for left and right
2126        tx_l.push_barrier(test_epoch(2), false);
2127        tx_r.push_barrier(test_epoch(2), false);
2128        hash_join.next_unwrap_ready_barrier()?;
2129
2130        // push the 2nd left chunk
2131        tx_l.push_chunk(chunk_l2);
2132        hash_join.next_unwrap_pending();
2133
2134        // push the 1st right chunk
2135        tx_r.push_chunk(chunk_r1);
2136        let chunk = hash_join.next_unwrap_ready_chunk()?;
2137        assert_eq!(
2138            chunk,
2139            StreamChunk::from_pretty(
2140                " I I
2141                + 2 5"
2142            )
2143        );
2144
2145        // push the 2nd right chunk
2146        tx_r.push_chunk(chunk_r2);
2147        let chunk = hash_join.next_unwrap_ready_chunk()?;
2148        assert_eq!(
2149            chunk,
2150            StreamChunk::from_pretty(
2151                " I I
2152                + . 6"
2153            )
2154        );
2155
2156        // push the 3rd left chunk (tests forward_exactly_once)
2157        tx_l.push_chunk(chunk_l3);
2158        let chunk = hash_join.next_unwrap_ready_chunk()?;
2159        assert_eq!(
2160            chunk,
2161            StreamChunk::from_pretty(
2162                " I I
2163                + 6 10"
2164            )
2165        );
2166
2167        // push the 3rd right chunk
2168        // (tests that no change if there are still matches)
2169        tx_r.push_chunk(chunk_r3);
2170        hash_join.next_unwrap_pending();
2171
2172        // push the 3rd left chunk
2173        // (tests that deletion occurs when there are no more matches)
2174        tx_r.push_chunk(chunk_r4);
2175        let chunk = hash_join.next_unwrap_ready_chunk()?;
2176        assert_eq!(
2177            chunk,
2178            StreamChunk::from_pretty(
2179                " I I
2180                - 6 10"
2181            )
2182        );
2183
2184        Ok(())
2185    }
2186
2187    #[tokio::test]
2188    async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
2189        let chunk_l1 = StreamChunk::from_pretty(
2190            "  I I I
2191             + 1 4 1
2192             + 2 5 2
2193             + 3 6 3",
2194        );
2195        let chunk_l2 = StreamChunk::from_pretty(
2196            "  I I I
2197             + 4 9 4
2198             + 5 10 5",
2199        );
2200        let chunk_r1 = StreamChunk::from_pretty(
2201            "  I I I
2202             + 2 5 1
2203             + 4 9 2
2204             + 6 9 3",
2205        );
2206        let chunk_r2 = StreamChunk::from_pretty(
2207            "  I I I
2208             + 1 4 4
2209             + 3 6 5",
2210        );
2211
2212        let (mut tx_l, mut tx_r, mut hash_join) =
2213            create_append_only_executor::<{ JoinType::Inner }>(false).await;
2214
2215        // push the init barrier for left and right
2216        tx_l.push_barrier(test_epoch(1), false);
2217        tx_r.push_barrier(test_epoch(1), false);
2218        hash_join.next_unwrap_ready_barrier()?;
2219
2220        // push the 1st left chunk
2221        tx_l.push_chunk(chunk_l1);
2222        hash_join.next_unwrap_pending();
2223
2224        // push the init barrier for left and right
2225        tx_l.push_barrier(test_epoch(2), false);
2226        tx_r.push_barrier(test_epoch(2), false);
2227        hash_join.next_unwrap_ready_barrier()?;
2228
2229        // push the 2nd left chunk
2230        tx_l.push_chunk(chunk_l2);
2231        hash_join.next_unwrap_pending();
2232
2233        // push the 1st right chunk
2234        tx_r.push_chunk(chunk_r1);
2235        let chunk = hash_join.next_unwrap_ready_chunk()?;
2236        assert_eq!(
2237            chunk,
2238            StreamChunk::from_pretty(
2239                " I I I I I I
2240                + 2 5 2 2 5 1
2241                + 4 9 4 4 9 2"
2242            )
2243        );
2244
2245        // push the 2nd right chunk
2246        tx_r.push_chunk(chunk_r2);
2247        let chunk = hash_join.next_unwrap_ready_chunk()?;
2248        assert_eq!(
2249            chunk,
2250            StreamChunk::from_pretty(
2251                " I I I I I I
2252                + 1 4 1 1 4 4
2253                + 3 6 3 3 6 5"
2254            )
2255        );
2256
2257        Ok(())
2258    }
2259
2260    #[tokio::test]
2261    async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
2262        let chunk_l1 = StreamChunk::from_pretty(
2263            "  I I I
2264             + 1 4 1
2265             + 2 5 2
2266             + 3 6 3",
2267        );
2268        let chunk_l2 = StreamChunk::from_pretty(
2269            "  I I I
2270             + 4 9 4
2271             + 5 10 5",
2272        );
2273        let chunk_r1 = StreamChunk::from_pretty(
2274            "  I I I
2275             + 2 5 1
2276             + 4 9 2
2277             + 6 9 3",
2278        );
2279        let chunk_r2 = StreamChunk::from_pretty(
2280            "  I I I
2281             + 1 4 4
2282             + 3 6 5",
2283        );
2284
2285        let (mut tx_l, mut tx_r, mut hash_join) =
2286            create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;
2287
2288        // push the init barrier for left and right
2289        tx_l.push_barrier(test_epoch(1), false);
2290        tx_r.push_barrier(test_epoch(1), false);
2291        hash_join.next_unwrap_ready_barrier()?;
2292
2293        // push the 1st left chunk
2294        tx_l.push_chunk(chunk_l1);
2295        hash_join.next_unwrap_pending();
2296
2297        // push the init barrier for left and right
2298        tx_l.push_barrier(test_epoch(2), false);
2299        tx_r.push_barrier(test_epoch(2), false);
2300        hash_join.next_unwrap_ready_barrier()?;
2301
2302        // push the 2nd left chunk
2303        tx_l.push_chunk(chunk_l2);
2304        hash_join.next_unwrap_pending();
2305
2306        // push the 1st right chunk
2307        tx_r.push_chunk(chunk_r1);
2308        let chunk = hash_join.next_unwrap_ready_chunk()?;
2309        assert_eq!(
2310            chunk,
2311            StreamChunk::from_pretty(
2312                " I I I
2313                + 2 5 2
2314                + 4 9 4"
2315            )
2316        );
2317
2318        // push the 2nd right chunk
2319        tx_r.push_chunk(chunk_r2);
2320        let chunk = hash_join.next_unwrap_ready_chunk()?;
2321        assert_eq!(
2322            chunk,
2323            StreamChunk::from_pretty(
2324                " I I I
2325                + 1 4 1
2326                + 3 6 3"
2327            )
2328        );
2329
2330        Ok(())
2331    }
2332
2333    #[tokio::test]
2334    async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
2335        let chunk_l1 = StreamChunk::from_pretty(
2336            "  I I I
2337             + 1 4 1
2338             + 2 5 2
2339             + 3 6 3",
2340        );
2341        let chunk_l2 = StreamChunk::from_pretty(
2342            "  I I I
2343             + 4 9 4
2344             + 5 10 5",
2345        );
2346        let chunk_r1 = StreamChunk::from_pretty(
2347            "  I I I
2348             + 2 5 1
2349             + 4 9 2
2350             + 6 9 3",
2351        );
2352        let chunk_r2 = StreamChunk::from_pretty(
2353            "  I I I
2354             + 1 4 4
2355             + 3 6 5",
2356        );
2357
2358        let (mut tx_l, mut tx_r, mut hash_join) =
2359            create_append_only_executor::<{ JoinType::RightSemi }>(false).await;
2360
2361        // push the init barrier for left and right
2362        tx_l.push_barrier(test_epoch(1), false);
2363        tx_r.push_barrier(test_epoch(1), false);
2364        hash_join.next_unwrap_ready_barrier()?;
2365
2366        // push the 1st left chunk
2367        tx_l.push_chunk(chunk_l1);
2368        hash_join.next_unwrap_pending();
2369
2370        // push the init barrier for left and right
2371        tx_l.push_barrier(test_epoch(2), false);
2372        tx_r.push_barrier(test_epoch(2), false);
2373        hash_join.next_unwrap_ready_barrier()?;
2374
2375        // push the 2nd left chunk
2376        tx_l.push_chunk(chunk_l2);
2377        hash_join.next_unwrap_pending();
2378
2379        // push the 1st right chunk
2380        tx_r.push_chunk(chunk_r1);
2381        let chunk = hash_join.next_unwrap_ready_chunk()?;
2382        assert_eq!(
2383            chunk,
2384            StreamChunk::from_pretty(
2385                " I I I
2386                + 2 5 1
2387                + 4 9 2"
2388            )
2389        );
2390
2391        // push the 2nd right chunk
2392        tx_r.push_chunk(chunk_r2);
2393        let chunk = hash_join.next_unwrap_ready_chunk()?;
2394        assert_eq!(
2395            chunk,
2396            StreamChunk::from_pretty(
2397                " I I I
2398                + 1 4 4
2399                + 3 6 5"
2400            )
2401        );
2402
2403        Ok(())
2404    }
2405
2406    #[tokio::test]
2407    async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
        // Scenario: a `JoinType::RightSemi` executor built by `create_classical_executor`.
        // Right rows are pushed first and produce no output on their own; the test then
        // expects a right row to be emitted once a left row with the same first column
        // arrives, and to be retracted only after the last such left row is deleted.
        // NOTE(review): the join key is configured inside `create_classical_executor`
        // (not visible here); the assertions are consistent with a join on column 0.
2408        let chunk_r1 = StreamChunk::from_pretty(
2409            "  I I
2410             + 1 4
2411             + 2 5
2412             + 3 6",
2413        );
2414        let chunk_r2 = StreamChunk::from_pretty(
2415            "  I I
2416             + 3 8
2417             - 3 8",
2418        );
2419        let chunk_l1 = StreamChunk::from_pretty(
2420            "  I I
2421             + 2 7
2422             + 4 8
2423             + 6 9",
2424        );
2425        let chunk_l2 = StreamChunk::from_pretty(
2426            "  I  I
2427             + 3 10
2428             + 6 11",
2429        );
2430        let chunk_r3 = StreamChunk::from_pretty(
2431            "  I I
2432             + 6 10",
2433        );
2434        let chunk_l3 = StreamChunk::from_pretty(
2435            "  I  I
2436             - 6 11",
2437        );
2438        let chunk_l4 = StreamChunk::from_pretty(
2439            "  I  I
2440             - 6 9",
2441        );
2442        let (mut tx_l, mut tx_r, mut hash_join) =
2443            create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;
2444
2445        // push the init barrier for left and right
2446        tx_l.push_barrier(test_epoch(1), false);
2447        tx_r.push_barrier(test_epoch(1), false);
2448        hash_join.next_unwrap_ready_barrier()?;
2449
2450        // push the 1st right chunk
        // (no left rows yet, so the executor is expected to stay pending)
2451        tx_r.push_chunk(chunk_r1);
2452        hash_join.next_unwrap_pending();
2453
2454        // push the 2nd barrier (epoch 2) for left and right
2455        tx_l.push_barrier(test_epoch(2), false);
2456        tx_r.push_barrier(test_epoch(2), false);
2457        hash_join.next_unwrap_ready_barrier()?;
2458
2459        // push the 2nd right chunk
        // (inserts and deletes the same row; still nothing to emit)
2460        tx_r.push_chunk(chunk_r2);
2461        hash_join.next_unwrap_pending();
2462
2463        // push the 1st left chunk
        // (only the right row (2, 5) is expected; left rows 4 and 6 have no right partner yet)
2464        tx_l.push_chunk(chunk_l1);
2465        let chunk = hash_join.next_unwrap_ready_chunk()?;
2466        assert_eq!(
2467            chunk,
2468            StreamChunk::from_pretty(
2469                " I I
2470                + 2 5"
2471            )
2472        );
2473
2474        // push the 2nd left chunk
2475        tx_l.push_chunk(chunk_l2);
2476        let chunk = hash_join.next_unwrap_ready_chunk()?;
2477        assert_eq!(
2478            chunk,
2479            StreamChunk::from_pretty(
2480                " I I
2481                + 3 6"
2482            )
2483        );
2484
2485        // push the 3rd right chunk (tests forward_exactly_once)
        // (two left rows with first column 6 exist by now, yet the right row is expected once)
2486        tx_r.push_chunk(chunk_r3);
2487        let chunk = hash_join.next_unwrap_ready_chunk()?;
2488        assert_eq!(
2489            chunk,
2490            StreamChunk::from_pretty(
2491                " I I
2492                + 6 10"
2493            )
2494        );
2495
2496        // push the 3rd left chunk
2497        // (tests that no change if there are still matches)
2498        tx_l.push_chunk(chunk_l3);
2499        hash_join.next_unwrap_pending();
2500
2501        // push the 4th left chunk
2502        // (tests that deletion occurs when there are no more matches)
2503        tx_l.push_chunk(chunk_l4);
2504        let chunk = hash_join.next_unwrap_ready_chunk()?;
2505        assert_eq!(
2506            chunk,
2507            StreamChunk::from_pretty(
2508                " I I
2509                - 6 10"
2510            )
2511        );
2512
2513        Ok(())
2514    }
2515
2516    #[tokio::test]
2517    async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
        // Scenario: a `JoinType::LeftAnti` executor built by `create_classical_executor`.
        // The test expects a left row to be emitted while no right row shares its first
        // column, retracted when the first such right row arrives, and re-emitted once
        // the last such right row has been deleted.
        // NOTE(review): the join key is configured inside `create_classical_executor`
        // (not visible here); the assertions are consistent with a join on column 0.
2518        let chunk_l1 = StreamChunk::from_pretty(
2519            "  I I
2520             + 1 4
2521             + 2 5
2522             + 3 6",
2523        );
2524        let chunk_l2 = StreamChunk::from_pretty(
2525            "  I I
2526             + 3 8
2527             - 3 8",
2528        );
2529        let chunk_r1 = StreamChunk::from_pretty(
2530            "  I I
2531             + 2 7
2532             + 4 8
2533             + 6 9",
2534        );
2535        let chunk_r2 = StreamChunk::from_pretty(
2536            "  I  I
2537             + 3 10
2538             + 6 11
2539             + 1 2
2540             + 1 3",
2541        );
2542        let chunk_l3 = StreamChunk::from_pretty(
2543            "  I I
2544             + 9 10",
2545        );
2546        let chunk_r3 = StreamChunk::from_pretty(
2547            "  I I
2548             - 1 2",
2549        );
2550        let chunk_r4 = StreamChunk::from_pretty(
2551            "  I I
2552             - 1 3",
2553        );
2554        let (mut tx_l, mut tx_r, mut hash_join) =
2555            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2556
2557        // push the init barrier for left and right
2558        tx_l.push_barrier(test_epoch(1), false);
2559        tx_r.push_barrier(test_epoch(1), false);
2560        hash_join.next_unwrap_ready_barrier()?;
2561
2562        // push the 1st left chunk
        // (the right side is empty, so every left row is expected in the output)
2563        tx_l.push_chunk(chunk_l1);
2564        let chunk = hash_join.next_unwrap_ready_chunk()?;
2565        assert_eq!(
2566            chunk,
2567            StreamChunk::from_pretty(
2568                " I I
2569                + 1 4
2570                + 2 5
2571                + 3 6",
2572            )
2573        );
2574
2575        // push the 2nd barrier (epoch 2) for left and right
2576        tx_l.push_barrier(test_epoch(2), false);
2577        tx_r.push_barrier(test_epoch(2), false);
2578        hash_join.next_unwrap_ready_barrier()?;
2579
2580        // push the 2nd left chunk
        // NOTE(review): the trailing `D` presumably flags the row as hidden in
        // `StreamChunk::from_pretty` — confirm against that helper.
2581        tx_l.push_chunk(chunk_l2);
2582        let chunk = hash_join.next_unwrap_ready_chunk()?;
2583        assert_eq!(
2584            chunk,
2585            StreamChunk::from_pretty(
2586                "  I I
2587                 + 3 8 D
2588                 - 3 8 D",
2589            )
2590        );
2591
2592        // push the 1st right chunk
        // (only left row (2, 5) gains a partner, so only it is expected to be retracted)
2593        tx_r.push_chunk(chunk_r1);
2594        let chunk = hash_join.next_unwrap_ready_chunk()?;
2595        assert_eq!(
2596            chunk,
2597            StreamChunk::from_pretty(
2598                " I I
2599                - 2 5"
2600            )
2601        );
2602
2603        // push the 2nd right chunk
        // (two right rows with first column 1 arrive, but (1, 4) is expected to be retracted once)
2604        tx_r.push_chunk(chunk_r2);
2605        let chunk = hash_join.next_unwrap_ready_chunk()?;
2606        assert_eq!(
2607            chunk,
2608            StreamChunk::from_pretty(
2609                " I I
2610                - 3 6
2611                - 1 4"
2612            )
2613        );
2614
2615        // push the 3rd left chunk (tests forward_exactly_once)
2616        tx_l.push_chunk(chunk_l3);
2617        let chunk = hash_join.next_unwrap_ready_chunk()?;
2618        assert_eq!(
2619            chunk,
2620            StreamChunk::from_pretty(
2621                " I I
2622                + 9 10"
2623            )
2624        );
2625
2626        // push the 3rd right chunk
2627        // (tests that no change if there are still matches)
2628        tx_r.push_chunk(chunk_r3);
2629        hash_join.next_unwrap_pending();
2630
2631        // push the 4th right chunk
2632        // (tests that insertion occurs when there are no more matches)
2633        tx_r.push_chunk(chunk_r4);
2634        let chunk = hash_join.next_unwrap_ready_chunk()?;
2635        assert_eq!(
2636            chunk,
2637            StreamChunk::from_pretty(
2638                " I I
2639                + 1 4"
2640            )
2641        );
2642
2643        Ok(())
2644    }
2645
2646    #[tokio::test]
2647    async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
        // Scenario: a `JoinType::RightAnti` executor built by `create_classical_executor`.
        // Mirror image of the left-anti test: a right row is expected to be emitted while
        // no left row shares its first column, retracted when the first such left row
        // arrives, and re-emitted once the last such left row has been deleted.
        // NOTE(review): the join key is configured inside `create_classical_executor`
        // (not visible here); the assertions are consistent with a join on column 0.
2648        let chunk_r1 = StreamChunk::from_pretty(
2649            "  I I
2650             + 1 4
2651             + 2 5
2652             + 3 6",
2653        );
2654        let chunk_r2 = StreamChunk::from_pretty(
2655            "  I I
2656             + 3 8
2657             - 3 8",
2658        );
2659        let chunk_l1 = StreamChunk::from_pretty(
2660            "  I I
2661             + 2 7
2662             + 4 8
2663             + 6 9",
2664        );
2665        let chunk_l2 = StreamChunk::from_pretty(
2666            "  I  I
2667             + 3 10
2668             + 6 11
2669             + 1 2
2670             + 1 3",
2671        );
2672        let chunk_r3 = StreamChunk::from_pretty(
2673            "  I I
2674             + 9 10",
2675        );
2676        let chunk_l3 = StreamChunk::from_pretty(
2677            "  I I
2678             - 1 2",
2679        );
2680        let chunk_l4 = StreamChunk::from_pretty(
2681            "  I I
2682             - 1 3",
2683        );
        // Build a real right-anti executor and bind the senders in their true
        // (left, right) order, so that `tx_r` really feeds the right input.
        // Previously this test created a `LeftAnti` executor with the two sender
        // names swapped, which only re-ran the left-anti code path.
2684        let (mut tx_l, mut tx_r, mut hash_join) =
2685            create_classical_executor::<{ JoinType::RightAnti }>(false, false, None).await;
2686
2687        // push the init barrier for left and right
2688        tx_r.push_barrier(test_epoch(1), false);
2689        tx_l.push_barrier(test_epoch(1), false);
2690        hash_join.next_unwrap_ready_barrier()?;
2691
2692        // push the 1st right chunk
        // (the left side is empty, so every right row is expected in the output)
2693        tx_r.push_chunk(chunk_r1);
2694        let chunk = hash_join.next_unwrap_ready_chunk()?;
2695        assert_eq!(
2696            chunk,
2697            StreamChunk::from_pretty(
2698                " I I
2699                + 1 4
2700                + 2 5
2701                + 3 6",
2702            )
2703        );
2704
2705        // push the 2nd barrier (epoch 2) for left and right
2706        tx_r.push_barrier(test_epoch(2), false);
2707        tx_l.push_barrier(test_epoch(2), false);
2708        hash_join.next_unwrap_ready_barrier()?;
2709
2710        // push the 2nd right chunk
        // NOTE(review): the trailing `D` presumably flags the row as hidden in
        // `StreamChunk::from_pretty` — confirm against that helper.
2711        tx_r.push_chunk(chunk_r2);
2712        let chunk = hash_join.next_unwrap_ready_chunk()?;
2713        assert_eq!(
2714            chunk,
2715            StreamChunk::from_pretty(
2716                "  I I
2717                 + 3 8 D
2718                 - 3 8 D",
2719            )
2720        );
2721
2722        // push the 1st left chunk
        // (only right row (2, 5) gains a partner, so only it is expected to be retracted)
2723        tx_l.push_chunk(chunk_l1);
2724        let chunk = hash_join.next_unwrap_ready_chunk()?;
2725        assert_eq!(
2726            chunk,
2727            StreamChunk::from_pretty(
2728                " I I
2729                - 2 5"
2730            )
2731        );
2732
2733        // push the 2nd left chunk
        // (two left rows with first column 1 arrive, but (1, 4) is expected to be retracted once)
2734        tx_l.push_chunk(chunk_l2);
2735        let chunk = hash_join.next_unwrap_ready_chunk()?;
2736        assert_eq!(
2737            chunk,
2738            StreamChunk::from_pretty(
2739                " I I
2740                - 3 6
2741                - 1 4"
2742            )
2743        );
2744
2745        // push the 3rd right chunk (tests forward_exactly_once)
2746        tx_r.push_chunk(chunk_r3);
2747        let chunk = hash_join.next_unwrap_ready_chunk()?;
2748        assert_eq!(
2749            chunk,
2750            StreamChunk::from_pretty(
2751                " I I
2752                + 9 10"
2753            )
2754        );
2755
2756        // push the 3rd left chunk
2757        // (tests that no change if there are still matches)
2758        tx_l.push_chunk(chunk_l3);
2759        hash_join.next_unwrap_pending();
2760
2761        // push the 4th left chunk
2762        // (tests that insertion occurs when there are no more matches)
2763        tx_l.push_chunk(chunk_l4);
2764        let chunk = hash_join.next_unwrap_ready_chunk()?;
2765        assert_eq!(
2766            chunk,
2767            StreamChunk::from_pretty(
2768                " I I
2769                + 1 4"
2770            )
2771        );
2772
2773        Ok(())
2774    }
2775
2776    #[tokio::test]
2777    async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
        // Scenario: a `JoinType::Inner` executor where the left input receives the
        // epoch-2 barrier before the right input does. The assertions check that the
        // left chunk queued behind that barrier does not contribute to the output until
        // the right barrier has arrived and the aligned barrier has been emitted.
2778        let chunk_l1 = StreamChunk::from_pretty(
2779            "  I I
2780             + 1 4
2781             + 2 5
2782             + 3 6",
2783        );
2784        let chunk_l2 = StreamChunk::from_pretty(
2785            "  I I
2786             + 6 8
2787             + 3 8",
2788        );
2789        let chunk_r1 = StreamChunk::from_pretty(
2790            "  I I
2791             + 2 7
2792             + 4 8
2793             + 6 9",
2794        );
2795        let chunk_r2 = StreamChunk::from_pretty(
2796            "  I  I
2797             + 3 10
2798             + 6 11",
2799        );
2800        let (mut tx_l, mut tx_r, mut hash_join) =
2801            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2802
2803        // push the init barrier for left and right
2804        tx_l.push_barrier(test_epoch(1), false);
2805        tx_r.push_barrier(test_epoch(1), false);
2806        hash_join.next_unwrap_ready_barrier()?;
2807
2808        // push the 1st left chunk
2809        tx_l.push_chunk(chunk_l1);
2810        hash_join.next_unwrap_pending();
2811
2812        // push a barrier to left side
2813        tx_l.push_barrier(test_epoch(2), false);
2814
2815        // push the 2nd left chunk
        // (queued after the left barrier; the output is not polled here)
2816        tx_l.push_chunk(chunk_l2);
2817
2818        // join the first right chunk
2819        tx_r.push_chunk(chunk_r1);
2820
2821        // Consume stream chunk
        // (only the match against `chunk_l1` is expected: right (6, 9) is not yet joined
        // with left (6, 8) from `chunk_l2`)
2822        let chunk = hash_join.next_unwrap_ready_chunk()?;
2823        assert_eq!(
2824            chunk,
2825            StreamChunk::from_pretty(
2826                " I I I I
2827                + 2 5 2 7"
2828            )
2829        );
2830
2831        // push a barrier to right side
2832        tx_r.push_barrier(test_epoch(2), false);
2833
2834        // get the aligned barrier here
2835        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2836        assert!(matches!(
2837            hash_join.next_unwrap_ready_barrier()?,
2838            Barrier {
2839                epoch,
2840                mutation: None,
2841                ..
2842            } if epoch == expected_epoch
2843        ));
2844
2845        // join the 2nd left chunk
        // (left (6, 8) now pairs with the already stored right (6, 9))
2846        let chunk = hash_join.next_unwrap_ready_chunk()?;
2847        assert_eq!(
2848            chunk,
2849            StreamChunk::from_pretty(
2850                " I I I I
2851                + 6 8 6 9"
2852            )
2853        );
2854
2855        // push the 2nd right chunk
2856        tx_r.push_chunk(chunk_r2);
2857        let chunk = hash_join.next_unwrap_ready_chunk()?;
2858        assert_eq!(
2859            chunk,
2860            StreamChunk::from_pretty(
2861                " I I I I
2862                + 3 6 3 10
2863                + 3 8 3 10
2864                + 6 8 6 11"
2865            )
2866        );
2867
2868        Ok(())
2869    }
2870
2871    #[tokio::test]
2872    async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
        // Scenario: same push/barrier sequence as the plain inner-join barrier test, but
        // several left rows carry `.` in their second column.
        // NOTE(review): `.` presumably denotes NULL in `StreamChunk::from_pretty` — confirm.
        // The assertions check that such rows still join on their first column and that
        // the `.` is carried through to the output unchanged.
2873        let chunk_l1 = StreamChunk::from_pretty(
2874            "  I I
2875             + 1 4
2876             + 2 .
2877             + 3 .",
2878        );
2879        let chunk_l2 = StreamChunk::from_pretty(
2880            "  I I
2881             + 6 .
2882             + 3 8",
2883        );
2884        let chunk_r1 = StreamChunk::from_pretty(
2885            "  I I
2886             + 2 7
2887             + 4 8
2888             + 6 9",
2889        );
2890        let chunk_r2 = StreamChunk::from_pretty(
2891            "  I  I
2892             + 3 10
2893             + 6 11",
2894        );
2895        let (mut tx_l, mut tx_r, mut hash_join) =
2896            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2897
2898        // push the init barrier for left and right
2899        tx_l.push_barrier(test_epoch(1), false);
2900        tx_r.push_barrier(test_epoch(1), false);
2901        hash_join.next_unwrap_ready_barrier()?;
2902
2903        // push the 1st left chunk
2904        tx_l.push_chunk(chunk_l1);
2905        hash_join.next_unwrap_pending();
2906
2907        // push a barrier to left side
2908        tx_l.push_barrier(test_epoch(2), false);
2909
2910        // push the 2nd left chunk
        // (queued after the left barrier; the output is not polled here)
2911        tx_l.push_chunk(chunk_l2);
2912
2913        // join the first right chunk
2914        tx_r.push_chunk(chunk_r1);
2915
2916        // Consume stream chunk
        // (only the match against `chunk_l1` is expected at this point)
2917        let chunk = hash_join.next_unwrap_ready_chunk()?;
2918        assert_eq!(
2919            chunk,
2920            StreamChunk::from_pretty(
2921                " I I I I
2922                + 2 . 2 7"
2923            )
2924        );
2925
2926        // push a barrier to right side
2927        tx_r.push_barrier(test_epoch(2), false);
2928
2929        // get the aligned barrier here
2930        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2931        assert!(matches!(
2932            hash_join.next_unwrap_ready_barrier()?,
2933            Barrier {
2934                epoch,
2935                mutation: None,
2936                ..
2937            } if epoch == expected_epoch
2938        ));
2939
2940        // join the 2nd left chunk
2941        let chunk = hash_join.next_unwrap_ready_chunk()?;
2942        assert_eq!(
2943            chunk,
2944            StreamChunk::from_pretty(
2945                " I I I I
2946                + 6 . 6 9"
2947            )
2948        );
2949
2950        // push the 2nd right chunk
2951        tx_r.push_chunk(chunk_r2);
2952        let chunk = hash_join.next_unwrap_ready_chunk()?;
2953        assert_eq!(
2954            chunk,
2955            StreamChunk::from_pretty(
2956                " I I I I
2957                + 3 8 3 10
2958                + 3 . 3 10
2959                + 6 . 6 11"
2960            )
2961        );
2962
2963        Ok(())
2964    }
2965
2966    #[tokio::test]
2967    async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
        // Scenario: a `JoinType::LeftOuter` executor built by `create_classical_executor`.
        // The test expects unmatched left rows to be emitted padded with `.` on the right
        // side, and that padded row to be replaced (delete + insert) once a right row
        // with the same first column arrives.
2968        let chunk_l1 = StreamChunk::from_pretty(
2969            "  I I
2970             + 1 4
2971             + 2 5
2972             + 3 6",
2973        );
2974        let chunk_l2 = StreamChunk::from_pretty(
2975            "  I I
2976             + 3 8
2977             - 3 8",
2978        );
2979        let chunk_r1 = StreamChunk::from_pretty(
2980            "  I I
2981             + 2 7
2982             + 4 8
2983             + 6 9",
2984        );
2985        let chunk_r2 = StreamChunk::from_pretty(
2986            "  I  I
2987             + 3 10
2988             + 6 11",
2989        );
2990        let (mut tx_l, mut tx_r, mut hash_join) =
2991            create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;
2992
2993        // push the init barrier for left and right
2994        tx_l.push_barrier(test_epoch(1), false);
2995        tx_r.push_barrier(test_epoch(1), false);
2996        hash_join.next_unwrap_ready_barrier()?;
2997
2998        // push the 1st left chunk
2999        tx_l.push_chunk(chunk_l1);
3000        let chunk = hash_join.next_unwrap_ready_chunk()?;
3001        assert_eq!(
3002            chunk,
3003            StreamChunk::from_pretty(
3004                " I I I I
3005                + 1 4 . .
3006                + 2 5 . .
3007                + 3 6 . ."
3008            )
3009        );
3010
3011        // push the 2nd left chunk
        // NOTE(review): the trailing `D` presumably flags the row as hidden in
        // `StreamChunk::from_pretty` — confirm against that helper.
3012        tx_l.push_chunk(chunk_l2);
3013        let chunk = hash_join.next_unwrap_ready_chunk()?;
3014        assert_eq!(
3015            chunk,
3016            StreamChunk::from_pretty(
3017                " I I I I
3018                + 3 8 . . D
3019                - 3 8 . . D"
3020            )
3021        );
3022
3023        // push the 1st right chunk
        // (right rows (4, 8) and (6, 9) have no left partner and are expected to yield nothing)
3024        tx_r.push_chunk(chunk_r1);
3025        let chunk = hash_join.next_unwrap_ready_chunk()?;
3026        assert_eq!(
3027            chunk,
3028            StreamChunk::from_pretty(
3029                " I I I I
3030                - 2 5 . .
3031                + 2 5 2 7"
3032            )
3033        );
3034
3035        // push the 2nd right chunk
3036        tx_r.push_chunk(chunk_r2);
3037        let chunk = hash_join.next_unwrap_ready_chunk()?;
3038        assert_eq!(
3039            chunk,
3040            StreamChunk::from_pretty(
3041                " I I I I
3042                - 3 6 . .
3043                + 3 6 3 10"
3044            )
3045        );
3046
3047        Ok(())
3048    }
3049
3050    #[tokio::test]
3051    async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
        // Scenario: a `JoinType::LeftOuter` executor created with the second argument of
        // `create_classical_executor` set to `true`.
        // NOTE(review): that flag presumably enables null-safe key comparison, and `.`
        // presumably denotes NULL in `from_pretty` — confirm against both helpers.
        // The final assertion expects left row (., 6) to pair with right row (., 10).
3052        let chunk_l1 = StreamChunk::from_pretty(
3053            "  I I
3054             + 1 4
3055             + 2 5
3056             + . 6",
3057        );
3058        let chunk_l2 = StreamChunk::from_pretty(
3059            "  I I
3060             + . 8
3061             - . 8",
3062        );
3063        let chunk_r1 = StreamChunk::from_pretty(
3064            "  I I
3065             + 2 7
3066             + 4 8
3067             + 6 9",
3068        );
3069        let chunk_r2 = StreamChunk::from_pretty(
3070            "  I  I
3071             + . 10
3072             + 6 11",
3073        );
3074        let (mut tx_l, mut tx_r, mut hash_join) =
3075            create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;
3076
3077        // push the init barrier for left and right
3078        tx_l.push_barrier(test_epoch(1), false);
3079        tx_r.push_barrier(test_epoch(1), false);
3080        hash_join.next_unwrap_ready_barrier()?;
3081
3082        // push the 1st left chunk
3083        tx_l.push_chunk(chunk_l1);
3084        let chunk = hash_join.next_unwrap_ready_chunk()?;
3085        assert_eq!(
3086            chunk,
3087            StreamChunk::from_pretty(
3088                " I I I I
3089                + 1 4 . .
3090                + 2 5 . .
3091                + . 6 . ."
3092            )
3093        );
3094
3095        // push the 2nd left chunk
3096        tx_l.push_chunk(chunk_l2);
3097        let chunk = hash_join.next_unwrap_ready_chunk()?;
3098        assert_eq!(
3099            chunk,
3100            StreamChunk::from_pretty(
3101                " I I I I
3102                + . 8 . . D
3103                - . 8 . . D"
3104            )
3105        );
3106
3107        // push the 1st right chunk
3108        tx_r.push_chunk(chunk_r1);
3109        let chunk = hash_join.next_unwrap_ready_chunk()?;
3110        assert_eq!(
3111            chunk,
3112            StreamChunk::from_pretty(
3113                " I I I I
3114                - 2 5 . .
3115                + 2 5 2 7"
3116            )
3117        );
3118
3119        // push the 2nd right chunk
        // (the `.`-keyed right row is expected to replace the padded left row (., 6))
3120        tx_r.push_chunk(chunk_r2);
3121        let chunk = hash_join.next_unwrap_ready_chunk()?;
3122        assert_eq!(
3123            chunk,
3124            StreamChunk::from_pretty(
3125                " I I I I
3126                - . 6 . .
3127                + . 6 . 10"
3128            )
3129        );
3130
3131        Ok(())
3132    }
3133
3134    #[tokio::test]
3135    async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
        // Scenario: a `JoinType::RightOuter` executor built by `create_classical_executor`.
        // Left chunks alone are expected to produce no output; right rows are expected
        // either joined with a left row sharing their first column or padded with `.`
        // on the left side.
3136        let chunk_l1 = StreamChunk::from_pretty(
3137            "  I I
3138             + 1 4
3139             + 2 5
3140             + 3 6",
3141        );
3142        let chunk_l2 = StreamChunk::from_pretty(
3143            "  I I
3144             + 3 8
3145             - 3 8",
3146        );
3147        let chunk_r1 = StreamChunk::from_pretty(
3148            "  I I
3149             + 2 7
3150             + 4 8
3151             + 6 9",
3152        );
3153        let chunk_r2 = StreamChunk::from_pretty(
3154            "  I  I
3155             + 5 10
3156             - 5 10",
3157        );
3158        let (mut tx_l, mut tx_r, mut hash_join) =
3159            create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;
3160
3161        // push the init barrier for left and right
3162        tx_l.push_barrier(test_epoch(1), false);
3163        tx_r.push_barrier(test_epoch(1), false);
3164        hash_join.next_unwrap_ready_barrier()?;
3165
3166        // push the 1st left chunk
3167        tx_l.push_chunk(chunk_l1);
3168        hash_join.next_unwrap_pending();
3169
3170        // push the 2nd left chunk
3171        tx_l.push_chunk(chunk_l2);
3172        hash_join.next_unwrap_pending();
3173
3174        // push the 1st right chunk
3175        tx_r.push_chunk(chunk_r1);
3176        let chunk = hash_join.next_unwrap_ready_chunk()?;
3177        assert_eq!(
3178            chunk,
3179            StreamChunk::from_pretty(
3180                " I I I I
3181                + 2 5 2 7
3182                + . . 4 8
3183                + . . 6 9"
3184            )
3185        );
3186
3187        // push the 2nd right chunk
        // NOTE(review): the trailing `D` presumably flags the row as hidden in
        // `StreamChunk::from_pretty` — confirm against that helper.
3188        tx_r.push_chunk(chunk_r2);
3189        let chunk = hash_join.next_unwrap_ready_chunk()?;
3190        assert_eq!(
3191            chunk,
3192            StreamChunk::from_pretty(
3193                " I I I I
3194                + . . 5 10 D
3195                - . . 5 10 D"
3196            )
3197        );
3198
3199        Ok(())
3200    }
3201
3202    #[tokio::test]
3203    async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
        // Scenario: a `JoinType::LeftOuter` executor built by `create_append_only_executor`,
        // fed insert-only, three-column chunks. Left rows are expected padded with `.`
        // first; each later matching right row is expected to replace the padded row
        // with a joined one (delete + insert).
        // NOTE(review): the join key is configured inside `create_append_only_executor`
        // (not visible here).
3204        let chunk_l1 = StreamChunk::from_pretty(
3205            "  I I I
3206             + 1 4 1
3207             + 2 5 2
3208             + 3 6 3",
3209        );
3210        let chunk_l2 = StreamChunk::from_pretty(
3211            "  I I I
3212             + 4 9 4
3213             + 5 10 5",
3214        );
3215        let chunk_r1 = StreamChunk::from_pretty(
3216            "  I I I
3217             + 2 5 1
3218             + 4 9 2
3219             + 6 9 3",
3220        );
3221        let chunk_r2 = StreamChunk::from_pretty(
3222            "  I I I
3223             + 1 4 4
3224             + 3 6 5",
3225        );
3226
3227        let (mut tx_l, mut tx_r, mut hash_join) =
3228            create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;
3229
3230        // push the init barrier for left and right
3231        tx_l.push_barrier(test_epoch(1), false);
3232        tx_r.push_barrier(test_epoch(1), false);
3233        hash_join.next_unwrap_ready_barrier()?;
3234
3235        // push the 1st left chunk
3236        tx_l.push_chunk(chunk_l1);
3237        let chunk = hash_join.next_unwrap_ready_chunk()?;
3238        assert_eq!(
3239            chunk,
3240            StreamChunk::from_pretty(
3241                " I I I I I I
3242                + 1 4 1 . . .
3243                + 2 5 2 . . .
3244                + 3 6 3 . . ."
3245            )
3246        );
3247
3248        // push the 2nd left chunk
3249        tx_l.push_chunk(chunk_l2);
3250        let chunk = hash_join.next_unwrap_ready_chunk()?;
3251        assert_eq!(
3252            chunk,
3253            StreamChunk::from_pretty(
3254                " I I I I I I
3255                + 4 9 4 . . .
3256                + 5 10 5 . . ."
3257            )
3258        );
3259
3260        // push the 1st right chunk
        // (right row (6, 9, 3) has no left partner and is expected to yield nothing)
3261        tx_r.push_chunk(chunk_r1);
3262        let chunk = hash_join.next_unwrap_ready_chunk()?;
3263        assert_eq!(
3264            chunk,
3265            StreamChunk::from_pretty(
3266                " I I I I I I
3267                - 2 5 2 . . .
3268                + 2 5 2 2 5 1
3269                - 4 9 4 . . .
3270                + 4 9 4 4 9 2"
3271            )
3272        );
3273
3274        // push the 2nd right chunk
3275        tx_r.push_chunk(chunk_r2);
3276        let chunk = hash_join.next_unwrap_ready_chunk()?;
3277        assert_eq!(
3278            chunk,
3279            StreamChunk::from_pretty(
3280                " I I I I I I
3281                - 1 4 1 . . .
3282                + 1 4 1 1 4 4
3283                - 3 6 3 . . .
3284                + 3 6 3 3 6 5"
3285            )
3286        );
3287
3288        Ok(())
3289    }
3290
3291    #[tokio::test]
3292    async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
        // Scenario: a `JoinType::RightOuter` executor built by `create_append_only_executor`,
        // fed insert-only, three-column chunks. Left chunks alone are expected to produce
        // no output; each right row is expected either joined with its left partner or
        // padded with `.` on the left side.
        // NOTE(review): the join key is configured inside `create_append_only_executor`
        // (not visible here).
3293        let chunk_l1 = StreamChunk::from_pretty(
3294            "  I I I
3295             + 1 4 1
3296             + 2 5 2
3297             + 3 6 3",
3298        );
3299        let chunk_l2 = StreamChunk::from_pretty(
3300            "  I I I
3301             + 4 9 4
3302             + 5 10 5",
3303        );
3304        let chunk_r1 = StreamChunk::from_pretty(
3305            "  I I I
3306             + 2 5 1
3307             + 4 9 2
3308             + 6 9 3",
3309        );
3310        let chunk_r2 = StreamChunk::from_pretty(
3311            "  I I I
3312             + 1 4 4
3313             + 3 6 5
3314             + 7 7 6",
3315        );
3316
3317        let (mut tx_l, mut tx_r, mut hash_join) =
3318            create_append_only_executor::<{ JoinType::RightOuter }>(false).await;
3319
3320        // push the init barrier for left and right
3321        tx_l.push_barrier(test_epoch(1), false);
3322        tx_r.push_barrier(test_epoch(1), false);
3323        hash_join.next_unwrap_ready_barrier()?;
3324
3325        // push the 1st left chunk
3326        tx_l.push_chunk(chunk_l1);
3327        hash_join.next_unwrap_pending();
3328
3329        // push the 2nd left chunk
3330        tx_l.push_chunk(chunk_l2);
3331        hash_join.next_unwrap_pending();
3332
3333        // push the 1st right chunk
        // (right row (6, 9, 3) has no left partner and is expected padded with `.`)
3334        tx_r.push_chunk(chunk_r1);
3335        let chunk = hash_join.next_unwrap_ready_chunk()?;
3336        assert_eq!(
3337            chunk,
3338            StreamChunk::from_pretty(
3339                "  I I I I I I
3340                + 2 5 2 2 5 1
3341                + 4 9 4 4 9 2
3342                + . . . 6 9 3"
3343            )
3344        );
3345
3346        // push the 2nd right chunk
3347        tx_r.push_chunk(chunk_r2);
3348        let chunk = hash_join.next_unwrap_ready_chunk()?;
3349        assert_eq!(
3350            chunk,
3351            StreamChunk::from_pretty(
3352                "  I I I I I I
3353                + 1 4 1 1 4 4
3354                + 3 6 3 3 6 5
3355                + . . . 7 7 6"
3356            )
3357        );
3358
3359        Ok(())
3360    }
3361
3362    #[tokio::test]
3363    async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
        // Scenario: a `JoinType::FullOuter` executor built by `create_classical_executor`.
        // Unmatched rows from either side are expected padded with `.` on the opposite
        // side; when a right row matches a previously padded left row, the padded row is
        // expected to be replaced (delete + insert) by the joined row.
3364        let chunk_l1 = StreamChunk::from_pretty(
3365            "  I I
3366             + 1 4
3367             + 2 5
3368             + 3 6",
3369        );
3370        let chunk_l2 = StreamChunk::from_pretty(
3371            "  I I
3372             + 3 8
3373             - 3 8",
3374        );
3375        let chunk_r1 = StreamChunk::from_pretty(
3376            "  I I
3377             + 2 7
3378             + 4 8
3379             + 6 9",
3380        );
3381        let chunk_r2 = StreamChunk::from_pretty(
3382            "  I  I
3383             + 5 10
3384             - 5 10",
3385        );
3386        let (mut tx_l, mut tx_r, mut hash_join) =
3387            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3388
3389        // push the init barrier for left and right
3390        tx_l.push_barrier(test_epoch(1), false);
3391        tx_r.push_barrier(test_epoch(1), false);
3392        hash_join.next_unwrap_ready_barrier()?;
3393
3394        // push the 1st left chunk
3395        tx_l.push_chunk(chunk_l1);
3396        let chunk = hash_join.next_unwrap_ready_chunk()?;
3397        assert_eq!(
3398            chunk,
3399            StreamChunk::from_pretty(
3400                " I I I I
3401                + 1 4 . .
3402                + 2 5 . .
3403                + 3 6 . ."
3404            )
3405        );
3406
3407        // push the 2nd left chunk
        // NOTE(review): the trailing `D` presumably flags the row as hidden in
        // `StreamChunk::from_pretty` — confirm against that helper.
3408        tx_l.push_chunk(chunk_l2);
3409        let chunk = hash_join.next_unwrap_ready_chunk()?;
3410        assert_eq!(
3411            chunk,
3412            StreamChunk::from_pretty(
3413                " I I I I
3414                + 3 8 . . D
3415                - 3 8 . . D"
3416            )
3417        );
3418
3419        // push the 1st right chunk
3420        tx_r.push_chunk(chunk_r1);
3421        let chunk = hash_join.next_unwrap_ready_chunk()?;
3422        assert_eq!(
3423            chunk,
3424            StreamChunk::from_pretty(
3425                " I I I I
3426                - 2 5 . .
3427                + 2 5 2 7
3428                + . . 4 8
3429                + . . 6 9"
3430            )
3431        );
3432
3433        // push the 2nd right chunk
3434        tx_r.push_chunk(chunk_r2);
3435        let chunk = hash_join.next_unwrap_ready_chunk()?;
3436        assert_eq!(
3437            chunk,
3438            StreamChunk::from_pretty(
3439                " I I I I
3440                + . . 5 10 D
3441                - . . 5 10 D"
3442            )
3443        );
3444
3445        Ok(())
3446    }
3447
3448    #[tokio::test]
3449    async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
3450        let (mut tx_l, mut tx_r, mut hash_join) =
3451            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3452
3453        // push the init barrier for left and right
3454        tx_l.push_barrier(test_epoch(1), false);
3455        tx_r.push_barrier(test_epoch(1), false);
3456        hash_join.next_unwrap_ready_barrier()?;
3457
3458        tx_l.push_chunk(StreamChunk::from_pretty(
3459            "  I I
3460             + 1 1
3461            ",
3462        ));
3463        let chunk = hash_join.next_unwrap_ready_chunk()?;
3464        assert_eq!(
3465            chunk,
3466            StreamChunk::from_pretty(
3467                " I I I I
3468                + 1 1 . ."
3469            )
3470        );
3471
3472        tx_r.push_chunk(StreamChunk::from_pretty(
3473            "  I I
3474             + 1 1
3475            ",
3476        ));
3477        let chunk = hash_join.next_unwrap_ready_chunk()?;
3478
3479        assert_eq!(
3480            chunk,
3481            StreamChunk::from_pretty(
3482                " I I I I
3483                - 1 1 . .
3484                + 1 1 1 1"
3485            )
3486        );
3487
3488        tx_l.push_chunk(StreamChunk::from_pretty(
3489            "   I I
3490              - 1 1
3491              + 1 2
3492            ",
3493        ));
3494        let chunk = hash_join.next_unwrap_ready_chunk()?;
3495        let chunk = chunk.compact_vis();
3496        assert_eq!(
3497            chunk,
3498            StreamChunk::from_pretty(
3499                " I I I I
3500                - 1 1 1 1
3501                + 1 2 1 1
3502                "
3503            )
3504        );
3505
3506        Ok(())
3507    }
3508
3509    #[tokio::test]
3510    async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
3511    {
3512        let chunk_l1 = StreamChunk::from_pretty(
3513            "  I I
3514             + 1 4
3515             + 2 5
3516             + 3 6
3517             + 3 7",
3518        );
3519        let chunk_l2 = StreamChunk::from_pretty(
3520            "  I I
3521             + 3 8
3522             - 3 8
3523             - 1 4", // delete row to cause an empty JoinHashEntry
3524        );
3525        let chunk_r1 = StreamChunk::from_pretty(
3526            "  I I
3527             + 2 6
3528             + 4 8
3529             + 3 4",
3530        );
3531        let chunk_r2 = StreamChunk::from_pretty(
3532            "  I  I
3533             + 5 10
3534             - 5 10
3535             + 1 2",
3536        );
3537        let (mut tx_l, mut tx_r, mut hash_join) =
3538            create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;
3539
3540        // push the init barrier for left and right
3541        tx_l.push_barrier(test_epoch(1), false);
3542        tx_r.push_barrier(test_epoch(1), false);
3543        hash_join.next_unwrap_ready_barrier()?;
3544
3545        // push the 1st left chunk
3546        tx_l.push_chunk(chunk_l1);
3547        let chunk = hash_join.next_unwrap_ready_chunk()?;
3548        assert_eq!(
3549            chunk,
3550            StreamChunk::from_pretty(
3551                " I I I I
3552                + 1 4 . .
3553                + 2 5 . .
3554                + 3 6 . .
3555                + 3 7 . ."
3556            )
3557        );
3558
3559        // push the 2nd left chunk
3560        tx_l.push_chunk(chunk_l2);
3561        let chunk = hash_join.next_unwrap_ready_chunk()?;
3562        assert_eq!(
3563            chunk,
3564            StreamChunk::from_pretty(
3565                " I I I I
3566                + 3 8 . . D
3567                - 3 8 . . D
3568                - 1 4 . ."
3569            )
3570        );
3571
3572        // push the 1st right chunk
3573        tx_r.push_chunk(chunk_r1);
3574        let chunk = hash_join.next_unwrap_ready_chunk()?;
3575        assert_eq!(
3576            chunk,
3577            StreamChunk::from_pretty(
3578                " I I I I
3579                - 2 5 . .
3580                + 2 5 2 6
3581                + . . 4 8
3582                + . . 3 4" /* regression test (#2420): 3 4 should be forwarded only once
3583                            * despite matching on eq join on 2
3584                            * entries */
3585            )
3586        );
3587
3588        // push the 2nd right chunk
3589        tx_r.push_chunk(chunk_r2);
3590        let chunk = hash_join.next_unwrap_ready_chunk()?;
3591        assert_eq!(
3592            chunk,
3593            StreamChunk::from_pretty(
3594                " I I I I
3595                + . . 5 10 D
3596                - . . 5 10 D
3597                + . . 1 2" /* regression test (#2420): 1 2 forwarded even if matches on an empty
3598                            * join entry */
3599            )
3600        );
3601
3602        Ok(())
3603    }
3604
3605    #[tokio::test]
3606    async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
3607        let chunk_l1 = StreamChunk::from_pretty(
3608            "  I I
3609             + 1 4
3610             + 2 10
3611             + 3 6",
3612        );
3613        let chunk_l2 = StreamChunk::from_pretty(
3614            "  I I
3615             + 3 8
3616             - 3 8",
3617        );
3618        let chunk_r1 = StreamChunk::from_pretty(
3619            "  I I
3620             + 2 7
3621             + 4 8
3622             + 6 9",
3623        );
3624        let chunk_r2 = StreamChunk::from_pretty(
3625            "  I  I
3626             + 3 10
3627             + 6 11",
3628        );
3629        let (mut tx_l, mut tx_r, mut hash_join) =
3630            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3631
3632        // push the init barrier for left and right
3633        tx_l.push_barrier(test_epoch(1), false);
3634        tx_r.push_barrier(test_epoch(1), false);
3635        hash_join.next_unwrap_ready_barrier()?;
3636
3637        // push the 1st left chunk
3638        tx_l.push_chunk(chunk_l1);
3639        hash_join.next_unwrap_pending();
3640
3641        // push the 2nd left chunk
3642        tx_l.push_chunk(chunk_l2);
3643        hash_join.next_unwrap_pending();
3644
3645        // push the 1st right chunk
3646        tx_r.push_chunk(chunk_r1);
3647        hash_join.next_unwrap_pending();
3648
3649        // push the 2nd right chunk
3650        tx_r.push_chunk(chunk_r2);
3651        let chunk = hash_join.next_unwrap_ready_chunk()?;
3652        assert_eq!(
3653            chunk,
3654            StreamChunk::from_pretty(
3655                " I I I I
3656                + 3 6 3 10"
3657            )
3658        );
3659
3660        Ok(())
3661    }
3662
3663    #[tokio::test]
3664    async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3665        let (mut tx_l, mut tx_r, mut hash_join) =
3666            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3667
3668        // push the init barrier for left and right
3669        tx_l.push_barrier(test_epoch(1), false);
3670        tx_r.push_barrier(test_epoch(1), false);
3671        hash_join.next_unwrap_ready_barrier()?;
3672
3673        tx_l.push_int64_watermark(0, 100);
3674
3675        tx_l.push_int64_watermark(0, 200);
3676
3677        tx_l.push_barrier(test_epoch(2), false);
3678        tx_r.push_barrier(test_epoch(2), false);
3679        hash_join.next_unwrap_ready_barrier()?;
3680
3681        tx_r.push_int64_watermark(0, 50);
3682
3683        let w1 = hash_join.next().await.unwrap().unwrap();
3684        let w1 = w1.as_watermark().unwrap();
3685
3686        let w2 = hash_join.next().await.unwrap().unwrap();
3687        let w2 = w2.as_watermark().unwrap();
3688
3689        tx_r.push_int64_watermark(0, 100);
3690
3691        let w3 = hash_join.next().await.unwrap().unwrap();
3692        let w3 = w3.as_watermark().unwrap();
3693
3694        let w4 = hash_join.next().await.unwrap().unwrap();
3695        let w4 = w4.as_watermark().unwrap();
3696
3697        assert_eq!(
3698            w1,
3699            &Watermark {
3700                col_idx: 2,
3701                data_type: DataType::Int64,
3702                val: ScalarImpl::Int64(50)
3703            }
3704        );
3705
3706        assert_eq!(
3707            w2,
3708            &Watermark {
3709                col_idx: 0,
3710                data_type: DataType::Int64,
3711                val: ScalarImpl::Int64(50)
3712            }
3713        );
3714
3715        assert_eq!(
3716            w3,
3717            &Watermark {
3718                col_idx: 2,
3719                data_type: DataType::Int64,
3720                val: ScalarImpl::Int64(100)
3721            }
3722        );
3723
3724        assert_eq!(
3725            w4,
3726            &Watermark {
3727                col_idx: 0,
3728                data_type: DataType::Int64,
3729                val: ScalarImpl::Int64(100)
3730            }
3731        );
3732
3733        Ok(())
3734    }
3735
3736    async fn create_executor_with_evict_interval<const T: JoinTypePrimitive>(
3737        evict_interval: u32,
3738    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
3739        let schema = Schema {
3740            fields: vec![
3741                Field::unnamed(DataType::Int64), // join key
3742                Field::unnamed(DataType::Int64),
3743            ],
3744        };
3745        let (tx_l, source_l) = MockSource::channel();
3746        let source_l = source_l.into_executor(schema.clone(), vec![1]);
3747        let (tx_r, source_r) = MockSource::channel();
3748        let source_r = source_r.into_executor(schema, vec![1]);
3749        let params_l = JoinParams::new(vec![0], vec![1]);
3750        let params_r = JoinParams::new(vec![0], vec![1]);
3751
3752        let mem_state = MemoryStateStore::new();
3753
3754        let (state_l, degree_state_l) = create_in_memory_state_table(
3755            mem_state.clone(),
3756            &[DataType::Int64, DataType::Int64],
3757            &[OrderType::ascending(), OrderType::ascending()],
3758            &[0, 1],
3759            0,
3760        )
3761        .await;
3762
3763        let (state_r, degree_state_r) = create_in_memory_state_table(
3764            mem_state,
3765            &[DataType::Int64, DataType::Int64],
3766            &[OrderType::ascending(), OrderType::ascending()],
3767            &[0, 1],
3768            2,
3769        )
3770        .await;
3771
3772        let schema = match T {
3773            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
3774            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
3775            _ => [source_l.schema().fields(), source_r.schema().fields()]
3776                .concat()
3777                .into_iter()
3778                .collect(),
3779        };
3780        let schema_len = schema.len();
3781        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
3782
3783        let mut streaming_config = StreamingConfig::default();
3784        streaming_config.developer.join_hash_map_evict_interval_rows = evict_interval;
3785
3786        let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
3787            ActorContext::for_test_with_config(123, streaming_config),
3788            info,
3789            source_l,
3790            source_r,
3791            params_l,
3792            params_r,
3793            vec![false],
3794            (0..schema_len).collect_vec(),
3795            None,
3796            vec![],
3797            state_l,
3798            degree_state_l,
3799            state_r,
3800            degree_state_r,
3801            Arc::new(AtomicU64::new(0)),
3802            false,
3803            Arc::new(StreamingMetrics::unused()),
3804            1024,
3805            2048,
3806            vec![(0, true)],
3807        );
3808        (tx_l, tx_r, executor.boxed().execute())
3809    }
3810
3811    /// Tests that a hash join executor with `join_hash_map_evict_interval_rows = 0`
3812    /// (periodic eviction disabled) produces correct results.
3813    #[tokio::test]
3814    async fn test_hash_join_evict_interval_disabled() -> StreamExecutorResult<()> {
3815        let chunk_l = StreamChunk::from_pretty(
3816            "  I I
3817             + 1 4
3818             + 2 5
3819             + 3 6",
3820        );
3821        let chunk_r = StreamChunk::from_pretty(
3822            "  I I
3823             + 2 7
3824             + 3 8",
3825        );
3826
3827        // 0 disables periodic eviction
3828        let (mut tx_l, mut tx_r, mut hash_join) =
3829            create_executor_with_evict_interval::<{ JoinType::Inner }>(0).await;
3830
3831        tx_l.push_barrier(test_epoch(1), false);
3832        tx_r.push_barrier(test_epoch(1), false);
3833        hash_join.next_unwrap_ready_barrier()?;
3834
3835        tx_l.push_chunk(chunk_l);
3836        hash_join.next_unwrap_pending();
3837
3838        tx_r.push_chunk(chunk_r);
3839        let chunk = hash_join.next_unwrap_ready_chunk()?;
3840        assert_eq!(
3841            chunk,
3842            StreamChunk::from_pretty(
3843                " I I I I
3844                + 2 5 2 7
3845                + 3 6 3 8"
3846            )
3847        );
3848
3849        Ok(())
3850    }
3851
3852    /// Tests that a hash join executor with `join_hash_map_evict_interval_rows = 1`
3853    /// (evict after every row) produces correct results.
3854    #[tokio::test]
3855    async fn test_hash_join_evict_interval_one() -> StreamExecutorResult<()> {
3856        let chunk_l = StreamChunk::from_pretty(
3857            "  I I
3858             + 1 4
3859             + 2 5
3860             + 3 6",
3861        );
3862        let chunk_r = StreamChunk::from_pretty(
3863            "  I I
3864             + 2 7
3865             + 3 8",
3866        );
3867
3868        // evict after every row
3869        let (mut tx_l, mut tx_r, mut hash_join) =
3870            create_executor_with_evict_interval::<{ JoinType::Inner }>(1).await;
3871
3872        tx_l.push_barrier(test_epoch(1), false);
3873        tx_r.push_barrier(test_epoch(1), false);
3874        hash_join.next_unwrap_ready_barrier()?;
3875
3876        tx_l.push_chunk(chunk_l);
3877        hash_join.next_unwrap_pending();
3878
3879        tx_r.push_chunk(chunk_r);
3880        let chunk = hash_join.next_unwrap_ready_chunk()?;
3881        assert_eq!(
3882            chunk,
3883            StreamChunk::from_pretty(
3884                " I I I I
3885                + 2 5 2 7
3886                + 3 6 3 8"
3887            )
3888        );
3889
3890        Ok(())
3891    }
3892
3893    /// Tests that a hash join executor with a custom `join_hash_map_evict_interval_rows`
3894    /// value produces correct results.
3895    #[tokio::test]
3896    async fn test_hash_join_evict_interval_custom() -> StreamExecutorResult<()> {
3897        let chunk_l = StreamChunk::from_pretty(
3898            "  I I
3899             + 1 4
3900             + 2 5
3901             + 3 6
3902             + 4 7
3903             + 5 8",
3904        );
3905        let chunk_r = StreamChunk::from_pretty(
3906            "  I I
3907             + 1 9
3908             + 3 10
3909             + 5 11",
3910        );
3911
3912        // evict every 2 rows
3913        let (mut tx_l, mut tx_r, mut hash_join) =
3914            create_executor_with_evict_interval::<{ JoinType::Inner }>(2).await;
3915
3916        tx_l.push_barrier(test_epoch(1), false);
3917        tx_r.push_barrier(test_epoch(1), false);
3918        hash_join.next_unwrap_ready_barrier()?;
3919
3920        tx_l.push_chunk(chunk_l);
3921        hash_join.next_unwrap_pending();
3922
3923        tx_r.push_chunk(chunk_r);
3924        let chunk = hash_join.next_unwrap_ready_chunk()?;
3925        assert_eq!(
3926            chunk,
3927            StreamChunk::from_pretty(
3928                " I I I I
3929                + 1 4 1 9
3930                + 3 6 3 10
3931                + 5 8 5 11"
3932            )
3933        );
3934
3935        Ok(())
3936    }
3937}