risingwave_stream/executor/hash_join.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::{BTreeMap, HashSet};
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use anyhow::Context;
21use either::Either;
22use itertools::Itertools;
23use multimap::MultiMap;
24use risingwave_common::array::Op;
25use risingwave_common::hash::{HashKey, NullBitmap};
26use risingwave_common::row::RowExt;
27use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
28use risingwave_common::util::epoch::EpochPair;
29use risingwave_common::util::iter_util::ZipEqDebug;
30use risingwave_expr::expr::NonStrictExpression;
31use risingwave_pb::stream_plan::InequalityType;
32use tokio::time::Instant;
33
34use self::builder::JoinChunkBuilder;
35use super::barrier_align::*;
36use super::join::hash_join::*;
37use super::join::row::{JoinEncoding, JoinRow};
38use super::join::*;
39use super::watermark::*;
40use crate::executor::CachedJoinRow;
41use crate::executor::join::builder::JoinStreamChunkBuilder;
42use crate::executor::join::hash_join::CacheResult;
43use crate::executor::prelude::*;
44
/// Returns `true` if every element of `vec1` also occurs in `vec2`
/// (subset check ignoring order and duplicates).
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    // Materialize the candidate superset once for O(1) membership checks.
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.into_iter().all(|idx| superset.contains(&idx))
}
48
/// Information about an inequality condition in the join.
/// Represents: `left_col <op> right_col` where op is one of `<`, `<=`, `>`, `>=`.
#[derive(Debug, Clone)]
pub struct InequalityPairInfo {
    /// Index of the left side column (from left input).
    pub left_idx: usize,
    /// Index of the right side column (from right input).
    pub right_idx: usize,
    /// Whether this condition is used to clean left state table.
    pub clean_left_state: bool,
    /// Whether this condition is used to clean right state table.
    pub clean_right_state: bool,
    /// Comparison operator relating the `left_idx` column to the `right_idx` column.
    pub op: InequalityType,
}
64
65impl InequalityPairInfo {
66    /// Returns true if left side has larger values (`>` or `>=`).
67    pub fn left_side_is_larger(&self) -> bool {
68        matches!(
69            self.op,
70            InequalityType::GreaterThan | InequalityType::GreaterThanOrEqual
71        )
72    }
73}
74
/// Per-input-side parameters of the join: which input columns form the join
/// key and which input primary-key columns remain after deduplicating them
/// against the join key.
//
// Derives added for consistency with `InequalityPairInfo`: a public type
// should be `Debug` (and these plain `Vec<usize>` fields make `Clone`,
// `PartialEq` and `Eq` free and backward-compatible).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JoinParams {
    /// Indices of the join keys
    pub join_key_indices: Vec<usize>,
    /// Indices of the input pk after dedup
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Creates `JoinParams` from the join-key indices and the deduplicated
    /// primary-key indices of one input side.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        Self {
            join_key_indices,
            deduped_pk_indices,
        }
    }
}
90
/// Per-side state and metadata of the hash join: the hash table holding this
/// side's rows, plus the column mappings used to build output columns and to
/// drive watermark-based state cleaning.
struct JoinSide<K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Store all data from a one side stream
    ht: JoinHashMap<K, S, E>,
    /// Indices of the join key columns
    join_key_indices: Vec<usize>,
    /// The data type of all columns without degree.
    all_data_types: Vec<DataType>,
    /// The start position for the side in output new columns
    start_pos: usize,
    /// The mapping from input indices of a side to output columns.
    i2o_mapping: Vec<(usize, usize)>,
    /// Same pairs as `i2o_mapping`, indexed by input column for direct lookup.
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// The first field of the ith element indicates that when a watermark at the ith column of
    /// this side comes, what band join conditions should be updated in order to possibly
    /// generate a new watermark at that column or the corresponding column in the counterpart
    /// join side.
    ///
    /// The second field indicates whether the column is required to be less than
    /// the corresponding column in the counterpart join side in the band join condition.
    input2inequality_index: Vec<Vec<(usize, bool)>>,
    /// Some fields which are required non null to match due to inequalities.
    non_null_fields: Vec<usize>,
    /// (i, j) in this `Vec` means that state data in this join side can be cleaned if the value of
    /// its ith column is less than the synthetic watermark of the jth band join condition.
    state_clean_columns: Vec<(usize, usize)>,
    /// Whether degree table is needed for this side.
    need_degree_table: bool,
    /// Ties the otherwise-unused `JoinEncoding` type parameter to the struct.
    _marker: std::marker::PhantomData<E>,
}
120
121impl<K: HashKey, S: StateStore, E: JoinEncoding> std::fmt::Debug for JoinSide<K, S, E> {
122    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123        f.debug_struct("JoinSide")
124            .field("join_key_indices", &self.join_key_indices)
125            .field("col_types", &self.all_data_types)
126            .field("start_pos", &self.start_pos)
127            .field("i2o_mapping", &self.i2o_mapping)
128            .field("need_degree_table", &self.need_degree_table)
129            .finish()
130    }
131}
132
133impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinSide<K, S, E> {
134    // WARNING: Please do not call this until we implement it.
135    fn is_dirty(&self) -> bool {
136        unimplemented!()
137    }
138
139    #[expect(dead_code)]
140    fn clear_cache(&mut self) {
141        assert!(
142            !self.is_dirty(),
143            "cannot clear cache while states of hash join are dirty"
144        );
145
146        // TODO: not working with rearranged chain
147        // self.ht.clear();
148    }
149
150    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
151        self.ht.init(epoch).await
152    }
153}
154
/// `HashJoinExecutor` takes two input streams and runs equal hash join on them.
/// The output columns are the concatenation of left and right columns.
pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
{
    /// Actor context (provides actor id, fragment id and config).
    ctx: ActorContextRef,
    /// Executor info (provides `schema` and `stream_key`).
    info: ExecutorInfo,

    /// Left input executor
    input_l: Option<Executor>,
    /// Right input executor
    input_r: Option<Executor>,
    /// The data types of the formed new columns
    actual_output_data_types: Vec<DataType>,
    /// The parameters of the left join executor
    side_l: JoinSide<K, S, E>,
    /// The parameters of the right join executor
    side_r: JoinSide<K, S, E>,
    /// Optional non-equi join conditions
    cond: Option<NonStrictExpression>,
    /// Inequality pairs with output column indices.
    /// Each entry contains: (`output_indices`, `InequalityPairInfo`).
    inequality_pairs: Vec<(Vec<usize>, InequalityPairInfo)>,
    /// The output watermark of each inequality condition and its value is the minimum of the
    /// watermarks from both sides. It will be used to generate watermark into downstream
    /// and do state cleaning based on `clean_left_state`/`clean_right_state` fields.
    inequality_watermarks: Vec<Option<Watermark>>,
    /// Join-key positions and cleaning flags for watermark handling.
    /// Each entry: (join-key position, `do_state_cleaning`).
    watermark_indices_in_jk: Vec<(usize, bool)>,

    /// Whether the logic can be optimized for append-only stream
    append_only_optimize: bool,

    /// Streaming metrics registry used to create the join metrics.
    metrics: Arc<StreamingMetrics>,
    /// The maximum size of the chunk produced by executor at a time
    chunk_size: usize,
    /// Count the messages received, clear to 0 when counted to `EVICT_EVERY_N_MESSAGES`
    cnt_rows_received: u32,

    /// watermark column index -> `BufferedWatermarks`
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// When to alert high join amplification
    high_join_amplification_threshold: usize,

    /// Max number of rows that will be cached in the entry state.
    entry_state_max_rows: usize,
    /// Number of processed rows between periodic manual evictions of the join cache.
    join_cache_evict_interval_rows: u32,
}
205
206impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> std::fmt::Debug
207    for HashJoinExecutor<K, S, T, E>
208{
209    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210        f.debug_struct("HashJoinExecutor")
211            .field("join_type", &T)
212            .field("input_left", &self.input_l.as_ref().unwrap().identity())
213            .field("input_right", &self.input_r.as_ref().unwrap().identity())
214            .field("side_l", &self.side_l)
215            .field("side_r", &self.side_r)
216            .field("stream_key", &self.info.stream_key)
217            .field("schema", &self.info.schema)
218            .field("actual_output_data_types", &self.actual_output_data_types)
219            .field(
220                "join_cache_evict_interval_rows",
221                &self.join_cache_evict_interval_rows,
222            )
223            .finish()
224    }
225}
226
227impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> Execute
228    for HashJoinExecutor<K, S, T, E>
229{
230    fn execute(self: Box<Self>) -> BoxedMessageStream {
231        self.into_stream().boxed()
232    }
233}
234
/// Borrowed bundle of everything the per-chunk equi-join routines
/// (`eq_join_left` / `eq_join_right`) need, avoiding a long positional
/// argument list. Field meanings mirror the same-named fields on
/// `HashJoinExecutor`.
struct EqJoinArgs<'a, K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Actor context of the executor.
    ctx: &'a ActorContextRef,
    /// Left-side join state.
    side_l: &'a mut JoinSide<K, S, E>,
    /// Right-side join state.
    side_r: &'a mut JoinSide<K, S, E>,
    /// Data types of the output columns.
    actual_output_data_types: &'a [DataType],
    /// Optional non-equi join condition.
    cond: &'a mut Option<NonStrictExpression>,
    /// Current watermark of each inequality condition.
    inequality_watermarks: &'a [Option<Watermark>],
    /// The input chunk (from the updating side) to join.
    chunk: StreamChunk,
    /// Whether the append-only optimization is enabled.
    append_only_optimize: bool,
    /// Maximum size of produced output chunks.
    chunk_size: usize,
    /// Running count of received rows, shared with the executor.
    cnt_rows_received: &'a mut u32,
    /// Threshold above which high join amplification is reported.
    high_join_amplification_threshold: usize,
    /// Max number of rows cached in the entry state.
    entry_state_max_rows: usize,
    /// Number of processed rows between periodic manual cache evictions.
    join_cache_evict_interval_rows: u32,
}
250
251impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
252    HashJoinExecutor<K, S, T, E>
253{
    /// Creates a new `HashJoinExecutor`, taking the entry-state cache size from
    /// the actor config. Delegates to [`Self::new_with_cache_size`] with
    /// `entry_state_max_rows = None`.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<InequalityPairInfo>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        watermark_indices_in_jk: Vec<(usize, bool)>,
    ) -> Self {
        Self::new_with_cache_size(
            ctx,
            info,
            input_l,
            input_r,
            params_l,
            params_r,
            null_safe,
            output_indices,
            cond,
            inequality_pairs,
            state_table_l,
            degree_state_table_l,
            state_table_r,
            degree_state_table_r,
            watermark_epoch,
            is_append_only,
            metrics,
            chunk_size,
            high_join_amplification_threshold,
            None,
            watermark_indices_in_jk,
        )
    }
