risingwave_stream/executor/
hash_join.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::{BTreeMap, HashSet};
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use anyhow::Context;
21use either::Either;
22use itertools::Itertools;
23use multimap::MultiMap;
24use risingwave_common::array::Op;
25use risingwave_common::hash::{HashKey, NullBitmap};
26use risingwave_common::row::RowExt;
27use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
28use risingwave_common::util::epoch::EpochPair;
29use risingwave_common::util::iter_util::ZipEqDebug;
30use risingwave_expr::expr::NonStrictExpression;
31use risingwave_pb::stream_plan::InequalityType;
32use tokio::time::Instant;
33
34use self::builder::JoinChunkBuilder;
35use super::barrier_align::*;
36use super::join::hash_join::*;
37use super::join::row::{JoinEncoding, JoinRow};
38use super::join::*;
39use super::watermark::*;
40use crate::executor::CachedJoinRow;
41use crate::executor::join::builder::JoinStreamChunkBuilder;
42use crate::executor::join::hash_join::CacheResult;
43use crate::executor::prelude::*;
44
/// Evict the cache every n rows.
///
/// See `evict_cache`: a shared row counter is bumped once per input row and
/// both sides' in-memory caches are evicted each time it reaches this value,
/// amortizing the eviction cost across rows.
const EVICT_EVERY_N_ROWS: u32 = 16;
47
/// Returns true if every element of `vec1` also appears in `vec2`.
///
/// Used to check whether an input's primary key is fully contained in its
/// join key, which enables the degree-table-free and append-only
/// optimizations in the executor constructor.
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    // Only one set is needed: deduplicating `vec1` before the containment
    // check (as `HashSet::is_subset` would require) buys nothing.
    let set2: HashSet<usize> = vec2.into_iter().collect();
    vec1.iter().all(|x| set2.contains(x))
}
51
/// Information about an inequality condition in the join.
/// Represents: `left_col <op> right_col` where op is one of `<`, `<=`, `>`, `>=`.
///
/// Watermarks derived from these conditions are used both to emit downstream
/// watermarks and to clean join state (see `clean_left_state` /
/// `clean_right_state`).
#[derive(Debug, Clone)]
pub struct InequalityPairInfo {
    /// Index of the left side column (from left input).
    pub left_idx: usize,
    /// Index of the right side column (from right input).
    pub right_idx: usize,
    /// Whether this condition is used to clean left state table.
    pub clean_left_state: bool,
    /// Whether this condition is used to clean right state table.
    pub clean_right_state: bool,
    /// Comparison operator.
    pub op: InequalityType,
}
67
68impl InequalityPairInfo {
69    /// Returns true if left side has larger values (`>` or `>=`).
70    pub fn left_side_is_larger(&self) -> bool {
71        matches!(
72            self.op,
73            InequalityType::GreaterThan | InequalityType::GreaterThanOrEqual
74        )
75    }
76}
77
/// Parameters describing one input side of the join.
//
// `Debug` and `Clone` derives added: this is a public plain-data type
// (two `Vec<usize>` fields) and deriving them is free and non-breaking.
#[derive(Debug, Clone)]
pub struct JoinParams {
    /// Indices of the join keys
    pub join_key_indices: Vec<usize>,
    /// Indices of the input pk after dedup
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Creates join parameters from the join-key indices and the
    /// deduplicated primary-key indices of one input side.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        Self {
            join_key_indices,
            deduped_pk_indices,
        }
    }
}
93
/// Per-side state of the hash join: the hash table plus all the index
/// mappings needed to build output rows and handle watermarks.
struct JoinSide<K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Store all data from a one side stream
    ht: JoinHashMap<K, S, E>,
    /// Indices of the join key columns
    join_key_indices: Vec<usize>,
    /// The data type of all columns without degree.
    all_data_types: Vec<DataType>,
    /// The start position for the side in output new columns
    start_pos: usize,
    /// The mapping from input indices of a side to output columns.
    i2o_mapping: Vec<(usize, usize)>,
    /// Same pairs as `i2o_mapping`, indexed by input column so all output
    /// positions of a given input column can be looked up directly.
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// The first field of the ith element indicates that when a watermark at the ith column of
    /// this side comes, what band join conditions should be updated in order to possibly
    /// generate a new watermark at that column or the corresponding column in the counterpart
    /// join side.
    ///
    /// The second field indicates whether the column is required to be less than
    /// the corresponding column in the counterpart join side in the band join condition.
    input2inequality_index: Vec<Vec<(usize, bool)>>,
    /// Some fields which are required non null to match due to inequalities.
    non_null_fields: Vec<usize>,
    /// (i, j) in this `Vec` means that state data in this join side can be cleaned if the value of
    /// its ith column is less than the synthetic watermark of the jth band join condition.
    state_clean_columns: Vec<(usize, usize)>,
    /// Whether degree table is needed for this side.
    need_degree_table: bool,
    // Carries the `E: JoinEncoding` type parameter; holds no runtime data.
    _marker: std::marker::PhantomData<E>,
}
123
// Manual `Debug` impl: prints only the cheap, informative fields and skips
// the hash table and mapping internals.
impl<K: HashKey, S: StateStore, E: JoinEncoding> std::fmt::Debug for JoinSide<K, S, E> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("JoinSide")
            .field("join_key_indices", &self.join_key_indices)
            .field("col_types", &self.all_data_types)
            .field("start_pos", &self.start_pos)
            .field("i2o_mapping", &self.i2o_mapping)
            .field("need_degree_table", &self.need_degree_table)
            .finish()
    }
}
135
impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinSide<K, S, E> {
    /// Whether this side has uncommitted (dirty) state.
    ///
    /// Currently unimplemented and will panic if called.
    // WARNING: Please do not call this until we implement it.
    fn is_dirty(&self) -> bool {
        unimplemented!()
    }

    /// Clears the in-memory join cache. Requires clean state, which
    /// `is_dirty` cannot report yet, so this is currently dead code.
    #[expect(dead_code)]
    fn clear_cache(&mut self) {
        assert!(
            !self.is_dirty(),
            "cannot clear cache while states of hash join are dirty"
        );

        // TODO: not working with rearranged chain
        // self.ht.clear();
    }

    /// Initializes the underlying hash-table state with the first epoch.
    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.ht.init(epoch).await
    }
}
157
/// `HashJoinExecutor` takes two input streams and runs equal hash join on them.
/// The output columns are the concatenation of left and right columns.
pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
{
    ctx: ActorContextRef,
    info: ExecutorInfo,

    /// Left input executor
    input_l: Option<Executor>,
    /// Right input executor
    input_r: Option<Executor>,
    /// The data types of the formed new columns
    actual_output_data_types: Vec<DataType>,
    /// The parameters of the left join executor
    side_l: JoinSide<K, S, E>,
    /// The parameters of the right join executor
    side_r: JoinSide<K, S, E>,
    /// Optional non-equi join conditions
    cond: Option<NonStrictExpression>,
    /// Inequality pairs with output column indices.
    /// Each entry contains: (`output_indices`, `InequalityPairInfo`).
    inequality_pairs: Vec<(Vec<usize>, InequalityPairInfo)>,
    /// The output watermark of each inequality condition and its value is the minimum of the
    /// watermarks from both sides. It will be used to generate watermark into downstream
    /// and do state cleaning based on `clean_left_state`/`clean_right_state` fields.
    inequality_watermarks: Vec<Option<Watermark>>,
    /// Join-key positions and cleaning flags for watermark handling.
    /// Each entry: (join-key position, `do_state_cleaning`).
    watermark_indices_in_jk: Vec<(usize, bool)>,

    /// Whether the logic can be optimized for append-only stream
    append_only_optimize: bool,

    metrics: Arc<StreamingMetrics>,
    /// The maximum size of the chunk produced by executor at a time
    chunk_size: usize,
    /// Count the rows received, cleared to 0 each time it reaches `EVICT_EVERY_N_ROWS`
    cnt_rows_received: u32,

    /// watermark column index -> `BufferedWatermarks`
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// When to alert high join amplification
    high_join_amplification_threshold: usize,

    /// Max number of rows that will be cached in the entry state.
    entry_state_max_rows: usize,
}
206
// Manual `Debug` impl printing a curated subset of fields.
//
// NOTE: unwraps `input_l` / `input_r`, so this panics if called after
// `into_stream` has taken the inputs.
impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> std::fmt::Debug
    for HashJoinExecutor<K, S, T, E>
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HashJoinExecutor")
            .field("join_type", &T)
            .field("input_left", &self.input_l.as_ref().unwrap().identity())
            .field("input_right", &self.input_r.as_ref().unwrap().identity())
            .field("side_l", &self.side_l)
            .field("side_r", &self.side_r)
            .field("stream_key", &self.info.stream_key)
            .field("schema", &self.info.schema)
            .field("actual_output_data_types", &self.actual_output_data_types)
            .finish()
    }
}
223
impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> Execute
    for HashJoinExecutor<K, S, T, E>
{
    /// Entry point: consumes the executor and boxes its message stream.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}
231
/// Borrowed bundle of everything `eq_join_left` / `eq_join_right` need to
/// process one input chunk, avoiding a long positional argument list.
struct EqJoinArgs<'a, K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Actor context of the owning executor.
    ctx: &'a ActorContextRef,
    /// Join state of the left side.
    side_l: &'a mut JoinSide<K, S, E>,
    /// Join state of the right side.
    side_r: &'a mut JoinSide<K, S, E>,
    /// Output data types after applying the output-indices projection.
    actual_output_data_types: &'a [DataType],
    /// Optional non-equi join condition.
    cond: &'a mut Option<NonStrictExpression>,
    /// Current synthetic watermarks of the inequality conditions.
    inequality_watermarks: &'a [Option<Watermark>],
    /// The input chunk to join.
    chunk: StreamChunk,
    /// Whether the append-only optimization is enabled.
    append_only_optimize: bool,
    /// Max rows per output chunk.
    chunk_size: usize,
    /// Shared row counter driving periodic cache eviction.
    cnt_rows_received: &'a mut u32,
    /// Threshold above which high join amplification is reported.
    high_join_amplification_threshold: usize,
    /// Max number of rows cached in the entry state.
    entry_state_max_rows: usize,
}
246
247impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
248    HashJoinExecutor<K, S, T, E>
249{
    /// Creates a `HashJoinExecutor` with the default entry-state cache size.
    ///
    /// Pure delegation to [`Self::new_with_cache_size`] with
    /// `entry_state_max_rows = None`, which falls back to the streaming
    /// developer config default.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<InequalityPairInfo>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        watermark_indices_in_jk: Vec<(usize, bool)>,
    ) -> Self {
        Self::new_with_cache_size(
            ctx,
            info,
            input_l,
            input_r,
            params_l,
            params_r,
            null_safe,
            output_indices,
            cond,
            inequality_pairs,
            state_table_l,
            degree_state_table_l,
            state_table_r,
            degree_state_table_r,
            watermark_epoch,
            is_append_only,
            metrics,
            chunk_size,
            high_join_amplification_threshold,
            None,
            watermark_indices_in_jk,
        )
    }