301
    /// Creates a new `HashJoinExecutor` with an explicit entry-state cache size.
    ///
    /// Besides storing the arguments, this derives everything the executor needs
    /// at runtime: output data types, per-side join-key and degree-table layouts,
    /// input-to-output column mappings, inequality-condition bookkeeping for
    /// watermark-driven state cleaning, and whether the append-only optimization
    /// applies.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_cache_size(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<InequalityPairInfo>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        entry_state_max_rows: Option<usize>,
        watermark_indices_in_jk: Vec<(usize, bool)>,
    ) -> Self {
        // Fall back to the config default when no explicit cache size is given.
        let entry_state_max_rows = match entry_state_max_rows {
            None => ctx.config.developer.hash_join_entry_state_max_rows,
            Some(entry_state_max_rows) => entry_state_max_rows,
        };
        // Clamp to at least 1 so the periodic eviction interval is never zero.
        let join_cache_evict_interval_rows = ctx
            .config
            .developer
            .join_hash_map_evict_interval_rows
            .max(1);
        let side_l_column_n = input_l.schema().len();

        // Semi/anti joins output only one side; all other join types concatenate
        // the fields of both inputs.
        let schema_fields = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
            JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
            _ => [
                input_l.schema().fields.clone(),
                input_r.schema().fields.clone(),
            ]
            .concat(),
        };

        let original_output_data_types = schema_fields
            .iter()
            .map(|field| field.data_type())
            .collect_vec();
        // Project the original output types through `output_indices`.
        let actual_output_data_types = output_indices
            .iter()
            .map(|&idx| original_output_data_types[idx].clone())
            .collect_vec();

        // Data types of hash join state.
        let state_all_data_types_l = input_l.schema().data_types();
        let state_all_data_types_r = input_r.schema().data_types();

        let state_pk_indices_l = input_l.stream_key().to_vec();
        let state_pk_indices_r = input_r.stream_key().to_vec();

        let state_join_key_indices_l = params_l.join_key_indices;
        let state_join_key_indices_r = params_r.join_key_indices;

        // Degree table layout: join-key columns first, then the deduped pk columns.
        let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
        let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();

        let degree_pk_indices_l = (state_join_key_indices_l.len()
            ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
            .collect_vec();
        let degree_pk_indices_r = (state_join_key_indices_r.len()
            ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
            .collect_vec();

        // If pk is contained in join key.
        let pk_contained_in_jk_l = is_subset(state_pk_indices_l, state_join_key_indices_l.clone());
        let pk_contained_in_jk_r = is_subset(state_pk_indices_r, state_join_key_indices_r.clone());

        // check whether join key contains pk in both side
        let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;

        let join_key_data_types_l = state_join_key_indices_l
            .iter()
            .map(|idx| state_all_data_types_l[*idx].clone())
            .collect_vec();

        let join_key_data_types_r = state_join_key_indices_r
            .iter()
            .map(|idx| state_all_data_types_r[*idx].clone())
            .collect_vec();

        // The equi-join keys of the two sides must have matching types.
        assert_eq!(join_key_data_types_l, join_key_data_types_r);

        // Bitmap built from `null_safe`, marking join-key columns that use
        // null-safe equality.
        let null_matched = K::Bitmap::from_bool_vec(null_safe);

        let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
        let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;

        let (left_to_output, right_to_output) = {
            // Semi/anti joins contribute zero columns from the non-output side.
            let (left_len, right_len) = if is_left_semi_or_anti(T) {
                (state_all_data_types_l.len(), 0usize)
            } else if is_right_semi_or_anti(T) {
                (0usize, state_all_data_types_r.len())
            } else {
                (state_all_data_types_l.len(), state_all_data_types_r.len())
            };
            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
        };

        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());

        let left_input_len = input_l.schema().len();
        let right_input_len = input_r.schema().len();
        let mut l2inequality_index = vec![vec![]; left_input_len];
        let mut r2inequality_index = vec![vec![]; right_input_len];
        let mut l_inequal_state_clean_columns = vec![];
        let mut r_inequal_state_clean_columns = vec![];
        let inequality_pairs = inequality_pairs
            .into_iter()
            .enumerate()
            .map(|(index, pair)| {
                // Map input columns to inequality index
                // The second field indicates whether this column is required to be less than
                // the corresponding column on the other side.
                // For `left >= right`: left is NOT less than right, right IS less than left
                // For `left <= right`: left IS less than right, right is NOT less than left
                let left_is_larger = pair.left_side_is_larger();
                l2inequality_index[pair.left_idx].push((index, !left_is_larger));
                r2inequality_index[pair.right_idx].push((index, left_is_larger));

                // Determine state cleanup columns
                if pair.clean_left_state {
                    l_inequal_state_clean_columns.push((pair.left_idx, index));
                }
                if pair.clean_right_state {
                    r_inequal_state_clean_columns.push((pair.right_idx, index));
                }

                // Get output indices for watermark emission
                // We only emit watermarks for the LARGER side's output columns
                let output_indices = if pair.left_side_is_larger() {
                    // Left is larger, emit for left output columns
                    l2o_indexed
                        .get_vec(&pair.left_idx)
                        .cloned()
                        .unwrap_or_default()
                } else {
                    // Right is larger, emit for right output columns
                    r2o_indexed
                        .get_vec(&pair.right_idx)
                        .cloned()
                        .unwrap_or_default()
                };

                (output_indices, pair)
            })
            .collect_vec();

        // Columns that participate in any inequality are required to be non-null
        // to match (see `JoinSide::non_null_fields`).
        let mut l_non_null_fields = l2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();
        let mut r_non_null_fields = r2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();

        // Append-only optimization: drop the inequality-based state-clean and
        // non-null bookkeeping, which is not used in this mode.
        if append_only_optimize {
            l_inequal_state_clean_columns.clear();
            r_inequal_state_clean_columns.clear();
            l_non_null_fields.clear();
            r_non_null_fields.clear();
        }

        // Create degree tables with inequality column index for watermark-based cleaning.
        // The degree_inequality_idx is the column index in the input row that should be
        // stored in the degree table for inequality-based watermark cleaning.
        let l_inequality_idx = l_inequal_state_clean_columns
            .first()
            .map(|(col_idx, _)| *col_idx);
        let r_inequality_idx = r_inequal_state_clean_columns
            .first()
            .map(|(col_idx, _)| *col_idx);

        let degree_state_l = need_degree_table_l.then(|| {
            TableInner::new(
                degree_pk_indices_l,
                degree_join_key_indices_l,
                degree_state_table_l,
                l_inequality_idx,
            )
        });
        let degree_state_r = need_degree_table_r.then(|| {
            TableInner::new(
                degree_pk_indices_r,
                degree_join_key_indices_r,
                degree_state_table_r,
                r_inequality_idx,
            )
        });

        // Watermarks start unknown for every inequality condition.
        let inequality_watermarks = vec![None; inequality_pairs.len()];
        let watermark_buffers = BTreeMap::new();
        Self {
            ctx: ctx.clone(),
            info,
            input_l: Some(input_l),
            input_r: Some(input_r),
            actual_output_data_types,
            side_l: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch.clone(),
                    join_key_data_types_l,
                    state_join_key_indices_l.clone(),
                    state_all_data_types_l.clone(),
                    state_table_l,
                    params_l.deduped_pk_indices,
                    degree_state_l,
                    null_matched.clone(),
                    pk_contained_in_jk_l,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "left",
                ),
                join_key_indices: state_join_key_indices_l,
                all_data_types: state_all_data_types_l,
                i2o_mapping: left_to_output,
                i2o_mapping_indexed: l2o_indexed,
                input2inequality_index: l2inequality_index,
                non_null_fields: l_non_null_fields,
                state_clean_columns: l_inequal_state_clean_columns,
                // Left columns come first in the concatenated output.
                start_pos: 0,
                need_degree_table: need_degree_table_l,
                _marker: PhantomData,
            },
            side_r: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch,
                    join_key_data_types_r,
                    state_join_key_indices_r.clone(),
                    state_all_data_types_r.clone(),
                    state_table_r,
                    params_r.deduped_pk_indices,
                    degree_state_r,
                    null_matched,
                    pk_contained_in_jk_r,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "right",
                ),
                join_key_indices: state_join_key_indices_r,
                all_data_types: state_all_data_types_r,
                // Right columns start right after all left columns in the output.
                start_pos: side_l_column_n,
                i2o_mapping: right_to_output,
                i2o_mapping_indexed: r2o_indexed,
                input2inequality_index: r2inequality_index,
                non_null_fields: r_non_null_fields,
                state_clean_columns: r_inequal_state_clean_columns,
                need_degree_table: need_degree_table_r,
                _marker: PhantomData,
            },
            cond,
            inequality_pairs,
            inequality_watermarks,
            watermark_indices_in_jk,
            append_only_optimize,
            metrics,
            chunk_size,
            cnt_rows_received: 0,
            watermark_buffers,
            high_join_amplification_threshold,
            entry_state_max_rows,
            join_cache_evict_interval_rows,
        }
    }
582
    /// Main processing loop of the executor, yielded as a stream of `Message`s.
    ///
    /// Aligns barriers between the two inputs, joins incoming chunks against
    /// the opposite side's state, forwards/merges watermarks, and commits state
    /// on every barrier.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        // Take ownership of both inputs; `input_l`/`input_r` are `None` afterwards.
        let input_l = self.input_l.take().unwrap();
        let input_r = self.input_r.take().unwrap();
        // Merge the two inputs into one stream with aligned barriers.
        let aligned_stream = barrier_align(
            input_l.execute(),
            input_r.execute(),
            self.ctx.id,
            self.ctx.fragment_id,
            self.metrics.clone(),
            "Join",
        );
        pin_mut!(aligned_stream);

        let actor_id = self.ctx.id;

        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
        let first_epoch = barrier.epoch;
        // The first barrier message should be propagated.
        yield Message::Barrier(barrier);
        // Initialize both sides' hash tables with the first epoch.
        self.side_l.init(first_epoch).await?;
        self.side_r.init(first_epoch).await?;

        let actor_id_str = self.ctx.id.to_string();
        let fragment_id_str = self.ctx.fragment_id.to_string();

        // Initialize guarded metrics, labeled by actor/fragment (and side where relevant).
        let join_actor_input_waiting_duration_ns = self
            .metrics
            .join_actor_input_waiting_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let left_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
        let right_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let barrier_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[
                actor_id_str.as_str(),
                fragment_id_str.as_str(),
                "barrier",
            ]);

        let left_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);

        let right_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let mut start_time = Instant::now();

        while let Some(msg) = aligned_stream
            .next()
            .instrument_await("hash_join_barrier_align")
            .await
        {
            // Time elapsed since the previous message finished = time spent waiting for input.
            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
            match msg? {
                AlignedMessage::WatermarkLeft(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Left, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::WatermarkRight(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Right, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::Left(chunk) => {
                    // Accumulate only time spent joining; the timer is paused
                    // while the produced chunk is yielded downstream.
                    let mut left_time = Duration::from_nanos(0);
                    let mut left_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_left(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                        join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
                    }) {
                        left_time += left_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        left_start_time = Instant::now();
                    }
                    left_time += left_start_time.elapsed();
                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Right(chunk) => {
                    // Mirror image of the left-chunk arm above.
                    let mut right_time = Duration::from_nanos(0);
                    let mut right_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_right(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                        join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
                    }) {
                        right_time += right_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        right_start_time = Instant::now();
                    }
                    right_time += right_start_time.elapsed();
                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Barrier(barrier) => {
                    let barrier_start_time = Instant::now();
                    // Commit both sides' buffered state for this epoch.
                    let (left_post_commit, right_post_commit) =
                        self.flush_data(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);

                    // We don't include the time post yielding barrier because the vnode update
                    // is a one-off and rare operation.
                    barrier_join_match_duration_ns
                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
                    yield Message::Barrier(barrier);

                    // Update the vnode bitmap for state tables of both sides if asked.
                    right_post_commit
                        .post_yield_barrier(update_vnode_bitmap.clone())
                        .await?;
                    // NOTE(review): `post_yield_barrier` returning `Some(true)` appears to
                    // signal a vnode-bitmap change, after which buffered watermarks are no
                    // longer valid and are reset — confirm against `JoinHashMapPostCommit`.
                    if left_post_commit
                        .post_yield_barrier(update_vnode_bitmap)
                        .await?
                        .unwrap_or(false)
                    {
                        self.watermark_buffers
                            .values_mut()
                            .for_each(|buffers| buffers.clear());
                        self.inequality_watermarks.fill(None);
                    }

                    // Report metrics of cached join rows/entries
                    for (join_cached_entry_count, ht) in [
                        (&left_join_cached_entry_count, &self.side_l.ht),
                        (&right_join_cached_entry_count, &self.side_r.ht),
                    ] {
                        join_cached_entry_count.set(ht.entry_count() as i64);
                    }
                }
            }
            start_time = Instant::now();
        }
    }
755
756    async fn flush_data(
757        &mut self,
758        epoch: EpochPair,
759    ) -> StreamExecutorResult<(
760        JoinHashMapPostCommit<'_, K, S, E>,
761        JoinHashMapPostCommit<'_, K, S, E>,
762    )> {
763        // All changes to the state has been buffered in the mem-table of the state table. Just
764        // `commit` them here.
765        let left = self.side_l.ht.flush(epoch).await?;
766        let right = self.side_r.ht.flush(epoch).await?;
767        Ok((left, right))
768    }
769
    /// Best-effort flush of both sides' state tables between barriers.
    ///
    /// Changes are already buffered in each state table's mem-table; this
    /// gives the tables a chance to spill early. NOTE(review): `try_flush`
    /// is opaque here — presumably it only spills when the buffer is large
    /// enough; confirm against the state-table implementation.
    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
        // All changes to the state has been buffered in the mem-table of the state table. Just
        // `commit` them here.
        self.side_l.ht.try_flush().await?;
        self.side_r.ht.try_flush().await?;
        Ok(())
    }
777
778    // We need to manually evict the cache.
779    fn evict_cache(
780        side_update: &mut JoinSide<K, S, E>,
781        side_match: &mut JoinSide<K, S, E>,
782        cnt_rows_received: &mut u32,
783        join_cache_evict_interval_rows: u32,
784    ) {
785        *cnt_rows_received += 1;
786        if *cnt_rows_received >= join_cache_evict_interval_rows {
787            side_update.ht.evict();
788            side_match.ht.evict();
789            *cnt_rows_received = 0;
790        }
791    }
792
    /// Handle a watermark arriving on `side` (left or right input).
    ///
    /// Two classes of watermarks are processed:
    /// 1. Watermarks on join-key columns: aligned across both inputs via
    ///    `watermark_buffers`, optionally used to update the state-cleaning
    ///    watermark on BOTH sides' hash tables, and re-emitted on every output
    ///    column the key column maps to.
    /// 2. Watermarks on inequality-condition columns: only the LARGER side of
    ///    the inequality may forward its watermark downstream; the selected
    ///    watermark is also stored in `inequality_watermarks` for row-level
    ///    state cleaning and may trigger table-level cleaning on one side.
    ///
    /// Returns the list of watermarks to emit downstream.
    fn handle_watermark(
        &mut self,
        side: SideTypePrimitive,
        watermark: Watermark,
    ) -> StreamExecutorResult<Vec<Watermark>> {
        // `side_update` is the side the watermark arrived on; `side_match` is
        // the opposite side.
        let (side_update, side_match) = if side == SideType::Left {
            (&mut self.side_l, &mut self.side_r)
        } else {
            (&mut self.side_r, &mut self.side_l)
        };

        // Process join-key watermarks.
        // The input column may appear at several positions within the join key.
        let wm_in_jk = side_update
            .join_key_indices
            .iter()
            .positions(|idx| *idx == watermark.col_idx);
        let mut watermarks_to_emit = vec![];
        for idx in wm_in_jk {
            // One buffer per join-key position, aligning left and right
            // watermarks; `handle_watermark` yields the combined (minimum)
            // watermark once both sides have reported.
            let buffers = self
                .watermark_buffers
                .entry(idx)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
                // Only positions flagged with `do_clean` advance the tables'
                // state-cleaning watermark (on both sides).
                if self
                    .watermark_indices_in_jk
                    .iter()
                    .any(|(jk_pos, do_clean)| *jk_pos == idx && *do_clean)
                {
                    side_match
                        .ht
                        .update_watermark(selected_watermark.val.clone());
                    side_update
                        .ht
                        .update_watermark(selected_watermark.val.clone());
                }

                // Emit the selected watermark on every output column that this
                // join-key column (from either side) maps to.
                let empty_indices = vec![];
                let output_indices = side_update
                    .i2o_mapping_indexed
                    .get_vec(&side_update.join_key_indices[idx])
                    .unwrap_or(&empty_indices)
                    .iter()
                    .chain(
                        side_match
                            .i2o_mapping_indexed
                            .get_vec(&side_match.join_key_indices[idx])
                            .unwrap_or(&empty_indices),
                    );
                for output_idx in output_indices {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
            };
        }

        // Process inequality watermarks
        // We can only yield the LARGER side's watermark downstream.
        // For `left >= right`: left is larger, emit for left output columns
        // For `left <= right`: right is larger, emit for right output columns
        let mut update_left_watermark = None;
        let mut update_right_watermark = None;
        if let Some(watermark_indices) = side_update.input2inequality_index.get(watermark.col_idx) {
            for (inequality_index, _) in watermark_indices {
                // Inequality buffers live after the join-key buffers in the
                // same map, hence the `join_key_indices.len()` offset.
                let buffers = self
                    .watermark_buffers
                    .entry(side_update.join_key_indices.len() + inequality_index)
                    .or_insert_with(|| {
                        BufferedWatermarks::with_ids([SideType::Left, SideType::Right])
                    });
                if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone())
                {
                    let (output_indices, pair_info) = &self.inequality_pairs[*inequality_index];
                    let left_is_larger = pair_info.left_side_is_larger();

                    // Emit watermark for the larger side's output columns only
                    for output_idx in output_indices {
                        watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                    }
                    // Store watermark for state cleaning (via useful_state_clean_columns)
                    self.inequality_watermarks[*inequality_index] =
                        Some(selected_watermark.clone());

                    // Mark which side needs state cleaning (only if the clean flag is set)
                    if left_is_larger && pair_info.clean_left_state {
                        update_left_watermark = Some(selected_watermark.val.clone());
                    } else if !left_is_larger && pair_info.clean_right_state {
                        update_right_watermark = Some(selected_watermark.val.clone());
                    }
                }
            }
            // Do state cleaning on the larger side.
            // Now that degree tables include the inequality column, we can clean both
            // state and degree tables using `update_watermark`.
            if let Some(val) = update_left_watermark {
                self.side_l.ht.update_watermark(val);
            }
            if let Some(val) = update_right_watermark {
                self.side_r.ht.update_watermark(val);
            }
        }
        Ok(watermarks_to_emit)
    }
894
895    fn row_concat(
896        row_update: impl Row,
897        update_start_pos: usize,
898        row_matched: impl Row,
899        matched_start_pos: usize,
900    ) -> OwnedRow {
901        let mut new_row = vec![None; row_update.len() + row_matched.len()];
902
903        for (i, datum_ref) in row_update.iter().enumerate() {
904            new_row[i + update_start_pos] = datum_ref.to_owned_datum();
905        }
906        for (i, datum_ref) in row_matched.iter().enumerate() {
907            new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
908        }
909        OwnedRow::new(new_row)
910    }
911
    /// Used to forward `eq_join_oneside` to show join side in stack.
    /// (A dedicated monomorphization per side makes the probing side visible
    /// in stack traces and profiles.)
    fn eq_join_left(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Left }>(args)
    }
918
    /// Used to forward `eq_join_oneside` to show join side in stack.
    /// (A dedicated monomorphization per side makes the probing side visible
    /// in stack traces and profiles.)
    fn eq_join_right(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Right }>(args)
    }
925
    /// Process one input chunk from side `SIDE`, probing the opposite side's
    /// hash table and yielding the resulting output chunks as a stream.
    ///
    /// Per input row this: drives periodic cache eviction, performs a cache
    /// lookup (short-circuiting to `NeverMatch` when null-safety requirements
    /// cannot be met), dispatches to `handle_match_rows` for insert/delete,
    /// and records join-amplification metrics.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S, E>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            actual_output_data_types,
            cond,
            inequality_watermarks,
            chunk,
            append_only_optimize,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
            entry_state_max_rows,
            join_cache_evict_interval_rows,
            ..
        } = args;

        // `side_update` is the probing (input) side; `side_match` is the side
        // whose state we look up.
        let (side_update, side_match) = if SIDE == SideType::Left {
            (side_l, side_r)
        } else {
            (side_r, side_l)
        };

        // Only state-clean columns whose inequality watermark is currently
        // known can participate in row-level state cleaning.
        let useful_state_clean_columns = side_match
            .state_clean_columns
            .iter()
            .filter_map(|(column_idx, inequality_index)| {
                inequality_watermarks[*inequality_index]
                    .as_ref()
                    .map(|watermark| (*column_idx, watermark))
            })
            .collect_vec();

        let mut hashjoin_chunk_builder =
            JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        // Histogram of matched-row counts per join key, labeled by actor,
        // fragment and the probing side's table.
        let join_matched_join_keys = ctx
            .streaming_metrics
            .join_matched_join_keys
            .with_guarded_label_values(&[
                &ctx.id.to_string(),
                &ctx.fragment_id.to_string(),
                &side_update.ht.table_id().to_string(),
            ]);

        // Build all join keys for the chunk up front, then walk rows (holes
        // from visibility bitmaps are skipped).
        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(
                side_update,
                side_match,
                cnt_rows_received,
                join_cache_evict_interval_rows,
            );

            let cache_lookup_result = {
                // SAFETY note: `datum_at_unchecked` is used on indices coming
                // from `non_null_fields`, which are column indices of this
                // side's rows — assumed in-bounds by construction.
                let probe_non_null_requirement_satisfied = side_update
                    .non_null_fields
                    .iter()
                    .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
                let build_non_null_requirement_satisfied =
                    key.null_bitmap().is_subset(side_match.ht.null_matched());
                // If either null requirement fails, this row can never match
                // anything on the other side — skip the lookup entirely.
                if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
                    side_match.ht.take_state_opt(key)
                } else {
                    CacheResult::NeverMatch
                }
            };
            let mut total_matches = 0;

            // Expands to a `handle_match_rows` call with the row's operation
            // (Insert/Delete) baked in as a const generic.
            macro_rules! match_rows {
                ($op:ident) => {
                    Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
                        cache_lookup_result,
                        row,
                        key,
                        &mut hashjoin_chunk_builder,
                        side_match,
                        side_update,
                        &useful_state_clean_columns,
                        cond,
                        append_only_optimize,
                        entry_state_max_rows,
                    )
                };
            }

            match op {
                Op::Insert | Op::UpdateInsert =>
                {
                    #[for_await]
                    for chunk in match_rows!(Insert) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
                Op::Delete | Op::UpdateDelete =>
                {
                    #[for_await]
                    for chunk in match_rows!(Delete) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
            };

            join_matched_join_keys.observe(total_matches as _);
            // Log keys that amplify beyond the configured threshold to help
            // diagnose skewed joins.
            if total_matches > high_join_amplification_threshold {
                let join_key_data_types = side_update.ht.join_key_data_types();
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = total_matches,
                    update_table_id = %side_update.ht.table_id(),
                    match_table_id = %side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = %ctx.id,
                    fragment_id = %ctx.fragment_id,
                    "large rows matched for join key"
                );
            }
        }
        // NOTE(kwannoel): We don't track metrics for this last chunk.
        if let Some(chunk) = hashjoin_chunk_builder.take() {
            yield chunk;
        }
    }
1063
    /// For the probe-side row, we need to check if it has values in cache, if not, we need to
    /// fetch the matched rows from the state table.
    ///
    /// Every matched build-side row being processed needs to go through the following phases:
    /// 1. Handle join condition evaluation.
    /// 2. Always do cache refill, if the state count is good.
    /// 3. Handle state cleaning.
    /// 4. Handle degree table update.
    ///
    /// After all matches are processed, the probe-side row itself is forwarded
    /// (or not) according to the join type, the cache is refilled, rows due
    /// for watermark cleaning are deleted in-memory, and finally the probe
    /// side's own state is inserted/deleted (unless the append-only
    /// optimization removes the matched row instead).
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn handle_match_rows<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
    >(
        cached_lookup_result: CacheResult<E>,
        row: RowRef<'a>,
        key: &'a K,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        side_match: &'a mut JoinSide<K, S, E>,
        side_update: &'a mut JoinSide<K, S, E>,
        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
        cond: &'a mut Option<NonStrictExpression>,
        append_only_optimize: bool,
        entry_state_max_rows: usize,
    ) {
        let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
        // Freshly-built entry state used to refill the cache on a miss; it is
        // shadowed below by the final entry state (cached or rebuilt).
        let mut entry_state: JoinEntryState<E> = JoinEntryState::default();
        let mut entry_state_count = 0;

        // Number of build-side rows that satisfied the join condition.
        let mut degree = 0;
        let mut append_only_matched_row = None;
        let mut matched_rows_to_clean = vec![];

        // Expands to a `handle_match_row` call; `$from_cache` selects the
        // const generic, and `$map_output` adapts the matched row's type
        // (Either::Left for cached rows, Either::Right for fetched rows).
        macro_rules! match_row {
            (
                $match_order_key_indices:expr,
                $degree_table:expr,
                $matched_row:expr,
                $matched_row_ref:expr,
                $from_cache:literal,
                $map_output:expr,
            ) => {
                Self::handle_match_row::<_, _, SIDE, { JOIN_OP }, { $from_cache }>(
                    row,
                    $matched_row,
                    $matched_row_ref,
                    hashjoin_chunk_builder,
                    $match_order_key_indices,
                    $degree_table,
                    side_update.start_pos,
                    side_match.start_pos,
                    cond,
                    &mut degree,
                    useful_state_clean_columns,
                    append_only_optimize,
                    &mut append_only_matched_row,
                    &mut matched_rows_to_clean,
                    $map_output,
                )
            };
        }

        let entry_state = match cached_lookup_result {
            CacheResult::NeverMatch => {
                // Null-safety requirements ruled out any match: just forward
                // the probe row per join type and stop.
                let op = match JOIN_OP {
                    JoinOp::Insert => Op::Insert,
                    JoinOp::Delete => Op::Delete,
                };
                if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                    yield chunk;
                }
                return Ok(());
            }
            CacheResult::Hit(mut cached_rows) => {
                let (match_order_key_indices, match_degree_state) =
                    side_match.ht.get_degree_state_mut_ref();
                // Handle cached rows which match the probe-side row.
                for (matched_row_ref, matched_row) in
                    cached_rows.values_mut(&side_match.all_data_types)
                {
                    let matched_row = matched_row?;
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        match_degree_state,
                        matched_row,
                        Some(matched_row_ref),
                        true,
                        Either::Left,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }

                // Reuse the cached entry state; it is put back below.
                cached_rows
            }
            CacheResult::Miss => {
                // Handle rows which are not in cache.
                let (matched_rows, match_order_key_indices, degree_table) = side_match
                    .ht
                    .fetch_matched_rows_and_get_degree_table_ref(key)
                    .await?;

                #[for_await]
                for matched_row in matched_rows {
                    let (encoded_pk, matched_row) = matched_row?;

                    let mut matched_row_ref = None;

                    // cache refill
                    if entry_state_count <= entry_state_max_rows {
                        let row_ref = entry_state
                            .insert(encoded_pk, E::encode(&matched_row), None) // TODO(kwannoel): handle ineq key for asof join.
                            .with_context(|| format!("row: {}", row.display(),))?;
                        matched_row_ref = Some(row_ref);
                        entry_state_count += 1;
                    }
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        degree_table,
                        matched_row,
                        matched_row_ref,
                        false,
                        Either::Right,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }
                Box::new(entry_state)
            }
        };

        // forward rows depending on join types
        let op = match JOIN_OP {
            JoinOp::Insert => Op::Insert,
            JoinOp::Delete => Op::Delete,
        };
        if degree == 0 {
            // No build-side row satisfied the condition: forward for outer
            // join types that emit unmatched rows.
            if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                yield chunk;
            }
        } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
        {
            yield chunk;
        }

        // cache refill
        // Only store the entry state when it is complete: either it came from
        // the cache, or all fetched rows fit within `entry_state_max_rows`.
        if cache_hit || entry_state_count <= entry_state_max_rows {
            side_match.ht.update_state(key, entry_state);
        }

        // watermark state cleaning
        for matched_row in matched_rows_to_clean {
            // The committed table watermark is responsible for reclaiming rows in the state table.
            side_match.ht.delete_row_in_mem(key, &matched_row.row)?;
        }

        // apply append_only optimization to clean matched_rows which have been persisted
        if append_only_optimize && let Some(row) = append_only_matched_row {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            side_match.ht.delete_handle_degree(key, row)?;
            return Ok(());
        }

        // no append_only optimization, update state table(s).
        match JOIN_OP {
            JoinOp::Insert => {
                side_update
                    .ht
                    .insert_handle_degree(key, JoinRow::new(row, degree))?;
            }
            JoinOp::Delete => {
                side_update
                    .ht
                    .delete_handle_degree(key, JoinRow::new(row, degree))?;
            }
        }
    }