297
    /// Creates a `HashJoinExecutor` with an explicit entry-state cache size.
    ///
    /// Derives all the mappings the join needs up front: output schema,
    /// input-to-output column mappings, inequality-condition indices, degree
    /// table configuration, and the append-only optimization flag.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_cache_size(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<InequalityPairInfo>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        entry_state_max_rows: Option<usize>,
        watermark_indices_in_jk: Vec<(usize, bool)>,
    ) -> Self {
        // Fall back to the config default when no explicit cache size given.
        let entry_state_max_rows = match entry_state_max_rows {
            None => ctx.config.developer.hash_join_entry_state_max_rows,
            Some(entry_state_max_rows) => entry_state_max_rows,
        };
        // Right side's columns start after all left columns in the output.
        let side_l_column_n = input_l.schema().len();

        // Semi/anti joins output only one side's columns; all other join
        // types concatenate left then right.
        let schema_fields = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
            JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
            _ => [
                input_l.schema().fields.clone(),
                input_r.schema().fields.clone(),
            ]
            .concat(),
        };

        let original_output_data_types = schema_fields
            .iter()
            .map(|field| field.data_type())
            .collect_vec();
        // Project to the actually emitted columns.
        let actual_output_data_types = output_indices
            .iter()
            .map(|&idx| original_output_data_types[idx].clone())
            .collect_vec();

        // Data types of hash join state.
        let state_all_data_types_l = input_l.schema().data_types();
        let state_all_data_types_r = input_r.schema().data_types();

        let state_pk_indices_l = input_l.stream_key().to_vec();
        let state_pk_indices_r = input_r.stream_key().to_vec();

        let state_join_key_indices_l = params_l.join_key_indices;
        let state_join_key_indices_r = params_r.join_key_indices;

        // Degree tables are keyed by (join key, deduped pk): the join key
        // columns come first, followed by the deduped pk columns.
        let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
        let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();

        let degree_pk_indices_l = (state_join_key_indices_l.len()
            ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
            .collect_vec();
        let degree_pk_indices_r = (state_join_key_indices_r.len()
            ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
            .collect_vec();

        // If pk is contained in join key.
        let pk_contained_in_jk_l = is_subset(state_pk_indices_l, state_join_key_indices_l.clone());
        let pk_contained_in_jk_r = is_subset(state_pk_indices_r, state_join_key_indices_r.clone());

        // check whether join key contains pk in both side
        let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;

        let join_key_data_types_l = state_join_key_indices_l
            .iter()
            .map(|idx| state_all_data_types_l[*idx].clone())
            .collect_vec();

        let join_key_data_types_r = state_join_key_indices_r
            .iter()
            .map(|idx| state_all_data_types_r[*idx].clone())
            .collect_vec();

        // Equi-join requires matching key types on both sides.
        assert_eq!(join_key_data_types_l, join_key_data_types_r);

        let null_matched = K::Bitmap::from_bool_vec(null_safe);

        // A degree table can be skipped when the other side's pk is contained
        // in its join key (each key matches at most one row there).
        let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
        let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;

        let (left_to_output, right_to_output) = {
            let (left_len, right_len) = if is_left_semi_or_anti(T) {
                (state_all_data_types_l.len(), 0usize)
            } else if is_right_semi_or_anti(T) {
                (0usize, state_all_data_types_r.len())
            } else {
                (state_all_data_types_l.len(), state_all_data_types_r.len())
            };
            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
        };

        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());

        let left_input_len = input_l.schema().len();
        let right_input_len = input_r.schema().len();
        let mut l2inequality_index = vec![vec![]; left_input_len];
        let mut r2inequality_index = vec![vec![]; right_input_len];
        let mut l_inequal_state_clean_columns = vec![];
        let mut r_inequal_state_clean_columns = vec![];
        let inequality_pairs = inequality_pairs
            .into_iter()
            .enumerate()
            .map(|(index, pair)| {
                // Map input columns to inequality index
                // The second field indicates whether this column is required to be less than
                // the corresponding column on the other side.
                // For `left >= right`: left is NOT less than right, right IS less than left
                // For `left <= right`: left IS less than right, right is NOT less than left
                let left_is_larger = pair.left_side_is_larger();
                l2inequality_index[pair.left_idx].push((index, !left_is_larger));
                r2inequality_index[pair.right_idx].push((index, left_is_larger));

                // Determine state cleanup columns
                if pair.clean_left_state {
                    l_inequal_state_clean_columns.push((pair.left_idx, index));
                }
                if pair.clean_right_state {
                    r_inequal_state_clean_columns.push((pair.right_idx, index));
                }

                // Get output indices for watermark emission
                // We only emit watermarks for the LARGER side's output columns
                let output_indices = if pair.left_side_is_larger() {
                    // Left is larger, emit for left output columns
                    l2o_indexed
                        .get_vec(&pair.left_idx)
                        .cloned()
                        .unwrap_or_default()
                } else {
                    // Right is larger, emit for right output columns
                    r2o_indexed
                        .get_vec(&pair.right_idx)
                        .cloned()
                        .unwrap_or_default()
                };

                (output_indices, pair)
            })
            .collect_vec();

        // Columns referenced by any inequality must be non-null to match.
        let mut l_non_null_fields = l2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();
        let mut r_non_null_fields = r2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();

        // Under the append-only optimization, inequality-based state cleaning
        // and non-null requirements are disabled.
        if append_only_optimize {
            l_inequal_state_clean_columns.clear();
            r_inequal_state_clean_columns.clear();
            l_non_null_fields.clear();
            r_non_null_fields.clear();
        }

        // Create degree tables with inequality column index for watermark-based cleaning.
        // The degree_inequality_idx is the column index in the input row that should be
        // stored in the degree table for inequality-based watermark cleaning.
        let l_inequality_idx = l_inequal_state_clean_columns
            .first()
            .map(|(col_idx, _)| *col_idx);
        let r_inequality_idx = r_inequal_state_clean_columns
            .first()
            .map(|(col_idx, _)| *col_idx);

        let degree_state_l = need_degree_table_l.then(|| {
            TableInner::new(
                degree_pk_indices_l,
                degree_join_key_indices_l,
                degree_state_table_l,
                l_inequality_idx,
            )
        });
        let degree_state_r = need_degree_table_r.then(|| {
            TableInner::new(
                degree_pk_indices_r,
                degree_join_key_indices_r,
                degree_state_table_r,
                r_inequality_idx,
            )
        });

        // One (initially absent) synthetic watermark per inequality condition.
        let inequality_watermarks = vec![None; inequality_pairs.len()];
        let watermark_buffers = BTreeMap::new();
        Self {
            ctx: ctx.clone(),
            info,
            input_l: Some(input_l),
            input_r: Some(input_r),
            actual_output_data_types,
            side_l: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch.clone(),
                    join_key_data_types_l,
                    state_join_key_indices_l.clone(),
                    state_all_data_types_l.clone(),
                    state_table_l,
                    params_l.deduped_pk_indices,
                    degree_state_l,
                    null_matched.clone(),
                    pk_contained_in_jk_l,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "left",
                ),
                join_key_indices: state_join_key_indices_l,
                all_data_types: state_all_data_types_l,
                i2o_mapping: left_to_output,
                i2o_mapping_indexed: l2o_indexed,
                input2inequality_index: l2inequality_index,
                non_null_fields: l_non_null_fields,
                state_clean_columns: l_inequal_state_clean_columns,
                start_pos: 0,
                need_degree_table: need_degree_table_l,
                _marker: PhantomData,
            },
            side_r: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch,
                    join_key_data_types_r,
                    state_join_key_indices_r.clone(),
                    state_all_data_types_r.clone(),
                    state_table_r,
                    params_r.deduped_pk_indices,
                    degree_state_r,
                    null_matched,
                    pk_contained_in_jk_r,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "right",
                ),
                join_key_indices: state_join_key_indices_r,
                all_data_types: state_all_data_types_r,
                start_pos: side_l_column_n,
                i2o_mapping: right_to_output,
                i2o_mapping_indexed: r2o_indexed,
                input2inequality_index: r2inequality_index,
                non_null_fields: r_non_null_fields,
                state_clean_columns: r_inequal_state_clean_columns,
                need_degree_table: need_degree_table_r,
                _marker: PhantomData,
            },
            cond,
            inequality_pairs,
            inequality_watermarks,
            watermark_indices_in_jk,
            append_only_optimize,
            metrics,
            chunk_size,
            cnt_rows_received: 0,
            watermark_buffers,
            high_join_amplification_threshold,
            entry_state_max_rows,
        }
    }