1246
    /// Process a single matched build-side row against the probe-side row.
    ///
    /// If the join condition holds: bump the probe row's degree, emit the
    /// joined output (before the degree update for inserts, after it for
    /// deletes), and update the degree table plus the cached copy's degree.
    /// If it does not hold: check whether the matched row is below any known
    /// state-clean watermark and, if so, queue it for in-memory cleanup.
    /// Under the append-only optimization the (unique) matched row is instead
    /// recorded for deletion by the caller.
    ///
    /// Returns the output chunk, if the chunk builder produced one.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    async fn handle_match_row<
        'a,
        R: Row,  // input row type
        RO: Row, // output row type
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
        const MATCHED_ROWS_FROM_CACHE: bool,
    >(
        update_row: RowRef<'a>,
        mut matched_row: JoinRow<R>,
        mut matched_row_cache_ref: Option<&mut E::EncodedRow>,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        match_order_key_indices: &[usize],
        match_degree_table: &mut Option<TableInner<S>>,
        side_update_start_pos: usize,
        side_match_start_pos: usize,
        cond: &Option<NonStrictExpression>,
        update_row_degree: &mut u64,
        useful_state_clean_columns: &[(usize, &'a Watermark)],
        append_only_optimize: bool,
        append_only_matched_row: &mut Option<JoinRow<RO>>,
        matched_rows_to_clean: &mut Vec<JoinRow<RO>>,
        map_output: impl Fn(R) -> RO,
    ) -> Option<StreamChunk> {
        let mut need_state_clean = false;
        let mut chunk_opt = None;
        // TODO(kwannoel): Instead of evaluating this every loop,
        // we can call this only if there's a non-equi expression.
        // check join cond
        let join_condition_satisfied = Self::check_join_condition(
            update_row,
            side_update_start_pos,
            &matched_row.row,
            side_match_start_pos,
            cond,
        )
        .await;

        if join_condition_satisfied {
            // update degree
            *update_row_degree += 1;
            // send matched row downstream

            // Inserts must happen before degrees are updated,
            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
            if matches!(JOIN_OP, JoinOp::Insert)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
            // TODO(kwannoel): We can actually statically decide this, using join side + join type.
            if let Some(degree_table) = match_degree_table {
                update_degree::<S, { JOIN_OP }>(
                    match_order_key_indices,
                    degree_table,
                    &mut matched_row,
                );
                if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
                    // update matched row in cache
                    // `unwrap` is safe: cached rows always carry a cache ref,
                    // and the second disjunct guarantees `is_some()`.
                    match JOIN_OP {
                        JoinOp::Insert => matched_row_cache_ref.as_mut().unwrap().increase_degree(),
                        JoinOp::Delete => matched_row_cache_ref.as_mut().unwrap().decrease_degree(),
                    }
                }
            }

            // Deletes must happen after degree table is updated.
            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
            if matches!(JOIN_OP, JoinOp::Delete)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
        } else {
            // check if need state cleaning
            // A matched row strictly below any applicable watermark can never
            // satisfy the inequality again, so it is safe to clean.
            for (column_idx, watermark) in useful_state_clean_columns {
                if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
                    scalar
                        .default_cmp(&watermark.val.as_scalar_ref_impl())
                        .is_lt()
                }) {
                    need_state_clean = true;
                    break;
                }
            }
        }
        // If the stream is append-only and the join key covers pk in both side,
        // then we can remove matched rows since pk is unique and will not be
        // inserted again
        if append_only_optimize {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            // Since join key contains pk and pk is unique, there should be only
            // one row if matched.
            assert!(append_only_matched_row.is_none());
            *append_only_matched_row = Some(matched_row.map(map_output));
        } else if need_state_clean {
            debug_assert!(
                !append_only_optimize,
                "`append_only_optimize` and `need_state_clean` must not both be true"
            );
            matched_rows_to_clean.push(matched_row.map(map_output));
        }

        chunk_opt
    }
1358
1359    // TODO(yuhao-su): We should find a better way to eval the expression
1360    // without concat two rows.
1361    // if there are non-equi expressions
1362    // NOTE(kwannoel): We can probably let `eval` use `impl Row` instead of `OwnedRow`.
1363    #[inline]
1364    async fn check_join_condition(
1365        row: impl Row,
1366        side_update_start_pos: usize,
1367        matched_row: impl Row,
1368        side_match_start_pos: usize,
1369        join_condition: &Option<NonStrictExpression>,
1370    ) -> bool {
1371        if let Some(join_condition) = join_condition {
1372            let new_row = Self::row_concat(
1373                row,
1374                side_update_start_pos,
1375                matched_row,
1376                side_match_start_pos,
1377            );
1378            join_condition
1379                .eval_row_infallible(&new_row)
1380                .await
1381                .map(|s| *s.as_bool())
1382                .unwrap_or(false)
1383        } else {
1384            true
1385        }
1386    }
1387}
1388
1389#[cfg(test)]
1390mod tests {
1391    use std::sync::atomic::AtomicU64;
1392
1393    use pretty_assertions::assert_eq;
1394    use risingwave_common::array::*;
1395    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1396    use risingwave_common::config::StreamingConfig;
1397    use risingwave_common::hash::{Key64, Key128};
1398    use risingwave_common::util::epoch::test_epoch;
1399    use risingwave_common::util::sort_util::OrderType;
1400    use risingwave_storage::memory::MemoryStateStore;
1401
1402    use super::*;
1403    use crate::common::table::test_utils::gen_pbtable;
1404    use crate::executor::MemoryEncoding;
1405    use crate::executor::test_utils::expr::build_from_pretty;
1406    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1407
    /// Convenience wrapper around `create_in_memory_state_table_with_inequality`
    /// for tests whose degree table has no inequality column.
    ///
    /// Returns `(state_table, degree_state_table)` backed by `mem_state`.
    async fn create_in_memory_state_table(
        mem_state: MemoryStateStore,
        data_types: &[DataType],
        order_types: &[OrderType],
        pk_indices: &[usize],
        table_id: u32,
    ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
        create_in_memory_state_table_with_inequality(
            mem_state,
            data_types,
            order_types,
            pk_indices,
            table_id,
            None,
        )
        .await
    }
1425
    /// Convenience wrapper around `create_in_memory_state_table_with_watermark`
    /// for tests with no watermark-based state cleaning: an optional degree
    /// inequality column, but empty clean-watermark index lists.
    ///
    /// Returns `(state_table, degree_state_table)` backed by `mem_state`.
    async fn create_in_memory_state_table_with_inequality(
        mem_state: MemoryStateStore,
        data_types: &[DataType],
        order_types: &[OrderType],
        pk_indices: &[usize],
        table_id: u32,
        degree_inequality_type: Option<DataType>,
    ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
        create_in_memory_state_table_with_watermark(
            mem_state,
            data_types,
            order_types,
            pk_indices,
            table_id,
            degree_inequality_type,
            vec![],
            vec![],
        )
        .await
    }
1446
1447    #[allow(clippy::too_many_arguments)]
1448    async fn create_in_memory_state_table_with_watermark(
1449        mem_state: MemoryStateStore,
1450        data_types: &[DataType],
1451        order_types: &[OrderType],
1452        pk_indices: &[usize],
1453        table_id: u32,
1454        degree_inequality_type: Option<DataType>,
1455        state_clean_watermark_indices: Vec<usize>,
1456        degree_clean_watermark_indices: Vec<usize>,
1457    ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1458        let column_descs = data_types
1459            .iter()
1460            .enumerate()
1461            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1462            .collect_vec();
1463        let mut state_table_catalog = gen_pbtable(
1464            TableId::new(table_id),
1465            column_descs,
1466            order_types.to_vec(),
1467            pk_indices.to_vec(),
1468            0,
1469        );
1470        state_table_catalog.clean_watermark_indices = state_clean_watermark_indices
1471            .into_iter()
1472            .map(|idx| idx as u32)
1473            .collect();
1474        let state_table =
1475            StateTable::from_table_catalog(&state_table_catalog, mem_state.clone(), None).await;
1476
1477        // Create degree table with schema: [pk..., _degree, inequality?]
1478        let mut degree_table_column_descs = vec![];
1479        pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1480            degree_table_column_descs.push(ColumnDesc::unnamed(
1481                ColumnId::new(pk_id as i32),
1482                data_types[*idx].clone(),
1483            ))
1484        });
1485        // Add _degree column
1486        degree_table_column_descs.push(ColumnDesc::unnamed(
1487            ColumnId::new(pk_indices.len() as i32),
1488            DataType::Int64,
1489        ));
1490        // Add inequality column if present
1491        if let Some(ineq_type) = degree_inequality_type {
1492            degree_table_column_descs.push(ColumnDesc::unnamed(
1493                ColumnId::new((pk_indices.len() + 1) as i32),
1494                ineq_type,
1495            ));
1496        }
1497        let mut degree_table_catalog = gen_pbtable(
1498            TableId::new(table_id + 1),
1499            degree_table_column_descs,
1500            order_types.to_vec(),
1501            pk_indices.to_vec(),
1502            0,
1503        );
1504        degree_table_catalog.clean_watermark_indices = degree_clean_watermark_indices
1505            .into_iter()
1506            .map(|idx| idx as u32)
1507            .collect();
1508        let degree_state_table =
1509            StateTable::from_table_catalog(&degree_table_catalog, mem_state, None).await;
1510        (state_table, degree_state_table)
1511    }
1512
1513    fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1514        build_from_pretty(
1515            condition_text
1516                .as_deref()
1517                .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1518        )
1519    }
1520
    /// Builds a `HashJoinExecutor` over two mocked two-column `int64` sources
    /// (column 0 is the join key, column 1 the value / inequality column),
    /// backed by in-memory state and degree tables whose catalogs carry the
    /// watermark-cleanup indices derived from `inequality_pairs`.
    ///
    /// Returns the message senders for the left and right inputs together with
    /// the executor's boxed output stream.
    async fn create_executor<const T: JoinTypePrimitive>(
        with_condition: bool,
        null_safe: bool,
        condition_text: Option<String>,
        inequality_pairs: Vec<InequalityPairInfo>,
    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64), // join key
                Field::unnamed(DataType::Int64),
            ],
        };
        let (tx_l, source_l) = MockSource::channel();
        let source_l = source_l.into_executor(schema.clone(), vec![1]);
        let (tx_r, source_r) = MockSource::channel();
        let source_r = source_r.into_executor(schema, vec![1]);
        let params_l = JoinParams::new(vec![0], vec![1]);
        let params_r = JoinParams::new(vec![0], vec![1]);
        let cond = with_condition.then(|| create_cond(condition_text));

        let mem_state = MemoryStateStore::new();

        // Determine inequality column types for degree tables based on inequality_pairs.
        // Note: `find` means only the FIRST pair requesting cleanup on a side
        // contributes watermark indices for that side.
        let l_degree_ineq_type = inequality_pairs
            .iter()
            .find(|pair| pair.clean_left_state)
            .map(|_| DataType::Int64); // Column 1 is Int64
        let r_degree_ineq_type = inequality_pairs
            .iter()
            .find(|pair| pair.clean_right_state)
            .map(|_| DataType::Int64); // Column 1 is Int64
        let l_clean_watermark_indices = inequality_pairs
            .iter()
            .find(|pair| pair.clean_left_state)
            .map(|pair| vec![pair.left_idx])
            .unwrap_or_default();
        let r_clean_watermark_indices = inequality_pairs
            .iter()
            .find(|pair| pair.clean_right_state)
            .map(|pair| vec![pair.right_idx])
            .unwrap_or_default();
        // Degree table layout is [pk0, pk1, _degree, inequality] (pk has two
        // columns here), so the optional inequality column sits at index 3.
        let degree_inequality_column_idx = 3;
        let l_degree_clean_watermark_indices = l_degree_ineq_type
            .as_ref()
            .map(|_| vec![degree_inequality_column_idx])
            .unwrap_or_default();
        let r_degree_clean_watermark_indices = r_degree_ineq_type
            .as_ref()
            .map(|_| vec![degree_inequality_column_idx])
            .unwrap_or_default();

        // Left side uses table ids 0 (state) and 1 (degree, via `table_id + 1`).
        let (state_l, degree_state_l) = create_in_memory_state_table_with_watermark(
            mem_state.clone(),
            &[DataType::Int64, DataType::Int64],
            &[OrderType::ascending(), OrderType::ascending()],
            &[0, 1],
            0,
            l_degree_ineq_type,
            l_clean_watermark_indices,
            l_degree_clean_watermark_indices,
        )
        .await;

        // Right side uses table ids 2 (state) and 3 (degree).
        let (state_r, degree_state_r) = create_in_memory_state_table_with_watermark(
            mem_state,
            &[DataType::Int64, DataType::Int64],
            &[OrderType::ascending(), OrderType::ascending()],
            &[0, 1],
            2,
            r_degree_ineq_type,
            r_clean_watermark_indices,
            r_degree_clean_watermark_indices,
        )
        .await;

        // Semi/anti joins only output one side; all other join types output
        // the concatenation of both input schemas.
        let schema = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
            _ => [source_l.schema().fields(), source_r.schema().fields()]
                .concat()
                .into_iter()
                .collect(),
        };
        let schema_len = schema.len();
        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);

        let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
            ActorContext::for_test(123),
            info,
            source_l,
            source_r,
            params_l,
            params_r,
            vec![null_safe],
            (0..schema_len).collect_vec(),
            cond,
            inequality_pairs,
            state_l,
            degree_state_l,
            state_r,
            degree_state_r,
            Arc::new(AtomicU64::new(0)),
            // NOTE(review): presumably the is-append-only flag —
            // `create_append_only_executor` passes `true` here; confirm against
            // the constructor signature.
            false,
            Arc::new(StreamingMetrics::unused()),
            1024,
            2048,
            vec![(0, true)],
        );
        (tx_l, tx_r, executor.boxed().execute())
    }
1631
1632    async fn create_classical_executor<const T: JoinTypePrimitive>(
1633        with_condition: bool,
1634        null_safe: bool,
1635        condition_text: Option<String>,
1636    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1637        create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
1638    }
1639
    /// Builds an append-only `HashJoinExecutor` over two mocked three-column
    /// `int64` sources, joining on the composite key (column 0, column 1).
    ///
    /// Returns the message senders for the left and right inputs together with
    /// the executor's boxed output stream.
    async fn create_append_only_executor<const T: JoinTypePrimitive>(
        with_condition: bool,
    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };
        let (tx_l, source_l) = MockSource::channel();
        let source_l = source_l.into_executor(schema.clone(), vec![0]);
        let (tx_r, source_r) = MockSource::channel();
        let source_r = source_r.into_executor(schema, vec![0]);
        // Two-column join key, no extra value columns in the key params.
        let params_l = JoinParams::new(vec![0, 1], vec![]);
        let params_r = JoinParams::new(vec![0, 1], vec![]);
        let cond = with_condition.then(|| create_cond(None));

        let mem_state = MemoryStateStore::new();

        let (state_l, degree_state_l) = create_in_memory_state_table(
            mem_state.clone(),
            &[DataType::Int64, DataType::Int64, DataType::Int64],
            &[
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
            // NOTE(review): third pk index repeats an earlier column (0 here,
            // 1 on the right side below) — presumably intentional; confirm
            // against `create_in_memory_state_table`.
            &[0, 1, 0],
            0,
        )
        .await;

        let (state_r, degree_state_r) = create_in_memory_state_table(
            mem_state,
            &[DataType::Int64, DataType::Int64, DataType::Int64],
            &[
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
            &[0, 1, 1],
            1,
        )
        .await;

        // Semi/anti joins only output one side; other join types concatenate
        // both input schemas.
        let schema = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
            _ => [source_l.schema().fields(), source_r.schema().fields()]
                .concat()
                .into_iter()
                .collect(),
        };
        let schema_len = schema.len();
        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);

        let executor = HashJoinExecutor::<Key128, MemoryStateStore, T, MemoryEncoding>::new(
            ActorContext::for_test(123),
            info,
            source_l,
            source_r,
            params_l,
            params_r,
            vec![false],
            (0..schema_len).collect_vec(),
            cond,
            vec![],
            state_l,
            degree_state_l,
            state_r,
            degree_state_r,
            Arc::new(AtomicU64::new(0)),
            // NOTE(review): presumably the is-append-only flag — `create_executor`
            // passes `false` here; confirm against the constructor signature.
            true,
            Arc::new(StreamingMetrics::unused()),
            1024,
            2048,
            vec![(0, true)],
        );
        (tx_l, tx_r, executor.boxed().execute())
    }
1721
    /// Inequality join with watermark-driven state cleanup.
    ///
    /// Condition: `left.col1 >= right.col1`; the left column is the larger
    /// side, so left-state rows whose `col1` falls below the watermark can be
    /// dropped from the join state and must no longer produce matches.
    #[tokio::test]
    async fn test_inequality_join_watermark() -> StreamExecutorResult<()> {
        // Test inequality join with watermark-based state cleanup
        // Condition: left.col1 >= right.col1 (left side is larger, clean left state)
        // Left state rows with col1 < watermark will be cleaned
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 2 4
             + 2 7
             + 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 6",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 2 3",
        );
        // Test with condition: left.col1 >= right.col1
        let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
            true,
            false,
            Some(String::from(
                "(greater_than_or_equal:boolean $1:int8 $3:int8)",
            )),
            vec![InequalityPairInfo {
                left_idx: 1,
                right_idx: 1,
                clean_left_state: true, // left >= right, left is larger
                clean_right_state: false,
                op: InequalityType::GreaterThanOrEqual,
            }],
        )
        .await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the left chunk: (2, 4), (2, 7), (3, 8)
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push watermarks: left=10, right=6
        // Output watermark is min(10, 6) = 6 for both output columns
        // The left watermark alone is buffered (pending); the join only emits
        // once it has seen a watermark from both sides.
        tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
        hash_join.next_unwrap_pending();

        tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
        assert_eq!(
            output_watermark,
            Watermark::new(1, DataType::Int64, ScalarImpl::Int64(6))
        );

        // After watermark, left state rows with col1 < 6 are cleaned
        // Row (2, 4) should be cleaned (4 < 6)
        // Row (2, 7) should remain (7 >= 6)
        // Row (3, 8) should remain (8 >= 6)

        // push right chunk (2, 6)
        // Only (2, 7) can match: 7 >= 6 is TRUE
        // (2, 4) was cleaned, so it won't match
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 7 2 6"
            )
        );

        // push right chunk (2, 3)
        // (2, 7) can match: 7 >= 3 is TRUE
        // (2, 4) was cleaned, won't match — even though 4 >= 3 would hold,
        // proving the state (not just the condition) filtered it out
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 7 2 3"
            )
        );

        Ok(())
    }
1812
1813    #[tokio::test]
1814    async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
1815        let chunk_l1 = StreamChunk::from_pretty(
1816            "  I I
1817             + 1 4
1818             + 2 5
1819             + 3 6",
1820        );
1821        let chunk_l2 = StreamChunk::from_pretty(
1822            "  I I
1823             + 3 8
1824             - 3 8",
1825        );
1826        let chunk_r1 = StreamChunk::from_pretty(
1827            "  I I
1828             + 2 7
1829             + 4 8
1830             + 6 9",
1831        );
1832        let chunk_r2 = StreamChunk::from_pretty(
1833            "  I  I
1834             + 3 10
1835             + 6 11",
1836        );
1837        let (mut tx_l, mut tx_r, mut hash_join) =
1838            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
1839
1840        // push the init barrier for left and right
1841        tx_l.push_barrier(test_epoch(1), false);
1842        tx_r.push_barrier(test_epoch(1), false);
1843        hash_join.next_unwrap_ready_barrier()?;
1844
1845        // push the 1st left chunk
1846        tx_l.push_chunk(chunk_l1);
1847        hash_join.next_unwrap_pending();
1848
1849        // push the init barrier for left and right
1850        tx_l.push_barrier(test_epoch(2), false);
1851        tx_r.push_barrier(test_epoch(2), false);
1852        hash_join.next_unwrap_ready_barrier()?;
1853
1854        // push the 2nd left chunk
1855        tx_l.push_chunk(chunk_l2);
1856        hash_join.next_unwrap_pending();
1857
1858        // push the 1st right chunk
1859        tx_r.push_chunk(chunk_r1);
1860        let chunk = hash_join.next_unwrap_ready_chunk()?;
1861        assert_eq!(
1862            chunk,
1863            StreamChunk::from_pretty(
1864                " I I I I
1865                + 2 5 2 7"
1866            )
1867        );
1868
1869        // push the 2nd right chunk
1870        tx_r.push_chunk(chunk_r2);
1871        let chunk = hash_join.next_unwrap_ready_chunk()?;
1872        assert_eq!(
1873            chunk,
1874            StreamChunk::from_pretty(
1875                " I I I I
1876                + 3 6 3 10"
1877            )
1878        );
1879
1880        Ok(())
1881    }
1882
1883    #[tokio::test]
1884    async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
1885        let chunk_l1 = StreamChunk::from_pretty(
1886            "  I I
1887             + 1 4
1888             + 2 5
1889             + . 6",
1890        );
1891        let chunk_l2 = StreamChunk::from_pretty(
1892            "  I I
1893             + . 8
1894             - . 8",
1895        );
1896        let chunk_r1 = StreamChunk::from_pretty(
1897            "  I I
1898             + 2 7
1899             + 4 8
1900             + 6 9",
1901        );
1902        let chunk_r2 = StreamChunk::from_pretty(
1903            "  I  I
1904             + . 10
1905             + 6 11",
1906        );
1907        let (mut tx_l, mut tx_r, mut hash_join) =
1908            create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;
1909
1910        // push the init barrier for left and right
1911        tx_l.push_barrier(test_epoch(1), false);
1912        tx_r.push_barrier(test_epoch(1), false);
1913        hash_join.next_unwrap_ready_barrier()?;
1914
1915        // push the 1st left chunk
1916        tx_l.push_chunk(chunk_l1);
1917        hash_join.next_unwrap_pending();
1918
1919        // push the init barrier for left and right
1920        tx_l.push_barrier(test_epoch(2), false);
1921        tx_r.push_barrier(test_epoch(2), false);
1922        hash_join.next_unwrap_ready_barrier()?;
1923
1924        // push the 2nd left chunk
1925        tx_l.push_chunk(chunk_l2);
1926        hash_join.next_unwrap_pending();
1927
1928        // push the 1st right chunk
1929        tx_r.push_chunk(chunk_r1);
1930        let chunk = hash_join.next_unwrap_ready_chunk()?;
1931        assert_eq!(
1932            chunk,
1933            StreamChunk::from_pretty(
1934                " I I I I
1935                + 2 5 2 7"
1936            )
1937        );
1938
1939        // push the 2nd right chunk
1940        tx_r.push_chunk(chunk_r2);
1941        let chunk = hash_join.next_unwrap_ready_chunk()?;
1942        assert_eq!(
1943            chunk,
1944            StreamChunk::from_pretty(
1945                " I I I I
1946                + . 6 . 10"
1947            )
1948        );
1949
1950        Ok(())
1951    }
1952
    /// Left semi join on column 0: a left row is emitted exactly once when its
    /// first right match arrives, and retracted only when its LAST right match
    /// is deleted.
    #[tokio::test]
    async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             + 6 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I  I
             - 6 11",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I  I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk (+3/-3 cancel out, no output)
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        // push the 2nd right chunk
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 3 6"
            )
        );

        // push the 3rd left chunk (tests forward_exactly_once)
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // push the 3rd right chunk
        // (tests that no change if there are still matches)
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // push the 4th right chunk
        // (tests that deletion occurs when there are no more matches)
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
2062
    /// Null-safe left semi join on column 0: like the non-null-safe variant,
    /// but a NULL left key is emitted once a NULL right key arrives.
    #[tokio::test]
    async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + . 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + . 8
             - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + . 10
             + 6 11",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             + 6 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I  I
             - 6 11",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I  I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk (+NULL/-NULL cancel out, no output)
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        // push the 2nd right chunk — NULL key matches (NULL, 6) null-safely
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + . 6"
            )
        );

        // push the 3rd left chunk (tests forward_exactly_once)
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // push the 3rd right chunk
        // (tests that no change if there are still matches)
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // push the 4th right chunk
        // (tests that deletion occurs when there are no more matches)
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
2172
2173    #[tokio::test]
2174    async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
2175        let chunk_l1 = StreamChunk::from_pretty(
2176            "  I I I
2177             + 1 4 1
2178             + 2 5 2
2179             + 3 6 3",
2180        );
2181        let chunk_l2 = StreamChunk::from_pretty(
2182            "  I I I
2183             + 4 9 4
2184             + 5 10 5",
2185        );
2186        let chunk_r1 = StreamChunk::from_pretty(
2187            "  I I I
2188             + 2 5 1
2189             + 4 9 2
2190             + 6 9 3",
2191        );
2192        let chunk_r2 = StreamChunk::from_pretty(
2193            "  I I I
2194             + 1 4 4
2195             + 3 6 5",
2196        );
2197
2198        let (mut tx_l, mut tx_r, mut hash_join) =
2199            create_append_only_executor::<{ JoinType::Inner }>(false).await;
2200
2201        // push the init barrier for left and right
2202        tx_l.push_barrier(test_epoch(1), false);
2203        tx_r.push_barrier(test_epoch(1), false);
2204        hash_join.next_unwrap_ready_barrier()?;
2205
2206        // push the 1st left chunk
2207        tx_l.push_chunk(chunk_l1);
2208        hash_join.next_unwrap_pending();
2209
2210        // push the init barrier for left and right
2211        tx_l.push_barrier(test_epoch(2), false);
2212        tx_r.push_barrier(test_epoch(2), false);
2213        hash_join.next_unwrap_ready_barrier()?;
2214
2215        // push the 2nd left chunk
2216        tx_l.push_chunk(chunk_l2);
2217        hash_join.next_unwrap_pending();
2218
2219        // push the 1st right chunk
2220        tx_r.push_chunk(chunk_r1);
2221        let chunk = hash_join.next_unwrap_ready_chunk()?;
2222        assert_eq!(
2223            chunk,
2224            StreamChunk::from_pretty(
2225                " I I I I I I
2226                + 2 5 2 2 5 1
2227                + 4 9 4 4 9 2"
2228            )
2229        );
2230
2231        // push the 2nd right chunk
2232        tx_r.push_chunk(chunk_r2);
2233        let chunk = hash_join.next_unwrap_ready_chunk()?;
2234        assert_eq!(
2235            chunk,
2236            StreamChunk::from_pretty(
2237                " I I I I I I
2238                + 1 4 1 1 4 4
2239                + 3 6 3 3 6 5"
2240            )
2241        );
2242
2243        Ok(())
2244    }
2245
    /// Append-only left semi join on the composite key (col0, col1): a left
    /// row is emitted once a matching right row arrives; only the left columns
    /// appear in the output.
    #[tokio::test]
    async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk (no right rows yet, so no output)
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk (still no matches)
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk — keys (2, 5) and (4, 9) match
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 2 5 2
                + 4 9 4"
            )
        );

        // push the 2nd right chunk — keys (1, 4) and (3, 6) match
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 1 4 1
                + 3 6 3"
            )
        );

        Ok(())
    }