572
    /// Main executor loop: aligns both inputs on barriers, then processes
    /// watermarks, data chunks, and barriers until either input terminates.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let input_l = self.input_l.take().unwrap();
        let input_r = self.input_r.take().unwrap();
        let aligned_stream = barrier_align(
            input_l.execute(),
            input_r.execute(),
            self.ctx.id,
            self.ctx.fragment_id,
            self.metrics.clone(),
            "Join",
        );
        pin_mut!(aligned_stream);

        let actor_id = self.ctx.id;

        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
        let first_epoch = barrier.epoch;
        // The first barrier message should be propagated.
        yield Message::Barrier(barrier);
        // State tables can only be initialized after the first barrier.
        self.side_l.init(first_epoch).await?;
        self.side_r.init(first_epoch).await?;

        let actor_id_str = self.ctx.id.to_string();
        let fragment_id_str = self.ctx.fragment_id.to_string();

        // initialized some metrics
        let join_actor_input_waiting_duration_ns = self
            .metrics
            .join_actor_input_waiting_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let left_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
        let right_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let barrier_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[
                actor_id_str.as_str(),
                fragment_id_str.as_str(),
                "barrier",
            ]);

        let left_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);

        let right_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let mut start_time = Instant::now();

        while let Some(msg) = aligned_stream
            .next()
            .instrument_await("hash_join_barrier_align")
            .await
        {
            // Time between loop iterations is time spent waiting on input.
            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
            match msg? {
                AlignedMessage::WatermarkLeft(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Left, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::WatermarkRight(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Right, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::Left(chunk) => {
                    // Accumulate only the time spent computing the join; the
                    // stopwatch is paused while yielding downstream.
                    let mut left_time = Duration::from_nanos(0);
                    let mut left_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_left(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        left_time += left_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        left_start_time = Instant::now();
                    }
                    left_time += left_start_time.elapsed();
                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Right(chunk) => {
                    // Mirror of the left-chunk arm with sides swapped.
                    let mut right_time = Duration::from_nanos(0);
                    let mut right_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_right(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        right_time += right_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        right_start_time = Instant::now();
                    }
                    right_time += right_start_time.elapsed();
                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Barrier(barrier) => {
                    let barrier_start_time = Instant::now();
                    // Commit both sides' state for this epoch before yielding.
                    let (left_post_commit, right_post_commit) =
                        self.flush_data(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);

                    // We don't include the time post yielding barrier because the vnode update
                    // is a one-off and rare operation.
                    barrier_join_match_duration_ns
                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
                    yield Message::Barrier(barrier);

                    // Update the vnode bitmap for state tables of both sides if asked.
                    right_post_commit
                        .post_yield_barrier(update_vnode_bitmap.clone())
                        .await?;
                    if left_post_commit
                        .post_yield_barrier(update_vnode_bitmap)
                        .await?
                        .unwrap_or(false)
                    {
                        // NOTE(review): `true` presumably means the vnode
                        // bitmap changed, invalidating buffered watermarks —
                        // confirm against `post_yield_barrier`'s contract.
                        self.watermark_buffers
                            .values_mut()
                            .for_each(|buffers| buffers.clear());
                        self.inequality_watermarks.fill(None);
                    }

                    // Report metrics of cached join rows/entries
                    for (join_cached_entry_count, ht) in [
                        (&left_join_cached_entry_count, &self.side_l.ht),
                        (&right_join_cached_entry_count, &self.side_r.ht),
                    ] {
                        join_cached_entry_count.set(ht.entry_count() as i64);
                    }
                }
            }
            start_time = Instant::now();
        }
    }
743
744    async fn flush_data(
745        &mut self,
746        epoch: EpochPair,
747    ) -> StreamExecutorResult<(
748        JoinHashMapPostCommit<'_, K, S, E>,
749        JoinHashMapPostCommit<'_, K, S, E>,
750    )> {
751        // All changes to the state has been buffered in the mem-table of the state table. Just
752        // `commit` them here.
753        let left = self.side_l.ht.flush(epoch).await?;
754        let right = self.side_r.ht.flush(epoch).await?;
755        Ok((left, right))
756    }
757
758    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
759        // All changes to the state has been buffered in the mem-table of the state table. Just
760        // `commit` them here.
761        self.side_l.ht.try_flush().await?;
762        self.side_r.ht.try_flush().await?;
763        Ok(())
764    }
765
766    // We need to manually evict the cache.
767    fn evict_cache(
768        side_update: &mut JoinSide<K, S, E>,
769        side_match: &mut JoinSide<K, S, E>,
770        cnt_rows_received: &mut u32,
771    ) {
772        *cnt_rows_received += 1;
773        if *cnt_rows_received == EVICT_EVERY_N_ROWS {
774            side_update.ht.evict();
775            side_match.ht.evict();
776            *cnt_rows_received = 0;
777        }
778    }
779
    /// Handles a watermark arriving on `side`, returning the watermarks (mapped
    /// to output column indices) that should be emitted downstream.
    ///
    /// Two kinds of watermark columns are processed:
    /// - join-key columns: once both sides have buffered a watermark for the
    ///   same join-key position, the smaller one is selected, optionally used to
    ///   clean both sides' state, and emitted for every mapped output column;
    /// - inequality-pair columns: the selected watermark is emitted for the
    ///   larger side's output columns, remembered in `inequality_watermarks`
    ///   for later state cleaning, and (if the pair's clean flag is set) used
    ///   to advance the larger side's table watermark.
    fn handle_watermark(
        &mut self,
        side: SideTypePrimitive,
        watermark: Watermark,
    ) -> StreamExecutorResult<Vec<Watermark>> {
        // `side_update` is the side the watermark arrived on; `side_match` is the other side.
        let (side_update, side_match) = if side == SideType::Left {
            (&mut self.side_l, &mut self.side_r)
        } else {
            (&mut self.side_r, &mut self.side_l)
        };

        // Process join-key watermarks.
        // A column may appear at multiple join-key positions, hence `positions`.
        let wm_in_jk = side_update
            .join_key_indices
            .iter()
            .positions(|idx| *idx == watermark.col_idx);
        let mut watermarks_to_emit = vec![];
        for idx in wm_in_jk {
            // Buffer per join-key position; a watermark is selected only after
            // both sides have reported one for this position.
            let buffers = self
                .watermark_buffers
                .entry(idx)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
                // Only clean state if this join-key position is flagged for cleaning.
                if self
                    .watermark_indices_in_jk
                    .iter()
                    .any(|(jk_pos, do_clean)| *jk_pos == idx && *do_clean)
                {
                    side_match
                        .ht
                        .update_watermark(selected_watermark.val.clone());
                    side_update
                        .ht
                        .update_watermark(selected_watermark.val.clone());
                }

                // Re-emit the selected watermark on every output column that
                // either side maps this join-key column to.
                let empty_indices = vec![];
                let output_indices = side_update
                    .i2o_mapping_indexed
                    .get_vec(&side_update.join_key_indices[idx])
                    .unwrap_or(&empty_indices)
                    .iter()
                    .chain(
                        side_match
                            .i2o_mapping_indexed
                            .get_vec(&side_match.join_key_indices[idx])
                            .unwrap_or(&empty_indices),
                    );
                for output_idx in output_indices {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
            };
        }

        // Process inequality watermarks
        // We can only yield the LARGER side's watermark downstream.
        // For `left >= right`: left is larger, emit for left output columns
        // For `left <= right`: right is larger, emit for right output columns
        let mut update_left_watermark = None;
        let mut update_right_watermark = None;
        if let Some(watermark_indices) = side_update.input2inequality_index.get(watermark.col_idx) {
            for (inequality_index, _) in watermark_indices {
                // Inequality buffers live after the join-key buffers in the same map,
                // hence the `join_key_indices.len()` offset.
                let buffers = self
                    .watermark_buffers
                    .entry(side_update.join_key_indices.len() + inequality_index)
                    .or_insert_with(|| {
                        BufferedWatermarks::with_ids([SideType::Left, SideType::Right])
                    });
                if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone())
                {
                    let (output_indices, pair_info) = &self.inequality_pairs[*inequality_index];
                    let left_is_larger = pair_info.left_side_is_larger();

                    // Emit watermark for the larger side's output columns only
                    for output_idx in output_indices {
                        watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                    }
                    // Store watermark for state cleaning (via useful_state_clean_columns)
                    self.inequality_watermarks[*inequality_index] =
                        Some(selected_watermark.clone());

                    // Mark which side needs state cleaning (only if the clean flag is set)
                    if left_is_larger && pair_info.clean_left_state {
                        update_left_watermark = Some(selected_watermark.val.clone());
                    } else if !left_is_larger && pair_info.clean_right_state {
                        update_right_watermark = Some(selected_watermark.val.clone());
                    }
                }
            }
            // Do state cleaning on the larger side.
            // Now that degree tables include the inequality column, we can clean both
            // state and degree tables using `update_watermark`.
            if let Some(val) = update_left_watermark {
                self.side_l.ht.update_watermark(val);
            }
            if let Some(val) = update_right_watermark {
                self.side_r.ht.update_watermark(val);
            }
        }
        Ok(watermarks_to_emit)
    }
881
882    fn row_concat(
883        row_update: impl Row,
884        update_start_pos: usize,
885        row_matched: impl Row,
886        matched_start_pos: usize,
887    ) -> OwnedRow {
888        let mut new_row = vec![None; row_update.len() + row_matched.len()];
889
890        for (i, datum_ref) in row_update.iter().enumerate() {
891            new_row[i + update_start_pos] = datum_ref.to_owned_datum();
892        }
893        for (i, datum_ref) in row_matched.iter().enumerate() {
894            new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
895        }
896        OwnedRow::new(new_row)
897    }
898
    /// Used to forward `eq_join_oneside` to show join side in stack.
    ///
    /// Having a dedicated frame per side makes stack traces and profiles
    /// distinguish left-side probes from right-side probes.
    fn eq_join_left(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Left }>(args)
    }
905
    /// Used to forward `eq_join_oneside` to show join side in stack.
    ///
    /// Having a dedicated frame per side makes stack traces and profiles
    /// distinguish right-side probes from left-side probes.
    fn eq_join_right(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Right }>(args)
    }