2318
2319    #[tokio::test]
2320    async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
2321        let chunk_l1 = StreamChunk::from_pretty(
2322            "  I I I
2323             + 1 4 1
2324             + 2 5 2
2325             + 3 6 3",
2326        );
2327        let chunk_l2 = StreamChunk::from_pretty(
2328            "  I I I
2329             + 4 9 4
2330             + 5 10 5",
2331        );
2332        let chunk_r1 = StreamChunk::from_pretty(
2333            "  I I I
2334             + 2 5 1
2335             + 4 9 2
2336             + 6 9 3",
2337        );
2338        let chunk_r2 = StreamChunk::from_pretty(
2339            "  I I I
2340             + 1 4 4
2341             + 3 6 5",
2342        );
2343
2344        let (mut tx_l, mut tx_r, mut hash_join) =
2345            create_append_only_executor::<{ JoinType::RightSemi }>(false).await;
2346
2347        // push the init barrier for left and right
2348        tx_l.push_barrier(test_epoch(1), false);
2349        tx_r.push_barrier(test_epoch(1), false);
2350        hash_join.next_unwrap_ready_barrier()?;
2351
2352        // push the 1st left chunk
2353        tx_l.push_chunk(chunk_l1);
2354        hash_join.next_unwrap_pending();
2355
2356        // push the init barrier for left and right
2357        tx_l.push_barrier(test_epoch(2), false);
2358        tx_r.push_barrier(test_epoch(2), false);
2359        hash_join.next_unwrap_ready_barrier()?;
2360
2361        // push the 2nd left chunk
2362        tx_l.push_chunk(chunk_l2);
2363        hash_join.next_unwrap_pending();
2364
2365        // push the 1st right chunk
2366        tx_r.push_chunk(chunk_r1);
2367        let chunk = hash_join.next_unwrap_ready_chunk()?;
2368        assert_eq!(
2369            chunk,
2370            StreamChunk::from_pretty(
2371                " I I I
2372                + 2 5 1
2373                + 4 9 2"
2374            )
2375        );
2376
2377        // push the 2nd right chunk
2378        tx_r.push_chunk(chunk_r2);
2379        let chunk = hash_join.next_unwrap_ready_chunk()?;
2380        assert_eq!(
2381            chunk,
2382            StreamChunk::from_pretty(
2383                " I I I
2384                + 1 4 4
2385                + 3 6 5"
2386            )
2387        );
2388
2389        Ok(())
2390    }
2391
    /// Right semi join: a right row is forwarded exactly once when its first
    /// left match arrives, and retracted once its last left match is deleted.
    #[tokio::test]
    async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I
             + 6 10",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I  I
             - 6 11",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            "  I  I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st right chunk: no left rows yet, so nothing is forwarded
        tx_r.push_chunk(chunk_r1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd right chunk (the +/- pair on key 3 cancels out; still no
        // left matches, so no output)
        tx_r.push_chunk(chunk_r2);
        hash_join.next_unwrap_pending();

        // push the 1st left chunk: of its keys (2, 4, 6) only key 2 has a
        // matching right row, so `(2, 5)` is emitted
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        // push the 2nd left chunk: key 3 gains its first match, so `(3, 6)` is emitted
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 3 6"
            )
        );

        // push the 3rd right chunk (tests forward_exactly_once)
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // push the 3rd left chunk
        // (tests that no change if there are still matches: key 6 still has `(6, 9)`)
        tx_l.push_chunk(chunk_l3);
        hash_join.next_unwrap_pending();

        // push the 4th left chunk
        // (tests that deletion occurs when there are no more matches)
        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
2501
    /// Left anti join: a left row is emitted while it has no right match; the
    /// first arriving match retracts it, and deleting the last match re-emits it.
    #[tokio::test]
    async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11
             + 1 2
             + 1 3",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             + 9 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I
             - 1 2",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I I
             - 1 3",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: no right rows yet, so every row is forwarded
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4
                + 2 5
                + 3 6",
            )
        );

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk: the transient +/- pair on key 3 comes out as
        // invisible (`D`) rows
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I
                 + 3 8 D
                 - 3 8 D",
            )
        );

        // push the 1st right chunk: key 2 gains its first match, so `(2, 5)` is retracted
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 2 5"
            )
        );

        // push the 2nd right chunk: keys 3 and 1 gain matches and are retracted
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 3 6
                - 1 4"
            )
        );

        // push the 3rd left chunk (tests forward_exactly_once)
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 9 10"
            )
        );

        // push the 3rd right chunk
        // (tests that no change if there are still matches: key 1 still matches `(1, 3)`)
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // push the 4th right chunk
        // (tests that insertion occurs when there are no more matches)
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4"
            )
        );

        Ok(())
    }
2631
    /// Right anti join, exercised by mirroring: a `LeftAnti` executor is built and
    /// the sender handles are bound swapped (note the `(tx_r, tx_l)` binding order),
    /// so chunks pushed via `tx_r` feed the executor's first input. The expectations
    /// mirror `test_streaming_hash_left_anti_join`.
    #[tokio::test]
    async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11
             + 1 2
             + 1 3",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I
             + 9 10",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             - 1 2",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            "  I I
             - 1 3",
        );
        // Swapped binding: right-hand chunks drive the LeftAnti executor's first input.
        let (mut tx_r, mut tx_l, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;

        // push the init barrier for left and right
        tx_r.push_barrier(test_epoch(1), false);
        tx_l.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st right chunk: no left rows yet, so every row is forwarded
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4
                + 2 5
                + 3 6",
            )
        );

        // push the 2nd barrier for left and right
        tx_r.push_barrier(test_epoch(2), false);
        tx_l.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd right chunk: the transient +/- pair on key 3 comes out as
        // invisible (`D`) rows
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I
                 + 3 8 D
                 - 3 8 D",
            )
        );

        // push the 1st left chunk: key 2 gains its first match, so `(2, 5)` is retracted
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 2 5"
            )
        );

        // push the 2nd left chunk: keys 3 and 1 gain matches and are retracted
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 3 6
                - 1 4"
            )
        );

        // push the 3rd right chunk (tests forward_exactly_once)
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 9 10"
            )
        );

        // push the 3rd left chunk
        // (tests that no change if there are still matches: key 1 still matches `(1, 3)`)
        tx_l.push_chunk(chunk_l3);
        hash_join.next_unwrap_pending();

        // push the 4th left chunk
        // (tests that insertion occurs when there are no more matches)
        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4"
            )
        );

        Ok(())
    }
2761
    /// Inner join with an unaligned barrier: the left side receives the epoch-2
    /// barrier (and a further chunk) before the right side does. The executor
    /// emits the aligned barrier only after both sides have it, and the join
    /// output of the post-barrier left chunk is observed after that barrier.
    #[tokio::test]
    async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 6 8
             + 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: no right rows yet, so an inner join yields nothing
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push a barrier to left side only — the sides are now at different epochs
        tx_l.push_barrier(test_epoch(2), false);

        // push the 2nd left chunk (sent after the left-side barrier)
        tx_l.push_chunk(chunk_l2);

        // join the first right chunk
        tx_r.push_chunk(chunk_r1);

        // Consume stream chunk: only `(2, 5)` from the 1st left chunk matches here;
        // the 2nd left chunk's contribution shows up only after the aligned barrier.
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 7"
            )
        );

        // push a barrier to right side, completing epoch 2 on both inputs
        tx_r.push_barrier(test_epoch(2), false);

        // get the aligned barrier here
        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
        assert!(matches!(
            hash_join.next_unwrap_ready_barrier()?,
            Barrier {
                epoch,
                mutation: None,
                ..
            } if epoch == expected_epoch
        ));

        // join the 2nd left chunk: `(6, 8)` matches the right row `(6, 9)`
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 6 8 6 9"
            )
        );

        // push the 2nd right chunk: joins against all buffered left rows for keys 3 and 6
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 6 3 10
                + 3 8 3 10
                + 6 8 6 11"
            )
        );

        Ok(())
    }
2856
    /// Same barrier-alignment scenario as `test_streaming_hash_inner_join_with_barrier`,
    /// but with NULLs in the non-key (second) column to check they are carried
    /// through the join untouched. The join key is the first column.
    #[tokio::test]
    async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 .
             + 3 .",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 6 .
             + 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: no right rows yet, so an inner join yields nothing
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push a barrier to left side only — the sides are now at different epochs
        tx_l.push_barrier(test_epoch(2), false);

        // push the 2nd left chunk (sent after the left-side barrier)
        tx_l.push_chunk(chunk_l2);

        // join the first right chunk
        tx_r.push_chunk(chunk_r1);

        // Consume stream chunk: the left row `(2, NULL)` joins `(2, 7)` on key 2;
        // the NULL in the non-key column is preserved.
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 . 2 7"
            )
        );

        // push a barrier to right side, completing epoch 2 on both inputs
        tx_r.push_barrier(test_epoch(2), false);

        // get the aligned barrier here
        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
        assert!(matches!(
            hash_join.next_unwrap_ready_barrier()?,
            Barrier {
                epoch,
                mutation: None,
                ..
            } if epoch == expected_epoch
        ));

        // join the 2nd left chunk: `(6, NULL)` matches the right row `(6, 9)`
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 6 . 6 9"
            )
        );

        // push the 2nd right chunk: joins against all buffered left rows for keys 3 and 6
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 8 3 10
                + 3 . 3 10
                + 6 . 6 11"
            )
        );

        Ok(())
    }