912
    /// Probes one input chunk arriving on side `SIDE` against the other side's
    /// state, yielding joined output chunks.
    ///
    /// For each row in the chunk: evict caches periodically, look up (or skip,
    /// if a null-match check rules out any match) the build side's cached
    /// entry, then dispatch to `handle_match_rows` with the row's op.
    /// Per-key match counts are recorded in metrics, and keys exceeding
    /// `high_join_amplification_threshold` are logged.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S, E>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            actual_output_data_types,
            cond,
            inequality_watermarks,
            chunk,
            append_only_optimize,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
            entry_state_max_rows,
            ..
        } = args;

        // `side_update` is the side this chunk arrived on; `side_match` is probed.
        let (side_update, side_match) = if SIDE == SideType::Left {
            (side_l, side_r)
        } else {
            (side_r, side_l)
        };

        // Keep only the state-clean columns whose inequality already has a
        // watermark; the rest cannot trigger cleaning yet.
        let useful_state_clean_columns = side_match
            .state_clean_columns
            .iter()
            .filter_map(|(column_idx, inequality_index)| {
                inequality_watermarks[*inequality_index]
                    .as_ref()
                    .map(|watermark| (*column_idx, watermark))
            })
            .collect_vec();

        let mut hashjoin_chunk_builder =
            JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        let join_matched_join_keys = ctx
            .streaming_metrics
            .join_matched_join_keys
            .with_guarded_label_values(&[
                &ctx.id.to_string(),
                &ctx.fragment_id.to_string(),
                &side_update.ht.table_id().to_string(),
            ]);

        // Hash keys are built once for the whole chunk.
        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            // Skip rows masked out by the chunk's visibility bitmap.
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(side_update, side_match, cnt_rows_received);

            let cache_lookup_result = {
                // If any required-non-null probe column is NULL, or the key's null
                // bitmap isn't covered by the build side's null-safe columns, this
                // row can never match — skip the state lookup entirely.
                // NOTE(review): `datum_at_unchecked` presumably relies on
                // `non_null_fields` indices being in range for the row — confirm
                // against where `non_null_fields` is constructed.
                let probe_non_null_requirement_satisfied = side_update
                    .non_null_fields
                    .iter()
                    .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
                let build_non_null_requirement_satisfied =
                    key.null_bitmap().is_subset(side_match.ht.null_matched());
                if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
                    side_match.ht.take_state_opt(key)
                } else {
                    CacheResult::NeverMatch
                }
            };
            let mut total_matches = 0;

            // Monomorphize `handle_match_rows` on the join op while forwarding
            // the shared argument list.
            macro_rules! match_rows {
                ($op:ident) => {
                    Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
                        cache_lookup_result,
                        row,
                        key,
                        &mut hashjoin_chunk_builder,
                        side_match,
                        side_update,
                        &useful_state_clean_columns,
                        cond,
                        append_only_optimize,
                        entry_state_max_rows,
                    )
                };
            }

            match op {
                Op::Insert | Op::UpdateInsert =>
                {
                    #[for_await]
                    for chunk in match_rows!(Insert) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
                Op::Delete | Op::UpdateDelete =>
                {
                    #[for_await]
                    for chunk in match_rows!(Delete) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
            };

            join_matched_join_keys.observe(total_matches as _);
            // Surface hot keys that amplify output beyond the configured threshold.
            if total_matches > high_join_amplification_threshold {
                let join_key_data_types = side_update.ht.join_key_data_types();
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = total_matches,
                    update_table_id = %side_update.ht.table_id(),
                    match_table_id = %side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = %ctx.id,
                    fragment_id = %ctx.fragment_id,
                    "large rows matched for join key"
                );
            }
        }
        // NOTE(kwannoel): We don't track metrics for this last chunk.
        if let Some(chunk) = hashjoin_chunk_builder.take() {
            yield chunk;
        }
    }
1044
    /// For the probe-side row, we need to check if it has values in cache, if not, we need to
    /// fetch the matched rows from the state table.
    ///
    /// Every matched build-side row being processed needs to go through the following phases:
    /// 1. Handle join condition evaluation.
    /// 2. Always do cache refill, if the state count is good.
    /// 3. Handle state cleaning.
    /// 4. Handle degree table update.
    ///
    /// Afterwards, depending on `degree` (the number of condition-passing
    /// matches), either the "not matched" or "exactly once matched" form of the
    /// probe row is forwarded, the cache entry is (re)installed, rows flagged
    /// for watermark cleaning are dropped from memory, and the probe side's own
    /// state is updated (or, under the append-only optimization, the single
    /// matched build row is deleted instead).
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn handle_match_rows<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
    >(
        cached_lookup_result: CacheResult<E>,
        row: RowRef<'a>,
        key: &'a K,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        side_match: &'a mut JoinSide<K, S, E>,
        side_update: &'a mut JoinSide<K, S, E>,
        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
        cond: &'a mut Option<NonStrictExpression>,
        append_only_optimize: bool,
        entry_state_max_rows: usize,
    ) {
        let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
        // Fresh entry built up on a cache miss; unused on hit/never-match.
        let mut entry_state: JoinEntryState<E> = JoinEntryState::default();
        let mut entry_state_count = 0;

        // Number of build rows that satisfied the join condition for this probe row.
        let mut degree = 0;
        let mut append_only_matched_row = None;
        let mut matched_rows_to_clean = vec![];

        // Forward the per-matched-row arguments to `handle_match_row`,
        // monomorphized on whether the row came from cache.
        macro_rules! match_row {
            (
                $match_order_key_indices:expr,
                $degree_table:expr,
                $matched_row:expr,
                $matched_row_ref:expr,
                $from_cache:literal,
                $map_output:expr,
            ) => {
                Self::handle_match_row::<_, _, SIDE, { JOIN_OP }, { $from_cache }>(
                    row,
                    $matched_row,
                    $matched_row_ref,
                    hashjoin_chunk_builder,
                    $match_order_key_indices,
                    $degree_table,
                    side_update.start_pos,
                    side_match.start_pos,
                    cond,
                    &mut degree,
                    useful_state_clean_columns,
                    append_only_optimize,
                    &mut append_only_matched_row,
                    &mut matched_rows_to_clean,
                    $map_output,
                )
            };
        }

        let entry_state = match cached_lookup_result {
            CacheResult::NeverMatch => {
                // Null-match pre-check ruled out any match: just forward the
                // probe row in its "not matched" form and stop.
                let op = match JOIN_OP {
                    JoinOp::Insert => Op::Insert,
                    JoinOp::Delete => Op::Delete,
                };
                if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                    yield chunk;
                }
                return Ok(());
            }
            CacheResult::Hit(mut cached_rows) => {
                let (match_order_key_indices, match_degree_state) =
                    side_match.ht.get_degree_state_mut_ref();
                // Handle cached rows which match the probe-side row.
                for (matched_row_ref, matched_row) in
                    cached_rows.values_mut(&side_match.all_data_types)
                {
                    let matched_row = matched_row?;
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        match_degree_state,
                        matched_row,
                        Some(matched_row_ref),
                        true,
                        Either::Left,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }

                // Reinstall the (possibly degree-updated) cached entry later.
                cached_rows
            }
            CacheResult::Miss => {
                // Handle rows which are not in cache.
                let (matched_rows, match_order_key_indices, degree_table) = side_match
                    .ht
                    .fetch_matched_rows_and_get_degree_table_ref(key)
                    .await?;

                #[for_await]
                for matched_row in matched_rows {
                    let (encoded_pk, matched_row) = matched_row?;

                    let mut matched_row_ref = None;

                    // cache refill
                    if entry_state_count <= entry_state_max_rows {
                        let row_ref = entry_state
                            .insert(encoded_pk, E::encode(&matched_row), None) // TODO(kwannoel): handle ineq key for asof join.
                            .with_context(|| format!("row: {}", row.display(),))?;
                        matched_row_ref = Some(row_ref);
                        entry_state_count += 1;
                    }
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        degree_table,
                        matched_row,
                        matched_row_ref,
                        false,
                        Either::Right,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }
                Box::new(entry_state)
            }
        };

        // forward rows depending on join types
        let op = match JOIN_OP {
            JoinOp::Insert => Op::Insert,
            JoinOp::Delete => Op::Delete,
        };
        if degree == 0 {
            // No build row satisfied the condition.
            if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                yield chunk;
            }
        } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
        {
            yield chunk;
        }

        // cache refill
        // Only install the entry if it came from cache or the refill stayed
        // within `entry_state_max_rows`; otherwise drop it to bound memory.
        if cache_hit || entry_state_count <= entry_state_max_rows {
            side_match.ht.update_state(key, entry_state);
        }

        // watermark state cleaning
        for matched_row in matched_rows_to_clean {
            // The committed table watermark is responsible for reclaiming rows in the state table.
            side_match.ht.delete_row_in_mem(key, &matched_row.row)?;
        }

        // apply append_only optimization to clean matched_rows which have been persisted
        if append_only_optimize && let Some(row) = append_only_matched_row {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            side_match.ht.delete_handle_degree(key, row)?;
            return Ok(());
        }

        // no append_only optimization, update state table(s).
        match JOIN_OP {
            JoinOp::Insert => {
                side_update
                    .ht
                    .insert_handle_degree(key, JoinRow::new(row, degree))?;
            }
            JoinOp::Delete => {
                side_update
                    .ht
                    .delete_handle_degree(key, JoinRow::new(row, degree))?;
            }
        }
    }
1227
    /// Processes a single matched build-side row against the probe row.
    ///
    /// If the join condition passes: bumps the probe row's degree, emits the
    /// joined row via the chunk builder (before the degree update for inserts,
    /// after it for deletes), and updates the degree table and the cached
    /// encoded row's degree. If the condition fails: checks whether any
    /// watermarked state-clean column makes this build row stale and, if so,
    /// queues it in `matched_rows_to_clean`. Under the append-only
    /// optimization the (single) matched row is stashed in
    /// `append_only_matched_row` for later deletion instead.
    ///
    /// Returns the output chunk produced by the builder, if one became full.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    async fn handle_match_row<
        'a,
        R: Row,  // input row type
        RO: Row, // output row type
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
        const MATCHED_ROWS_FROM_CACHE: bool,
    >(
        update_row: RowRef<'a>,
        mut matched_row: JoinRow<R>,
        mut matched_row_cache_ref: Option<&mut E::EncodedRow>,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        match_order_key_indices: &[usize],
        match_degree_table: &mut Option<TableInner<S>>,
        side_update_start_pos: usize,
        side_match_start_pos: usize,
        cond: &Option<NonStrictExpression>,
        update_row_degree: &mut u64,
        useful_state_clean_columns: &[(usize, &'a Watermark)],
        append_only_optimize: bool,
        append_only_matched_row: &mut Option<JoinRow<RO>>,
        matched_rows_to_clean: &mut Vec<JoinRow<RO>>,
        map_output: impl Fn(R) -> RO,
    ) -> Option<StreamChunk> {
        let mut need_state_clean = false;
        let mut chunk_opt = None;
        // TODO(kwannoel): Instead of evaluating this every loop,
        // we can call this only if there's a non-equi expression.
        // check join cond
        let join_condition_satisfied = Self::check_join_condition(
            update_row,
            side_update_start_pos,
            &matched_row.row,
            side_match_start_pos,
            cond,
        )
        .await;

        if join_condition_satisfied {
            // update degree
            *update_row_degree += 1;
            // send matched row downstream

            // Inserts must happen before degrees are updated,
            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
            if matches!(JOIN_OP, JoinOp::Insert)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
            // TODO(kwannoel): We can actually statically decide this, using join side + join type.
            if let Some(degree_table) = match_degree_table {
                update_degree::<S, { JOIN_OP }>(
                    match_order_key_indices,
                    degree_table,
                    &mut matched_row,
                );
                if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
                    // update matched row in cache
                    // Keep the cached encoded row's degree in sync with the degree table.
                    match JOIN_OP {
                        JoinOp::Insert => matched_row_cache_ref.as_mut().unwrap().increase_degree(),
                        JoinOp::Delete => matched_row_cache_ref.as_mut().unwrap().decrease_degree(),
                    }
                }
            }

            // Deletes must happen after degree table is updated.
            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
            if matches!(JOIN_OP, JoinOp::Delete)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
        } else {
            // check if need state cleaning
            // A build row strictly below any relevant watermark can never match
            // again and may be dropped from in-memory state.
            for (column_idx, watermark) in useful_state_clean_columns {
                if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
                    scalar
                        .default_cmp(&watermark.val.as_scalar_ref_impl())
                        .is_lt()
                }) {
                    need_state_clean = true;
                    break;
                }
            }
        }
        // If the stream is append-only and the join key covers pk in both side,
        // then we can remove matched rows since pk is unique and will not be
        // inserted again
        if append_only_optimize {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            // Since join key contains pk and pk is unique, there should be only
            // one row if matched.
            assert!(append_only_matched_row.is_none());
            *append_only_matched_row = Some(matched_row.map(map_output));
        } else if need_state_clean {
            debug_assert!(
                !append_only_optimize,
                "`append_only_optimize` and `need_state_clean` must not both be true"
            );
            matched_rows_to_clean.push(matched_row.map(map_output));
        }

        chunk_opt
    }
1339
1340    // TODO(yuhao-su): We should find a better way to eval the expression
1341    // without concat two rows.
1342    // if there are non-equi expressions
1343    // NOTE(kwannoel): We can probably let `eval` use `impl Row` instead of `OwnedRow`.
1344    #[inline]
1345    async fn check_join_condition(
1346        row: impl Row,
1347        side_update_start_pos: usize,
1348        matched_row: impl Row,
1349        side_match_start_pos: usize,
1350        join_condition: &Option<NonStrictExpression>,
1351    ) -> bool {
1352        if let Some(join_condition) = join_condition {
1353            let new_row = Self::row_concat(
1354                row,
1355                side_update_start_pos,
1356                matched_row,
1357                side_match_start_pos,
1358            );
1359            join_condition
1360                .eval_row_infallible(&new_row)
1361                .await
1362                .map(|s| *s.as_bool())
1363                .unwrap_or(false)
1364        } else {
1365            true
1366        }
1367    }
1368}
1369
1370#[cfg(test)]
1371mod tests {
1372    use std::sync::atomic::AtomicU64;
1373
1374    use pretty_assertions::assert_eq;
1375    use risingwave_common::array::*;
1376    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1377    use risingwave_common::hash::{Key64, Key128};
1378    use risingwave_common::util::epoch::test_epoch;
1379    use risingwave_common::util::sort_util::OrderType;
1380    use risingwave_storage::memory::MemoryStateStore;
1381
1382    use super::*;
1383    use crate::common::table::test_utils::gen_pbtable;
1384    use crate::executor::MemoryEncoding;
1385    use crate::executor::test_utils::expr::build_from_pretty;
1386    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1387
    /// Creates an in-memory (state table, degree table) pair for tests,
    /// without an inequality column in the degree table.
    ///
    /// Thin wrapper over `create_in_memory_state_table_with_inequality` with
    /// `degree_inequality_type = None`.
    async fn create_in_memory_state_table(
        mem_state: MemoryStateStore,
        data_types: &[DataType],
        order_types: &[OrderType],
        pk_indices: &[usize],
        table_id: u32,
    ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
        create_in_memory_state_table_with_inequality(
            mem_state,
            data_types,
            order_types,
            pk_indices,
            table_id,
            None,
        )
        .await
    }
1405
    /// Creates an in-memory (state table, degree table) pair for tests, with
    /// an optional inequality column type for the degree table but no
    /// watermark-based state cleaning configured.
    ///
    /// Thin wrapper over `create_in_memory_state_table_with_watermark` with
    /// empty clean-watermark index lists.
    async fn create_in_memory_state_table_with_inequality(
        mem_state: MemoryStateStore,
        data_types: &[DataType],
        order_types: &[OrderType],
        pk_indices: &[usize],
        table_id: u32,
        degree_inequality_type: Option<DataType>,
    ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
        create_in_memory_state_table_with_watermark(
            mem_state,
            data_types,
            order_types,
            pk_indices,
            table_id,
            degree_inequality_type,
            vec![],
            vec![],
        )
        .await
    }
1426
1427    #[allow(clippy::too_many_arguments)]
1428    async fn create_in_memory_state_table_with_watermark(
1429        mem_state: MemoryStateStore,
1430        data_types: &[DataType],
1431        order_types: &[OrderType],
1432        pk_indices: &[usize],
1433        table_id: u32,
1434        degree_inequality_type: Option<DataType>,
1435        state_clean_watermark_indices: Vec<usize>,
1436        degree_clean_watermark_indices: Vec<usize>,
1437    ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1438        let column_descs = data_types
1439            .iter()
1440            .enumerate()
1441            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1442            .collect_vec();
1443        let mut state_table_catalog = gen_pbtable(
1444            TableId::new(table_id),
1445            column_descs,
1446            order_types.to_vec(),
1447            pk_indices.to_vec(),
1448            0,
1449        );
1450        state_table_catalog.clean_watermark_indices = state_clean_watermark_indices
1451            .into_iter()
1452            .map(|idx| idx as u32)
1453            .collect();
1454        let state_table =
1455            StateTable::from_table_catalog(&state_table_catalog, mem_state.clone(), None).await;
1456
1457        // Create degree table with schema: [pk..., _degree, inequality?]
1458        let mut degree_table_column_descs = vec![];
1459        pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1460            degree_table_column_descs.push(ColumnDesc::unnamed(
1461                ColumnId::new(pk_id as i32),
1462                data_types[*idx].clone(),
1463            ))
1464        });
1465        // Add _degree column
1466        degree_table_column_descs.push(ColumnDesc::unnamed(
1467            ColumnId::new(pk_indices.len() as i32),
1468            DataType::Int64,
1469        ));
1470        // Add inequality column if present
1471        if let Some(ineq_type) = degree_inequality_type {
1472            degree_table_column_descs.push(ColumnDesc::unnamed(
1473                ColumnId::new((pk_indices.len() + 1) as i32),
1474                ineq_type,
1475            ));
1476        }
1477        let mut degree_table_catalog = gen_pbtable(
1478            TableId::new(table_id + 1),
1479            degree_table_column_descs,
1480            order_types.to_vec(),
1481            pk_indices.to_vec(),
1482            0,
1483        );
1484        degree_table_catalog.clean_watermark_indices = degree_clean_watermark_indices
1485            .into_iter()
1486            .map(|idx| idx as u32)
1487            .collect();
1488        let degree_state_table =
1489            StateTable::from_table_catalog(&degree_table_catalog, mem_state, None).await;
1490        (state_table, degree_state_table)
1491    }
1492
1493    fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1494        build_from_pretty(
1495            condition_text
1496                .as_deref()
1497                .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1498        )
1499    }
1500
1501    async fn create_executor<const T: JoinTypePrimitive>(
1502        with_condition: bool,
1503        null_safe: bool,
1504        condition_text: Option<String>,
1505        inequality_pairs: Vec<InequalityPairInfo>,
1506    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1507        let schema = Schema {
1508            fields: vec![
1509                Field::unnamed(DataType::Int64), // join key
1510                Field::unnamed(DataType::Int64),
1511            ],
1512        };
1513        let (tx_l, source_l) = MockSource::channel();
1514        let source_l = source_l.into_executor(schema.clone(), vec![1]);
1515        let (tx_r, source_r) = MockSource::channel();
1516        let source_r = source_r.into_executor(schema, vec![1]);
1517        let params_l = JoinParams::new(vec![0], vec![1]);
1518        let params_r = JoinParams::new(vec![0], vec![1]);
1519        let cond = with_condition.then(|| create_cond(condition_text));
1520
1521        let mem_state = MemoryStateStore::new();
1522
1523        // Determine inequality column types for degree tables based on inequality_pairs
1524        let l_degree_ineq_type = inequality_pairs
1525            .iter()
1526            .find(|pair| pair.clean_left_state)
1527            .map(|_| DataType::Int64); // Column 1 is Int64
1528        let r_degree_ineq_type = inequality_pairs
1529            .iter()
1530            .find(|pair| pair.clean_right_state)
1531            .map(|_| DataType::Int64); // Column 1 is Int64
1532        let l_clean_watermark_indices = inequality_pairs
1533            .iter()
1534            .find(|pair| pair.clean_left_state)
1535            .map(|pair| vec![pair.left_idx])
1536            .unwrap_or_default();
1537        let r_clean_watermark_indices = inequality_pairs
1538            .iter()
1539            .find(|pair| pair.clean_right_state)
1540            .map(|pair| vec![pair.right_idx])
1541            .unwrap_or_default();
1542        let degree_inequality_column_idx = 3;
1543        let l_degree_clean_watermark_indices = l_degree_ineq_type
1544            .as_ref()
1545            .map(|_| vec![degree_inequality_column_idx])
1546            .unwrap_or_default();
1547        let r_degree_clean_watermark_indices = r_degree_ineq_type
1548            .as_ref()
1549            .map(|_| vec![degree_inequality_column_idx])
1550            .unwrap_or_default();
1551
1552        let (state_l, degree_state_l) = create_in_memory_state_table_with_watermark(
1553            mem_state.clone(),
1554            &[DataType::Int64, DataType::Int64],
1555            &[OrderType::ascending(), OrderType::ascending()],
1556            &[0, 1],
1557            0,
1558            l_degree_ineq_type,
1559            l_clean_watermark_indices,
1560            l_degree_clean_watermark_indices,
1561        )
1562        .await;
1563
1564        let (state_r, degree_state_r) = create_in_memory_state_table_with_watermark(
1565            mem_state,
1566            &[DataType::Int64, DataType::Int64],
1567            &[OrderType::ascending(), OrderType::ascending()],
1568            &[0, 1],
1569            2,
1570            r_degree_ineq_type,
1571            r_clean_watermark_indices,
1572            r_degree_clean_watermark_indices,
1573        )
1574        .await;
1575
1576        let schema = match T {
1577            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1578            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1579            _ => [source_l.schema().fields(), source_r.schema().fields()]
1580                .concat()
1581                .into_iter()
1582                .collect(),
1583        };
1584        let schema_len = schema.len();
1585        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1586
1587        let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
1588            ActorContext::for_test(123),
1589            info,
1590            source_l,
1591            source_r,
1592            params_l,
1593            params_r,
1594            vec![null_safe],
1595            (0..schema_len).collect_vec(),
1596            cond,
1597            inequality_pairs,
1598            state_l,
1599            degree_state_l,
1600            state_r,
1601            degree_state_r,
1602            Arc::new(AtomicU64::new(0)),
1603            false,
1604            Arc::new(StreamingMetrics::unused()),
1605            1024,
1606            2048,
1607            vec![(0, true)],
1608        );
1609        (tx_l, tx_r, executor.boxed().execute())
1610    }
1611
1612    async fn create_classical_executor<const T: JoinTypePrimitive>(
1613        with_condition: bool,
1614        null_safe: bool,
1615        condition_text: Option<String>,
1616    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1617        create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
1618    }
1619
1620    async fn create_append_only_executor<const T: JoinTypePrimitive>(
1621        with_condition: bool,
1622    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1623        let schema = Schema {
1624            fields: vec![
1625                Field::unnamed(DataType::Int64),
1626                Field::unnamed(DataType::Int64),
1627                Field::unnamed(DataType::Int64),
1628            ],
1629        };
1630        let (tx_l, source_l) = MockSource::channel();
1631        let source_l = source_l.into_executor(schema.clone(), vec![0]);
1632        let (tx_r, source_r) = MockSource::channel();
1633        let source_r = source_r.into_executor(schema, vec![0]);
1634        let params_l = JoinParams::new(vec![0, 1], vec![]);
1635        let params_r = JoinParams::new(vec![0, 1], vec![]);
1636        let cond = with_condition.then(|| create_cond(None));
1637
1638        let mem_state = MemoryStateStore::new();
1639
1640        let (state_l, degree_state_l) = create_in_memory_state_table(
1641            mem_state.clone(),
1642            &[DataType::Int64, DataType::Int64, DataType::Int64],
1643            &[
1644                OrderType::ascending(),
1645                OrderType::ascending(),
1646                OrderType::ascending(),
1647            ],
1648            &[0, 1, 0],
1649            0,
1650        )
1651        .await;
1652
1653        let (state_r, degree_state_r) = create_in_memory_state_table(
1654            mem_state,
1655            &[DataType::Int64, DataType::Int64, DataType::Int64],
1656            &[
1657                OrderType::ascending(),
1658                OrderType::ascending(),
1659                OrderType::ascending(),
1660            ],
1661            &[0, 1, 1],
1662            1,
1663        )
1664        .await;
1665
1666        let schema = match T {
1667            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1668            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1669            _ => [source_l.schema().fields(), source_r.schema().fields()]
1670                .concat()
1671                .into_iter()
1672                .collect(),
1673        };
1674        let schema_len = schema.len();
1675        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1676
1677        let executor = HashJoinExecutor::<Key128, MemoryStateStore, T, MemoryEncoding>::new(
1678            ActorContext::for_test(123),
1679            info,
1680            source_l,
1681            source_r,
1682            params_l,
1683            params_r,
1684            vec![false],
1685            (0..schema_len).collect_vec(),
1686            cond,
1687            vec![],
1688            state_l,
1689            degree_state_l,
1690            state_r,
1691            degree_state_r,
1692            Arc::new(AtomicU64::new(0)),
1693            true,
1694            Arc::new(StreamingMetrics::unused()),
1695            1024,
1696            2048,
1697            vec![(0, true)],
1698        );
1699        (tx_l, tx_r, executor.boxed().execute())
1700    }
1701
    #[tokio::test]
    async fn test_inequality_join_watermark() -> StreamExecutorResult<()> {
        // Test inequality join with watermark-based state cleanup.
        // Condition: left.col1 >= right.col1 (left side is larger, so the left
        // state can be cleaned by the right-side watermark).
        // Left state rows with col1 < watermark will be cleaned.
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 2 4
             + 2 7
             + 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 6",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 2 3",
        );
        // Test with condition: left.col1 >= right.col1
        let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
            true,
            false,
            Some(String::from(
                "(greater_than_or_equal:boolean $1:int8 $3:int8)",
            )),
            vec![InequalityPairInfo {
                left_idx: 1,
                right_idx: 1,
                clean_left_state: true, // left >= right, left is larger
                clean_right_state: false,
                op: InequalityType::GreaterThanOrEqual,
            }],
        )
        .await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the left chunk: (2, 4), (2, 7), (3, 8); no right rows yet
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push watermarks: left=10, right=6
        // The left watermark alone produces no output; once both sides have
        // reported, the output watermark on column 1 is min(10, 6) = 6.
        tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
        hash_join.next_unwrap_pending();

        tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
        assert_eq!(
            output_watermark,
            Watermark::new(1, DataType::Int64, ScalarImpl::Int64(6))
        );

        // After the watermark, left state rows with col1 < 6 are cleaned:
        // Row (2, 4) should be cleaned (4 < 6)
        // Row (2, 7) should remain (7 >= 6)
        // Row (3, 8) should remain (8 >= 6)

        // push right chunk (2, 6)
        // Only (2, 7) can match: 7 >= 6 is TRUE
        // (2, 4) was cleaned, so it won't match
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 7 2 6"
            )
        );

        // push right chunk (2, 3)
        // (2, 7) can match: 7 >= 3 is TRUE
        // (2, 4) was cleaned, won't match
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 7 2 3"
            )
        );

        Ok(())
    }