2951
2952    #[tokio::test]
2953    async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
2954        let chunk_l1 = StreamChunk::from_pretty(
2955            "  I I
2956             + 1 4
2957             + 2 5
2958             + 3 6",
2959        );
2960        let chunk_l2 = StreamChunk::from_pretty(
2961            "  I I
2962             + 3 8
2963             - 3 8",
2964        );
2965        let chunk_r1 = StreamChunk::from_pretty(
2966            "  I I
2967             + 2 7
2968             + 4 8
2969             + 6 9",
2970        );
2971        let chunk_r2 = StreamChunk::from_pretty(
2972            "  I  I
2973             + 3 10
2974             + 6 11",
2975        );
2976        let (mut tx_l, mut tx_r, mut hash_join) =
2977            create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;
2978
2979        // push the init barrier for left and right
2980        tx_l.push_barrier(test_epoch(1), false);
2981        tx_r.push_barrier(test_epoch(1), false);
2982        hash_join.next_unwrap_ready_barrier()?;
2983
2984        // push the 1st left chunk
2985        tx_l.push_chunk(chunk_l1);
2986        let chunk = hash_join.next_unwrap_ready_chunk()?;
2987        assert_eq!(
2988            chunk,
2989            StreamChunk::from_pretty(
2990                " I I I I
2991                + 1 4 . .
2992                + 2 5 . .
2993                + 3 6 . ."
2994            )
2995        );
2996
2997        // push the 2nd left chunk
2998        tx_l.push_chunk(chunk_l2);
2999        let chunk = hash_join.next_unwrap_ready_chunk()?;
3000        assert_eq!(
3001            chunk,
3002            StreamChunk::from_pretty(
3003                " I I I I
3004                + 3 8 . . D
3005                - 3 8 . . D"
3006            )
3007        );
3008
3009        // push the 1st right chunk
3010        tx_r.push_chunk(chunk_r1);
3011        let chunk = hash_join.next_unwrap_ready_chunk()?;
3012        assert_eq!(
3013            chunk,
3014            StreamChunk::from_pretty(
3015                " I I I I
3016                - 2 5 . .
3017                + 2 5 2 7"
3018            )
3019        );
3020
3021        // push the 2nd right chunk
3022        tx_r.push_chunk(chunk_r2);
3023        let chunk = hash_join.next_unwrap_ready_chunk()?;
3024        assert_eq!(
3025            chunk,
3026            StreamChunk::from_pretty(
3027                " I I I I
3028                - 3 6 . .
3029                + 3 6 3 10"
3030            )
3031        );
3032
3033        Ok(())
3034    }
3035
    /// Null-safe left outer join (`null_safe = true`, the second argument to
    /// `create_classical_executor`): NULL join keys compare equal, so the left
    /// row `(NULL, 6)` matches the right row `(NULL, 10)`.
    #[tokio::test]
    async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + . 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + . 8
             - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + . 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: no right rows yet, so every row (including the
        // NULL-keyed one) is NULL-padded on the right
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 4 . .
                + 2 5 . .
                + . 6 . ."
            )
        );

        // push the 2nd left chunk: transient +/- pair on the NULL key, emitted as
        // invisible (`D`) rows
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . 8 . . D
                - . 8 . . D"
            )
        );

        // push the 1st right chunk: key 2 now matches, replacing its padded row
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 2 5 . .
                + 2 5 2 7"
            )
        );

        // push the 2nd right chunk: the NULL key matches null-safely
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - . 6 . .
                + . 6 . 10"
            )
        );

        Ok(())
    }
3119
3120    #[tokio::test]
3121    async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
3122        let chunk_l1 = StreamChunk::from_pretty(
3123            "  I I
3124             + 1 4
3125             + 2 5
3126             + 3 6",
3127        );
3128        let chunk_l2 = StreamChunk::from_pretty(
3129            "  I I
3130             + 3 8
3131             - 3 8",
3132        );
3133        let chunk_r1 = StreamChunk::from_pretty(
3134            "  I I
3135             + 2 7
3136             + 4 8
3137             + 6 9",
3138        );
3139        let chunk_r2 = StreamChunk::from_pretty(
3140            "  I  I
3141             + 5 10
3142             - 5 10",
3143        );
3144        let (mut tx_l, mut tx_r, mut hash_join) =
3145            create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;
3146
3147        // push the init barrier for left and right
3148        tx_l.push_barrier(test_epoch(1), false);
3149        tx_r.push_barrier(test_epoch(1), false);
3150        hash_join.next_unwrap_ready_barrier()?;
3151
3152        // push the 1st left chunk
3153        tx_l.push_chunk(chunk_l1);
3154        hash_join.next_unwrap_pending();
3155
3156        // push the 2nd left chunk
3157        tx_l.push_chunk(chunk_l2);
3158        hash_join.next_unwrap_pending();
3159
3160        // push the 1st right chunk
3161        tx_r.push_chunk(chunk_r1);
3162        let chunk = hash_join.next_unwrap_ready_chunk()?;
3163        assert_eq!(
3164            chunk,
3165            StreamChunk::from_pretty(
3166                " I I I I
3167                + 2 5 2 7
3168                + . . 4 8
3169                + . . 6 9"
3170            )
3171        );
3172
3173        // push the 2nd right chunk
3174        tx_r.push_chunk(chunk_r2);
3175        let chunk = hash_join.next_unwrap_ready_chunk()?;
3176        assert_eq!(
3177            chunk,
3178            StreamChunk::from_pretty(
3179                " I I I I
3180                + . . 5 10 D
3181                - . . 5 10 D"
3182            )
3183        );
3184
3185        Ok(())
3186    }
3187
    /// Left outer join on append-only inputs (three columns per side): each left
    /// row is first emitted NULL-padded, then retracted and re-emitted joined
    /// once a matching right row arrives.
    #[tokio::test]
    async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: no right rows yet, so all rows are NULL-padded
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 1 4 1 . . .
                + 2 5 2 . . .
                + 3 6 3 . . ."
            )
        );

        // push the 2nd left chunk: still no matches, so NULL-padded as well
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 4 9 4 . . .
                + 5 10 5 . . ."
            )
        );

        // push the 1st right chunk: keys 2 and 4 match, so each padded row is
        // retracted and re-emitted joined
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 2 . . .
                + 2 5 2 2 5 1
                - 4 9 4 . . .
                + 4 9 4 4 9 2"
            )
        );

        // push the 2nd right chunk: keys 1 and 3 match
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 1 4 1 . . .
                + 1 4 1 1 4 4
                - 3 6 3 . . .
                + 3 6 3 3 6 5"
            )
        );

        Ok(())
    }
3276
    /// Right outer join on append-only inputs: left chunks alone produce no
    /// output; right chunks are joined immediately, with unmatched right rows
    /// NULL-padded on the left.
    #[tokio::test]
    async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5
             + 7 7 6",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::RightOuter }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: no output for a right outer join
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd left chunk: still no output
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk: keys 2 and 4 match; key 6 is NULL-padded
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                + 2 5 2 2 5 1
                + 4 9 4 4 9 2
                + . . . 6 9 3"
            )
        );

        // push the 2nd right chunk: keys 1 and 3 match; key 7 is NULL-padded
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                + 1 4 1 1 4 4
                + 3 6 3 3 6 5
                + . . . 7 7 6"
            )
        );

        Ok(())
    }