1792
1793    #[tokio::test]
1794    async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
1795        let chunk_l1 = StreamChunk::from_pretty(
1796            "  I I
1797             + 1 4
1798             + 2 5
1799             + 3 6",
1800        );
1801        let chunk_l2 = StreamChunk::from_pretty(
1802            "  I I
1803             + 3 8
1804             - 3 8",
1805        );
1806        let chunk_r1 = StreamChunk::from_pretty(
1807            "  I I
1808             + 2 7
1809             + 4 8
1810             + 6 9",
1811        );
1812        let chunk_r2 = StreamChunk::from_pretty(
1813            "  I  I
1814             + 3 10
1815             + 6 11",
1816        );
1817        let (mut tx_l, mut tx_r, mut hash_join) =
1818            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
1819
1820        // push the init barrier for left and right
1821        tx_l.push_barrier(test_epoch(1), false);
1822        tx_r.push_barrier(test_epoch(1), false);
1823        hash_join.next_unwrap_ready_barrier()?;
1824
1825        // push the 1st left chunk
1826        tx_l.push_chunk(chunk_l1);
1827        hash_join.next_unwrap_pending();
1828
1829        // push the init barrier for left and right
1830        tx_l.push_barrier(test_epoch(2), false);
1831        tx_r.push_barrier(test_epoch(2), false);
1832        hash_join.next_unwrap_ready_barrier()?;
1833
1834        // push the 2nd left chunk
1835        tx_l.push_chunk(chunk_l2);
1836        hash_join.next_unwrap_pending();
1837
1838        // push the 1st right chunk
1839        tx_r.push_chunk(chunk_r1);
1840        let chunk = hash_join.next_unwrap_ready_chunk()?;
1841        assert_eq!(
1842            chunk,
1843            StreamChunk::from_pretty(
1844                " I I I I
1845                + 2 5 2 7"
1846            )
1847        );
1848
1849        // push the 2nd right chunk
1850        tx_r.push_chunk(chunk_r2);
1851        let chunk = hash_join.next_unwrap_ready_chunk()?;
1852        assert_eq!(
1853            chunk,
1854            StreamChunk::from_pretty(
1855                " I I I I
1856                + 3 6 3 10"
1857            )
1858        );
1859
1860        Ok(())
1861    }
1862
    #[tokio::test]
    async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
        // Inner join with null-safe key equality: NULL join keys compare equal
        // to each other, so left row (., 6) can match right row (., 10).
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + . 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + . 8
             - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + . 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; no right rows yet, so no output
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk; the insert/delete pair cancels out
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; only key 2 matches
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 7"
            )
        );

        // push the 2nd right chunk; the NULL keys match under null-safe equality
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . 6 . 10"
            )
        );

        Ok(())
    }
1932
    #[tokio::test]
    async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
        // Left semi join: a left row is emitted once it gains its first right
        // match, and retracted once its last match is deleted.
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             + 6 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I  I
             - 6 11",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I  I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; no right matches yet
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk; the insert/delete pair cancels out
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; left row (2, 5) gains its first match
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        // push the 2nd right chunk; left row (3, 6) gains its first match
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 3 6"
            )
        );

        // push the 3rd left chunk (tests forward_exactly_once)
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // push the 3rd right chunk
        // (tests that no change if there are still matches)
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // push the 4th right chunk
        // (tests that deletion occurs when there are no more matches)
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
2042
    #[tokio::test]
    async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
        // Left semi join with null-safe key equality: NULL join keys compare
        // equal, so left row (., 6) is emitted when right row (., 10) arrives.
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + . 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + . 8
             - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + . 10
             + 6 11",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             + 6 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I  I
             - 6 11",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I  I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; no right matches yet
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk; the insert/delete pair cancels out
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; left row (2, 5) gains its first match
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        // push the 2nd right chunk; NULL-keyed left row gains its first match
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + . 6"
            )
        );

        // push the 3rd left chunk (tests forward_exactly_once)
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // push the 3rd right chunk
        // (tests that no change if there are still matches)
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // push the 4th right chunk
        // (tests that deletion occurs when there are no more matches)
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
2152
2153    #[tokio::test]
2154    async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
2155        let chunk_l1 = StreamChunk::from_pretty(
2156            "  I I I
2157             + 1 4 1
2158             + 2 5 2
2159             + 3 6 3",
2160        );
2161        let chunk_l2 = StreamChunk::from_pretty(
2162            "  I I I
2163             + 4 9 4
2164             + 5 10 5",
2165        );
2166        let chunk_r1 = StreamChunk::from_pretty(
2167            "  I I I
2168             + 2 5 1
2169             + 4 9 2
2170             + 6 9 3",
2171        );
2172        let chunk_r2 = StreamChunk::from_pretty(
2173            "  I I I
2174             + 1 4 4
2175             + 3 6 5",
2176        );
2177
2178        let (mut tx_l, mut tx_r, mut hash_join) =
2179            create_append_only_executor::<{ JoinType::Inner }>(false).await;
2180
2181        // push the init barrier for left and right
2182        tx_l.push_barrier(test_epoch(1), false);
2183        tx_r.push_barrier(test_epoch(1), false);
2184        hash_join.next_unwrap_ready_barrier()?;
2185
2186        // push the 1st left chunk
2187        tx_l.push_chunk(chunk_l1);
2188        hash_join.next_unwrap_pending();
2189
2190        // push the init barrier for left and right
2191        tx_l.push_barrier(test_epoch(2), false);
2192        tx_r.push_barrier(test_epoch(2), false);
2193        hash_join.next_unwrap_ready_barrier()?;
2194
2195        // push the 2nd left chunk
2196        tx_l.push_chunk(chunk_l2);
2197        hash_join.next_unwrap_pending();
2198
2199        // push the 1st right chunk
2200        tx_r.push_chunk(chunk_r1);
2201        let chunk = hash_join.next_unwrap_ready_chunk()?;
2202        assert_eq!(
2203            chunk,
2204            StreamChunk::from_pretty(
2205                " I I I I I I
2206                + 2 5 2 2 5 1
2207                + 4 9 4 4 9 2"
2208            )
2209        );
2210
2211        // push the 2nd right chunk
2212        tx_r.push_chunk(chunk_r2);
2213        let chunk = hash_join.next_unwrap_ready_chunk()?;
2214        assert_eq!(
2215            chunk,
2216            StreamChunk::from_pretty(
2217                " I I I I I I
2218                + 1 4 1 1 4 4
2219                + 3 6 3 3 6 5"
2220            )
2221        );
2222
2223        Ok(())
2224    }
2225
    #[tokio::test]
    async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
        // Append-only left semi join on (col0, col1): each left row is emitted
        // once a matching right row arrives.
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; no right matches yet
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk; still no right matches
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; left rows (2, 5, 2) and (4, 9, 4) match
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 2 5 2
                + 4 9 4"
            )
        );

        // push the 2nd right chunk; left rows (1, 4, 1) and (3, 6, 3) match
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 1 4 1
                + 3 6 3"
            )
        );

        Ok(())
    }