3347
3348    #[tokio::test]
3349    async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
3350        let chunk_l1 = StreamChunk::from_pretty(
3351            "  I I
3352             + 1 4
3353             + 2 5
3354             + 3 6",
3355        );
3356        let chunk_l2 = StreamChunk::from_pretty(
3357            "  I I
3358             + 3 8
3359             - 3 8",
3360        );
3361        let chunk_r1 = StreamChunk::from_pretty(
3362            "  I I
3363             + 2 7
3364             + 4 8
3365             + 6 9",
3366        );
3367        let chunk_r2 = StreamChunk::from_pretty(
3368            "  I  I
3369             + 5 10
3370             - 5 10",
3371        );
3372        let (mut tx_l, mut tx_r, mut hash_join) =
3373            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3374
3375        // push the init barrier for left and right
3376        tx_l.push_barrier(test_epoch(1), false);
3377        tx_r.push_barrier(test_epoch(1), false);
3378        hash_join.next_unwrap_ready_barrier()?;
3379
3380        // push the 1st left chunk
3381        tx_l.push_chunk(chunk_l1);
3382        let chunk = hash_join.next_unwrap_ready_chunk()?;
3383        assert_eq!(
3384            chunk,
3385            StreamChunk::from_pretty(
3386                " I I I I
3387                + 1 4 . .
3388                + 2 5 . .
3389                + 3 6 . ."
3390            )
3391        );
3392
3393        // push the 2nd left chunk
3394        tx_l.push_chunk(chunk_l2);
3395        let chunk = hash_join.next_unwrap_ready_chunk()?;
3396        assert_eq!(
3397            chunk,
3398            StreamChunk::from_pretty(
3399                " I I I I
3400                + 3 8 . . D
3401                - 3 8 . . D"
3402            )
3403        );
3404
3405        // push the 1st right chunk
3406        tx_r.push_chunk(chunk_r1);
3407        let chunk = hash_join.next_unwrap_ready_chunk()?;
3408        assert_eq!(
3409            chunk,
3410            StreamChunk::from_pretty(
3411                " I I I I
3412                - 2 5 . .
3413                + 2 5 2 7
3414                + . . 4 8
3415                + . . 6 9"
3416            )
3417        );
3418
3419        // push the 2nd right chunk
3420        tx_r.push_chunk(chunk_r2);
3421        let chunk = hash_join.next_unwrap_ready_chunk()?;
3422        assert_eq!(
3423            chunk,
3424            StreamChunk::from_pretty(
3425                " I I I I
3426                + . . 5 10 D
3427                - . . 5 10 D"
3428            )
3429        );
3430
3431        Ok(())
3432    }
3433
3434    #[tokio::test]
3435    async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
3436        let (mut tx_l, mut tx_r, mut hash_join) =
3437            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3438
3439        // push the init barrier for left and right
3440        tx_l.push_barrier(test_epoch(1), false);
3441        tx_r.push_barrier(test_epoch(1), false);
3442        hash_join.next_unwrap_ready_barrier()?;
3443
3444        tx_l.push_chunk(StreamChunk::from_pretty(
3445            "  I I
3446             + 1 1
3447            ",
3448        ));
3449        let chunk = hash_join.next_unwrap_ready_chunk()?;
3450        assert_eq!(
3451            chunk,
3452            StreamChunk::from_pretty(
3453                " I I I I
3454                + 1 1 . ."
3455            )
3456        );
3457
3458        tx_r.push_chunk(StreamChunk::from_pretty(
3459            "  I I
3460             + 1 1
3461            ",
3462        ));
3463        let chunk = hash_join.next_unwrap_ready_chunk()?;
3464
3465        assert_eq!(
3466            chunk,
3467            StreamChunk::from_pretty(
3468                " I I I I
3469                - 1 1 . .
3470                + 1 1 1 1"
3471            )
3472        );
3473
3474        tx_l.push_chunk(StreamChunk::from_pretty(
3475            "   I I
3476              - 1 1
3477              + 1 2
3478            ",
3479        ));
3480        let chunk = hash_join.next_unwrap_ready_chunk()?;
3481        let chunk = chunk.compact_vis();
3482        assert_eq!(
3483            chunk,
3484            StreamChunk::from_pretty(
3485                " I I I I
3486                - 1 1 1 1
3487                + 1 2 1 1
3488                "
3489            )
3490        );
3491
3492        Ok(())
3493    }
3494
3495    #[tokio::test]
3496    async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
3497    {
3498        let chunk_l1 = StreamChunk::from_pretty(
3499            "  I I
3500             + 1 4
3501             + 2 5
3502             + 3 6
3503             + 3 7",
3504        );
3505        let chunk_l2 = StreamChunk::from_pretty(
3506            "  I I
3507             + 3 8
3508             - 3 8
3509             - 1 4", // delete row to cause an empty JoinHashEntry
3510        );
3511        let chunk_r1 = StreamChunk::from_pretty(
3512            "  I I
3513             + 2 6
3514             + 4 8
3515             + 3 4",
3516        );
3517        let chunk_r2 = StreamChunk::from_pretty(
3518            "  I  I
3519             + 5 10
3520             - 5 10
3521             + 1 2",
3522        );
3523        let (mut tx_l, mut tx_r, mut hash_join) =
3524            create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;
3525
3526        // push the init barrier for left and right
3527        tx_l.push_barrier(test_epoch(1), false);
3528        tx_r.push_barrier(test_epoch(1), false);
3529        hash_join.next_unwrap_ready_barrier()?;
3530
3531        // push the 1st left chunk
3532        tx_l.push_chunk(chunk_l1);
3533        let chunk = hash_join.next_unwrap_ready_chunk()?;
3534        assert_eq!(
3535            chunk,
3536            StreamChunk::from_pretty(
3537                " I I I I
3538                + 1 4 . .
3539                + 2 5 . .
3540                + 3 6 . .
3541                + 3 7 . ."
3542            )
3543        );
3544
3545        // push the 2nd left chunk
3546        tx_l.push_chunk(chunk_l2);
3547        let chunk = hash_join.next_unwrap_ready_chunk()?;
3548        assert_eq!(
3549            chunk,
3550            StreamChunk::from_pretty(
3551                " I I I I
3552                + 3 8 . . D
3553                - 3 8 . . D
3554                - 1 4 . ."
3555            )
3556        );
3557
3558        // push the 1st right chunk
3559        tx_r.push_chunk(chunk_r1);
3560        let chunk = hash_join.next_unwrap_ready_chunk()?;
3561        assert_eq!(
3562            chunk,
3563            StreamChunk::from_pretty(
3564                " I I I I
3565                - 2 5 . .
3566                + 2 5 2 6
3567                + . . 4 8
3568                + . . 3 4" /* regression test (#2420): 3 4 should be forwarded only once
3569                            * despite matching on eq join on 2
3570                            * entries */
3571            )
3572        );
3573
3574        // push the 2nd right chunk
3575        tx_r.push_chunk(chunk_r2);
3576        let chunk = hash_join.next_unwrap_ready_chunk()?;
3577        assert_eq!(
3578            chunk,
3579            StreamChunk::from_pretty(
3580                " I I I I
3581                + . . 5 10 D
3582                - . . 5 10 D
3583                + . . 1 2" /* regression test (#2420): 1 2 forwarded even if matches on an empty
3584                            * join entry */
3585            )
3586        );
3587
3588        Ok(())
3589    }
3590
3591    #[tokio::test]
3592    async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
3593        let chunk_l1 = StreamChunk::from_pretty(
3594            "  I I
3595             + 1 4
3596             + 2 10
3597             + 3 6",
3598        );
3599        let chunk_l2 = StreamChunk::from_pretty(
3600            "  I I
3601             + 3 8
3602             - 3 8",
3603        );
3604        let chunk_r1 = StreamChunk::from_pretty(
3605            "  I I
3606             + 2 7
3607             + 4 8
3608             + 6 9",
3609        );
3610        let chunk_r2 = StreamChunk::from_pretty(
3611            "  I  I
3612             + 3 10
3613             + 6 11",
3614        );
3615        let (mut tx_l, mut tx_r, mut hash_join) =
3616            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3617
3618        // push the init barrier for left and right
3619        tx_l.push_barrier(test_epoch(1), false);
3620        tx_r.push_barrier(test_epoch(1), false);
3621        hash_join.next_unwrap_ready_barrier()?;
3622
3623        // push the 1st left chunk
3624        tx_l.push_chunk(chunk_l1);
3625        hash_join.next_unwrap_pending();
3626
3627        // push the 2nd left chunk
3628        tx_l.push_chunk(chunk_l2);
3629        hash_join.next_unwrap_pending();
3630
3631        // push the 1st right chunk
3632        tx_r.push_chunk(chunk_r1);
3633        hash_join.next_unwrap_pending();
3634
3635        // push the 2nd right chunk
3636        tx_r.push_chunk(chunk_r2);
3637        let chunk = hash_join.next_unwrap_ready_chunk()?;
3638        assert_eq!(
3639            chunk,
3640            StreamChunk::from_pretty(
3641                " I I I I
3642                + 3 6 3 10"
3643            )
3644        );
3645
3646        Ok(())
3647    }
3648
3649    #[tokio::test]
3650    async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3651        let (mut tx_l, mut tx_r, mut hash_join) =
3652            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3653
3654        // push the init barrier for left and right
3655        tx_l.push_barrier(test_epoch(1), false);
3656        tx_r.push_barrier(test_epoch(1), false);
3657        hash_join.next_unwrap_ready_barrier()?;
3658
3659        tx_l.push_int64_watermark(0, 100);
3660
3661        tx_l.push_int64_watermark(0, 200);
3662
3663        tx_l.push_barrier(test_epoch(2), false);
3664        tx_r.push_barrier(test_epoch(2), false);
3665        hash_join.next_unwrap_ready_barrier()?;
3666
3667        tx_r.push_int64_watermark(0, 50);
3668
3669        let w1 = hash_join.next().await.unwrap().unwrap();
3670        let w1 = w1.as_watermark().unwrap();
3671
3672        let w2 = hash_join.next().await.unwrap().unwrap();
3673        let w2 = w2.as_watermark().unwrap();
3674
3675        tx_r.push_int64_watermark(0, 100);
3676
3677        let w3 = hash_join.next().await.unwrap().unwrap();
3678        let w3 = w3.as_watermark().unwrap();
3679
3680        let w4 = hash_join.next().await.unwrap().unwrap();
3681        let w4 = w4.as_watermark().unwrap();
3682
3683        assert_eq!(
3684            w1,
3685            &Watermark {
3686                col_idx: 2,
3687                data_type: DataType::Int64,
3688                val: ScalarImpl::Int64(50)
3689            }
3690        );
3691
3692        assert_eq!(
3693            w2,
3694            &Watermark {
3695                col_idx: 0,
3696                data_type: DataType::Int64,
3697                val: ScalarImpl::Int64(50)
3698            }
3699        );
3700
3701        assert_eq!(
3702            w3,
3703            &Watermark {
3704                col_idx: 2,
3705                data_type: DataType::Int64,
3706                val: ScalarImpl::Int64(100)
3707            }
3708        );
3709
3710        assert_eq!(
3711            w4,
3712            &Watermark {
3713                col_idx: 0,
3714                data_type: DataType::Int64,
3715                val: ScalarImpl::Int64(100)
3716            }
3717        );
3718
3719        Ok(())
3720    }
3721
    /// Builds an in-memory hash join executor whose developer config
    /// `join_hash_map_evict_interval_rows` is set to `evict_interval`,
    /// returning the left/right input senders and the output message stream.
    async fn create_executor_with_evict_interval<const T: JoinTypePrimitive>(
        evict_interval: u32,
    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
        // Both inputs share the same two-column schema; column 0 is the join key.
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64), // join key
                Field::unnamed(DataType::Int64),
            ],
        };
        let (tx_l, source_l) = MockSource::channel();
        let source_l = source_l.into_executor(schema.clone(), vec![1]);
        let (tx_r, source_r) = MockSource::channel();
        let source_r = source_r.into_executor(schema, vec![1]);
        // Join on column 0 for both sides.
        let params_l = JoinParams::new(vec![0], vec![1]);
        let params_r = JoinParams::new(vec![0], vec![1]);

        // Both sides share one memory state store; their tables are
        // distinguished by the table-id offsets (0 and 2) below.
        let mem_state = MemoryStateStore::new();

        let (state_l, degree_state_l) = create_in_memory_state_table(
            mem_state.clone(),
            &[DataType::Int64, DataType::Int64],
            &[OrderType::ascending(), OrderType::ascending()],
            &[0, 1],
            0,
        )
        .await;

        let (state_r, degree_state_r) = create_in_memory_state_table(
            mem_state,
            &[DataType::Int64, DataType::Int64],
            &[OrderType::ascending(), OrderType::ascending()],
            &[0, 1],
            2,
        )
        .await;

        // Semi/anti joins output only one side's columns; all other join
        // types output the concatenation of both input schemas.
        let schema = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
            _ => [source_l.schema().fields(), source_r.schema().fields()]
                .concat()
                .into_iter()
                .collect(),
        };
        let schema_len = schema.len();
        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);

        // The knob under test: 0 disables periodic eviction of the join
        // hash map; N > 0 triggers eviction every N processed rows.
        let mut streaming_config = StreamingConfig::default();
        streaming_config.developer.join_hash_map_evict_interval_rows = evict_interval;

        // NOTE(review): the positional arguments below mirror the other test
        // constructors in this module — `vec![false]` presumably is the
        // per-key null-safe flags, `None` the non-equi condition, and the
        // two integers cache/chunk sizing; confirm against
        // `HashJoinExecutor::new`'s signature before relying on them.
        let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
            ActorContext::for_test_with_config(123, streaming_config),
            info,
            source_l,
            source_r,
            params_l,
            params_r,
            vec![false],
            (0..schema_len).collect_vec(),
            None,
            vec![],
            state_l,
            degree_state_l,
            state_r,
            degree_state_r,
            Arc::new(AtomicU64::new(0)),
            false,
            Arc::new(StreamingMetrics::unused()),
            1024,
            2048,
            vec![(0, true)],
        );
        (tx_l, tx_r, executor.boxed().execute())
    }
3796
3797    /// Tests that a hash join executor with `join_hash_map_evict_interval_rows = 0`
3798    /// (periodic eviction disabled) produces correct results.
3799    #[tokio::test]
3800    async fn test_hash_join_evict_interval_disabled() -> StreamExecutorResult<()> {
3801        let chunk_l = StreamChunk::from_pretty(
3802            "  I I
3803             + 1 4
3804             + 2 5
3805             + 3 6",
3806        );
3807        let chunk_r = StreamChunk::from_pretty(
3808            "  I I
3809             + 2 7
3810             + 3 8",
3811        );
3812
3813        // 0 disables periodic eviction
3814        let (mut tx_l, mut tx_r, mut hash_join) =
3815            create_executor_with_evict_interval::<{ JoinType::Inner }>(0).await;
3816
3817        tx_l.push_barrier(test_epoch(1), false);
3818        tx_r.push_barrier(test_epoch(1), false);
3819        hash_join.next_unwrap_ready_barrier()?;
3820
3821        tx_l.push_chunk(chunk_l);
3822        hash_join.next_unwrap_pending();
3823
3824        tx_r.push_chunk(chunk_r);
3825        let chunk = hash_join.next_unwrap_ready_chunk()?;
3826        assert_eq!(
3827            chunk,
3828            StreamChunk::from_pretty(
3829                " I I I I
3830                + 2 5 2 7
3831                + 3 6 3 8"
3832            )
3833        );
3834
3835        Ok(())
3836    }
3837
3838    /// Tests that a hash join executor with `join_hash_map_evict_interval_rows = 1`
3839    /// (evict after every row) produces correct results.
3840    #[tokio::test]
3841    async fn test_hash_join_evict_interval_one() -> StreamExecutorResult<()> {
3842        let chunk_l = StreamChunk::from_pretty(
3843            "  I I
3844             + 1 4
3845             + 2 5
3846             + 3 6",
3847        );
3848        let chunk_r = StreamChunk::from_pretty(
3849            "  I I
3850             + 2 7
3851             + 3 8",
3852        );
3853
3854        // evict after every row
3855        let (mut tx_l, mut tx_r, mut hash_join) =
3856            create_executor_with_evict_interval::<{ JoinType::Inner }>(1).await;
3857
3858        tx_l.push_barrier(test_epoch(1), false);
3859        tx_r.push_barrier(test_epoch(1), false);
3860        hash_join.next_unwrap_ready_barrier()?;
3861
3862        tx_l.push_chunk(chunk_l);
3863        hash_join.next_unwrap_pending();
3864
3865        tx_r.push_chunk(chunk_r);
3866        let chunk = hash_join.next_unwrap_ready_chunk()?;
3867        assert_eq!(
3868            chunk,
3869            StreamChunk::from_pretty(
3870                " I I I I
3871                + 2 5 2 7
3872                + 3 6 3 8"
3873            )
3874        );
3875
3876        Ok(())
3877    }
3878
3879    /// Tests that a hash join executor with a custom `join_hash_map_evict_interval_rows`
3880    /// value produces correct results.
3881    #[tokio::test]
3882    async fn test_hash_join_evict_interval_custom() -> StreamExecutorResult<()> {
3883        let chunk_l = StreamChunk::from_pretty(
3884            "  I I
3885             + 1 4
3886             + 2 5
3887             + 3 6
3888             + 4 7
3889             + 5 8",
3890        );
3891        let chunk_r = StreamChunk::from_pretty(
3892            "  I I
3893             + 1 9
3894             + 3 10
3895             + 5 11",
3896        );
3897
3898        // evict every 2 rows
3899        let (mut tx_l, mut tx_r, mut hash_join) =
3900            create_executor_with_evict_interval::<{ JoinType::Inner }>(2).await;
3901
3902        tx_l.push_barrier(test_epoch(1), false);
3903        tx_r.push_barrier(test_epoch(1), false);
3904        hash_join.next_unwrap_ready_barrier()?;
3905
3906        tx_l.push_chunk(chunk_l);
3907        hash_join.next_unwrap_pending();
3908
3909        tx_r.push_chunk(chunk_r);
3910        let chunk = hash_join.next_unwrap_ready_chunk()?;
3911        assert_eq!(
3912            chunk,
3913            StreamChunk::from_pretty(
3914                " I I I I
3915                + 1 4 1 9
3916                + 3 6 3 10
3917                + 5 8 5 11"
3918            )
3919        );
3920
3921        Ok(())
3922    }
3923}