2298
    #[tokio::test]
    async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
        // Append-only right semi join on (col0, col1): each right row is
        // emitted once it matches a buffered left row.
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::RightSemi }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; nothing to emit on the right side yet
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk; still nothing to emit
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; right rows (2, 5, 1) and (4, 9, 2) match
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 2 5 1
                + 4 9 2"
            )
        );

        // push the 2nd right chunk; right rows (1, 4, 4) and (3, 6, 5) match
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 1 4 4
                + 3 6 5"
            )
        );

        Ok(())
    }
2371
    #[tokio::test]
    async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
        // Mirror image of `test_streaming_hash_left_semi_join`: the right side
        // is the emitted side, so the chunk roles are swapped.
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I
             + 6 10",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I  I
             - 6 11",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            "  I  I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st right chunk; no left matches yet
        tx_r.push_chunk(chunk_r1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd right chunk; the insert/delete pair cancels out
        tx_r.push_chunk(chunk_r2);
        hash_join.next_unwrap_pending();

        // push the 1st left chunk; right row (2, 5) gains its first match
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        // push the 2nd left chunk; right row (3, 6) gains its first match
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 3 6"
            )
        );

        // push the 3rd right chunk (tests forward_exactly_once)
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // push the 3rd left chunk
        // (tests that no change if there are still matches)
        tx_l.push_chunk(chunk_l3);
        hash_join.next_unwrap_pending();

        // push the 4th left chunk
        // (tests that deletion occurs when there are no more matches)
        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
2481
    // Left anti join: emit left-side rows with NO right-side match; retract a
    // left row when a match appears, and re-emit it when the last match is gone.
    #[tokio::test]
    async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11
             + 1 2
             + 1 3",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             + 9 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I
             - 1 2",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I I
             - 1 3",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; the right side is empty, so every row is unmatched
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4
                + 2 5
                + 3 6",
            )
        );

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk
        // (`D` marks rows hidden via the output visibility bitmap)
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I
                 + 3 8 D
                 - 3 8 D",
            )
        );

        // push the 1st right chunk; key 2 gains a match, so the left row is retracted
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 2 5"
            )
        );

        // push the 2nd right chunk
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 3 6
                - 1 4"
            )
        );

        // push the 3rd left chunk (tests forward_exactly_once)
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 9 10"
            )
        );

        // push the 3rd right chunk
        // (tests that no change if there are still matches)
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // push the 4th right chunk
        // (tests that insertion occurs when there are no more matches)
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4"
            )
        );

        Ok(())
    }
2611
    // Right anti join.
    // NOTE(review): this builds a `LeftAnti` executor and swaps the tx handles
    // (`(mut tx_r, mut tx_l, ...)`), so the "right" stream here feeds the
    // executor's left input — presumably mirroring RightAnti by symmetry;
    // confirm this is intentional rather than using `JoinType::RightAnti`.
    #[tokio::test]
    async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11
             + 1 2
             + 1 3",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I
             + 9 10",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             - 1 2",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            "  I I
             - 1 3",
        );
        let (mut tx_r, mut tx_l, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;

        // push the init barrier for left and right
        tx_r.push_barrier(test_epoch(1), false);
        tx_l.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st right chunk; the other side is empty, so every row is unmatched
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4
                + 2 5
                + 3 6",
            )
        );

        // push the 2nd barrier for left and right
        tx_r.push_barrier(test_epoch(2), false);
        tx_l.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd right chunk
        // (`D` marks rows hidden via the output visibility bitmap)
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I
                 + 3 8 D
                 - 3 8 D",
            )
        );

        // push the 1st left chunk; key 2 gains a match, so the right row is retracted
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 2 5"
            )
        );

        // push the 2nd left chunk
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 3 6
                - 1 4"
            )
        );

        // push the 3rd right chunk (tests forward_exactly_once)
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 9 10"
            )
        );

        // push the 3rd left chunk
        // (tests that no change if there are still matches)
        tx_l.push_chunk(chunk_l3);
        hash_join.next_unwrap_pending();

        // push the 4th left chunk
        // (tests that insertion occurs when there are no more matches)
        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4"
            )
        );

        Ok(())
    }
2741
    // Inner join under barrier alignment: a chunk pushed after one side's
    // barrier is buffered, and its join output appears only after the other
    // side's matching barrier is received and the aligned barrier is emitted.
    #[tokio::test]
    async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 6 8
             + 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; no right rows yet, so nothing is emitted
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push a barrier to left side
        tx_l.push_barrier(test_epoch(2), false);

        // push the 2nd left chunk (beyond the left barrier, so it is held back
        // until the right side's epoch-2 barrier arrives)
        tx_l.push_chunk(chunk_l2);

        // join the first right chunk
        tx_r.push_chunk(chunk_r1);

        // Consume stream chunk
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 7"
            )
        );

        // push a barrier to right side
        tx_r.push_barrier(test_epoch(2), false);

        // get the aligned barrier here
        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
        assert!(matches!(
            hash_join.next_unwrap_ready_barrier()?,
            Barrier {
                epoch,
                mutation: None,
                ..
            } if epoch == expected_epoch
        ));

        // join the 2nd left chunk (released after barrier alignment)
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 6 8 6 9"
            )
        );

        // push the 2nd right chunk
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 6 3 10
                + 3 8 3 10
                + 6 8 6 11"
            )
        );

        Ok(())
    }
2836
    // Same barrier-alignment scenario as above, but with NULLs (`.`) in the
    // non-key value column, verifying they flow through the join unchanged.
    #[tokio::test]
    async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 .
             + 3 .",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 6 .
             + 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; no right rows yet, so nothing is emitted
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push a barrier to left side
        tx_l.push_barrier(test_epoch(2), false);

        // push the 2nd left chunk (beyond the left barrier, so it is held back
        // until the right side's epoch-2 barrier arrives)
        tx_l.push_chunk(chunk_l2);

        // join the first right chunk
        tx_r.push_chunk(chunk_r1);

        // Consume stream chunk
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 . 2 7"
            )
        );

        // push a barrier to right side
        tx_r.push_barrier(test_epoch(2), false);

        // get the aligned barrier here
        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
        assert!(matches!(
            hash_join.next_unwrap_ready_barrier()?,
            Barrier {
                epoch,
                mutation: None,
                ..
            } if epoch == expected_epoch
        ));

        // join the 2nd left chunk (released after barrier alignment)
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 6 . 6 9"
            )
        );

        // push the 2nd right chunk
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 8 3 10
                + 3 . 3 10
                + 6 . 6 11"
            )
        );

        Ok(())
    }
2931
    // Left outer join: unmatched left rows are padded with NULLs; when a match
    // later appears, the padded row is retracted and the joined row emitted.
    #[tokio::test]
    async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; all rows come out NULL-padded
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 4 . .
                + 2 5 . .
                + 3 6 . ."
            )
        );

        // push the 2nd left chunk
        // (`D` marks rows hidden via the output visibility bitmap)
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 8 . . D
                - 3 8 . . D"
            )
        );

        // push the 1st right chunk; key 2's padded row is replaced by the joined row
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 2 5 . .
                + 2 5 2 7"
            )
        );

        // push the 2nd right chunk
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 3 6 . .
                + 3 6 3 10"
            )
        );

        Ok(())
    }
3015
    // Null-safe left outer join (second executor flag = true): a NULL join key
    // on the left matches a NULL join key on the right.
    #[tokio::test]
    async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + . 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + . 8
             - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + . 10
             + 6 11",
        );
        // `null_safe = true` is the second argument here
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; all rows come out NULL-padded
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 4 . .
                + 2 5 . .
                + . 6 . ."
            )
        );

        // push the 2nd left chunk
        // (`D` marks rows hidden via the output visibility bitmap)
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . 8 . . D
                - . 8 . . D"
            )
        );

        // push the 1st right chunk
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 2 5 . .
                + 2 5 2 7"
            )
        );

        // push the 2nd right chunk; NULL key matches NULL key under null-safe equality
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - . 6 . .
                + . 6 . 10"
            )
        );

        Ok(())
    }
3099
    // Right outer join: left rows alone produce nothing; right rows without a
    // left match are emitted NULL-padded on the left columns.
    #[tokio::test]
    async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 5 10
             - 5 10",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; the outer side is the right, so no output
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd left chunk
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 7
                + . . 4 8
                + . . 6 9"
            )
        );

        // push the 2nd right chunk
        // (`D` marks rows hidden via the output visibility bitmap)
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . . 5 10 D
                - . . 5 10 D"
            )
        );

        Ok(())
    }
3167
    // Left outer join on append-only inputs (three columns per side; the third
    // is the append-only row id used by the append-only executor variant).
    #[tokio::test]
    async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; all rows come out NULL-padded
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 1 4 1 . . .
                + 2 5 2 . . .
                + 3 6 3 . . ."
            )
        );

        // push the 2nd left chunk
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 4 9 4 . . .
                + 5 10 5 . . ."
            )
        );

        // push the 1st right chunk; matched keys swap NULL padding for joined rows
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 2 . . .
                + 2 5 2 2 5 1
                - 4 9 4 . . .
                + 4 9 4 4 9 2"
            )
        );

        // push the 2nd right chunk
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 1 4 1 . . .
                + 1 4 1 1 4 4
                - 3 6 3 . . .
                + 3 6 3 3 6 5"
            )
        );

        Ok(())
    }
3256
    // Right outer join on append-only inputs: left chunks alone produce no
    // output; right rows are emitted immediately, NULL-padded when unmatched.
    #[tokio::test]
    async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5
             + 7 7 6",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::RightOuter }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; the outer side is the right, so no output
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd left chunk
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                + 2 5 2 2 5 1
                + 4 9 4 4 9 2
                + . . . 6 9 3"
            )
        );

        // push the 2nd right chunk
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                + 1 4 1 1 4 4
                + 3 6 3 3 6 5
                + . . . 7 7 6"
            )
        );

        Ok(())
    }
3327
    // Full outer join: unmatched rows from EITHER side are emitted NULL-padded,
    // and padded rows are retracted when the opposite side supplies a match.
    #[tokio::test]
    async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 5 10
             - 5 10",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; all rows come out NULL-padded
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 4 . .
                + 2 5 . .
                + 3 6 . ."
            )
        );

        // push the 2nd left chunk
        // (`D` marks rows hidden via the output visibility bitmap)
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 8 . . D
                - 3 8 . . D"
            )
        );

        // push the 1st right chunk; key 2 is upgraded to a joined row, the rest
        // of the right rows are NULL-padded on the left
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 2 5 . .
                + 2 5 2 7
                + . . 4 8
                + . . 6 9"
            )
        );

        // push the 2nd right chunk
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . . 5 10 D
                - . . 5 10 D"
            )
        );

        Ok(())
    }
3413
    // Full outer join: an update (delete + insert with the same key) on the left
    // is propagated as a retraction of the old joined row plus the new one.
    #[tokio::test]
    async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(StreamChunk::from_pretty(
            "  I I
             + 1 1
            ",
        ));
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 1 . ."
            )
        );

        tx_r.push_chunk(StreamChunk::from_pretty(
            "  I I
             + 1 1
            ",
        ));
        let chunk = hash_join.next_unwrap_ready_chunk()?;

        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 1 1 . .
                + 1 1 1 1"
            )
        );

        tx_l.push_chunk(StreamChunk::from_pretty(
            "   I I
              - 1 1
              + 1 2
            ",
        ));
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        // compact the visibility bitmap before comparing, so hidden rows don't
        // affect the equality check (presumably — confirm `compact_vis` semantics)
        let chunk = chunk.compact_vis();
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 1 1 1 1
                + 1 2 1 1
                "
            )
        );

        Ok(())
    }
3474
3475    #[tokio::test]
3476    async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
3477    {
3478        let chunk_l1 = StreamChunk::from_pretty(
3479            "  I I
3480             + 1 4
3481             + 2 5
3482             + 3 6
3483             + 3 7",
3484        );
3485        let chunk_l2 = StreamChunk::from_pretty(
3486            "  I I
3487             + 3 8
3488             - 3 8
3489             - 1 4", // delete row to cause an empty JoinHashEntry
3490        );
3491        let chunk_r1 = StreamChunk::from_pretty(
3492            "  I I
3493             + 2 6
3494             + 4 8
3495             + 3 4",
3496        );
3497        let chunk_r2 = StreamChunk::from_pretty(
3498            "  I  I
3499             + 5 10
3500             - 5 10
3501             + 1 2",
3502        );
3503        let (mut tx_l, mut tx_r, mut hash_join) =
3504            create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;
3505
3506        // push the init barrier for left and right
3507        tx_l.push_barrier(test_epoch(1), false);
3508        tx_r.push_barrier(test_epoch(1), false);
3509        hash_join.next_unwrap_ready_barrier()?;
3510
3511        // push the 1st left chunk
3512        tx_l.push_chunk(chunk_l1);
3513        let chunk = hash_join.next_unwrap_ready_chunk()?;
3514        assert_eq!(
3515            chunk,
3516            StreamChunk::from_pretty(
3517                " I I I I
3518                + 1 4 . .
3519                + 2 5 . .
3520                + 3 6 . .
3521                + 3 7 . ."
3522            )
3523        );
3524
3525        // push the 2nd left chunk
3526        tx_l.push_chunk(chunk_l2);
3527        let chunk = hash_join.next_unwrap_ready_chunk()?;
3528        assert_eq!(
3529            chunk,
3530            StreamChunk::from_pretty(
3531                " I I I I
3532                + 3 8 . . D
3533                - 3 8 . . D
3534                - 1 4 . ."
3535            )
3536        );
3537
3538        // push the 1st right chunk
3539        tx_r.push_chunk(chunk_r1);
3540        let chunk = hash_join.next_unwrap_ready_chunk()?;
3541        assert_eq!(
3542            chunk,
3543            StreamChunk::from_pretty(
3544                " I I I I
3545                - 2 5 . .
3546                + 2 5 2 6
3547                + . . 4 8
3548                + . . 3 4" /* regression test (#2420): 3 4 should be forwarded only once
3549                            * despite matching on eq join on 2
3550                            * entries */
3551            )
3552        );
3553
3554        // push the 2nd right chunk
3555        tx_r.push_chunk(chunk_r2);
3556        let chunk = hash_join.next_unwrap_ready_chunk()?;
3557        assert_eq!(
3558            chunk,
3559            StreamChunk::from_pretty(
3560                " I I I I
3561                + . . 5 10 D
3562                - . . 5 10 D
3563                + . . 1 2" /* regression test (#2420): 1 2 forwarded even if matches on an empty
3564                            * join entry */
3565            )
3566        );
3567
3568        Ok(())
3569    }
3570
3571    #[tokio::test]
3572    async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
3573        let chunk_l1 = StreamChunk::from_pretty(
3574            "  I I
3575             + 1 4
3576             + 2 10
3577             + 3 6",
3578        );
3579        let chunk_l2 = StreamChunk::from_pretty(
3580            "  I I
3581             + 3 8
3582             - 3 8",
3583        );
3584        let chunk_r1 = StreamChunk::from_pretty(
3585            "  I I
3586             + 2 7
3587             + 4 8
3588             + 6 9",
3589        );
3590        let chunk_r2 = StreamChunk::from_pretty(
3591            "  I  I
3592             + 3 10
3593             + 6 11",
3594        );
3595        let (mut tx_l, mut tx_r, mut hash_join) =
3596            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3597
3598        // push the init barrier for left and right
3599        tx_l.push_barrier(test_epoch(1), false);
3600        tx_r.push_barrier(test_epoch(1), false);
3601        hash_join.next_unwrap_ready_barrier()?;
3602
3603        // push the 1st left chunk
3604        tx_l.push_chunk(chunk_l1);
3605        hash_join.next_unwrap_pending();
3606
3607        // push the 2nd left chunk
3608        tx_l.push_chunk(chunk_l2);
3609        hash_join.next_unwrap_pending();
3610
3611        // push the 1st right chunk
3612        tx_r.push_chunk(chunk_r1);
3613        hash_join.next_unwrap_pending();
3614
3615        // push the 2nd right chunk
3616        tx_r.push_chunk(chunk_r2);
3617        let chunk = hash_join.next_unwrap_ready_chunk()?;
3618        assert_eq!(
3619            chunk,
3620            StreamChunk::from_pretty(
3621                " I I I I
3622                + 3 6 3 10"
3623            )
3624        );
3625
3626        Ok(())
3627    }
3628
3629    #[tokio::test]
3630    async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3631        let (mut tx_l, mut tx_r, mut hash_join) =
3632            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3633
3634        // push the init barrier for left and right
3635        tx_l.push_barrier(test_epoch(1), false);
3636        tx_r.push_barrier(test_epoch(1), false);
3637        hash_join.next_unwrap_ready_barrier()?;
3638
3639        tx_l.push_int64_watermark(0, 100);
3640
3641        tx_l.push_int64_watermark(0, 200);
3642
3643        tx_l.push_barrier(test_epoch(2), false);
3644        tx_r.push_barrier(test_epoch(2), false);
3645        hash_join.next_unwrap_ready_barrier()?;
3646
3647        tx_r.push_int64_watermark(0, 50);
3648
3649        let w1 = hash_join.next().await.unwrap().unwrap();
3650        let w1 = w1.as_watermark().unwrap();
3651
3652        let w2 = hash_join.next().await.unwrap().unwrap();
3653        let w2 = w2.as_watermark().unwrap();
3654
3655        tx_r.push_int64_watermark(0, 100);
3656
3657        let w3 = hash_join.next().await.unwrap().unwrap();
3658        let w3 = w3.as_watermark().unwrap();
3659
3660        let w4 = hash_join.next().await.unwrap().unwrap();
3661        let w4 = w4.as_watermark().unwrap();
3662
3663        assert_eq!(
3664            w1,
3665            &Watermark {
3666                col_idx: 2,
3667                data_type: DataType::Int64,
3668                val: ScalarImpl::Int64(50)
3669            }
3670        );
3671
3672        assert_eq!(
3673            w2,
3674            &Watermark {
3675                col_idx: 0,
3676                data_type: DataType::Int64,
3677                val: ScalarImpl::Int64(50)
3678            }
3679        );
3680
3681        assert_eq!(
3682            w3,
3683            &Watermark {
3684                col_idx: 2,
3685                data_type: DataType::Int64,
3686                val: ScalarImpl::Int64(100)
3687            }
3688        );
3689
3690        assert_eq!(
3691            w4,
3692            &Watermark {
3693                col_idx: 0,
3694                data_type: DataType::Int64,
3695                val: ScalarImpl::Int64(100)
3696            }
3697        );
3698
3699        Ok(())
3700    }
3701}