risingwave_stream/executor/hash_join.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::{BTreeMap, HashSet};
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use anyhow::Context;
21use either::Either;
22use itertools::Itertools;
23use multimap::MultiMap;
24use risingwave_common::array::Op;
25use risingwave_common::hash::{HashKey, NullBitmap};
26use risingwave_common::row::RowExt;
27use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
28use risingwave_common::util::epoch::EpochPair;
29use risingwave_common::util::iter_util::ZipEqDebug;
30use risingwave_expr::expr::NonStrictExpression;
31use risingwave_pb::stream_plan::InequalityType;
32use tokio::time::Instant;
33
34use self::builder::JoinChunkBuilder;
35use super::barrier_align::*;
36use super::join::hash_join::*;
37use super::join::row::{JoinEncoding, JoinRow};
38use super::join::*;
39use super::watermark::*;
40use crate::executor::CachedJoinRow;
41use crate::executor::join::builder::JoinStreamChunkBuilder;
42use crate::executor::join::hash_join::CacheResult;
43use crate::executor::prelude::*;
44
/// Returns true when every element of `vec1` also occurs in `vec2`
/// (set-wise; duplicates and order are irrelevant).
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.into_iter().all(|item| superset.contains(&item))
}
48
/// Information about an inequality condition in the join.
/// Represents: `left_col <op> right_col` where op is one of `<`, `<=`, `>`, `>=`.
///
/// Besides filtering, these conditions drive watermark derivation and
/// state cleaning (see the `clean_*_state` flags below).
#[derive(Debug, Clone)]
pub struct InequalityPairInfo {
    /// Index of the left side column (from left input).
    pub left_idx: usize,
    /// Index of the right side column (from right input).
    pub right_idx: usize,
    /// Whether this condition is used to clean left state table.
    pub clean_left_state: bool,
    /// Whether this condition is used to clean right state table.
    pub clean_right_state: bool,
    /// Comparison operator (`<`, `<=`, `>`, or `>=`), applied as
    /// `left_col <op> right_col`.
    pub op: InequalityType,
}
64
65impl InequalityPairInfo {
66    /// Returns true if left side has larger values (`>` or `>=`).
67    pub fn left_side_is_larger(&self) -> bool {
68        matches!(
69            self.op,
70            InequalityType::GreaterThan | InequalityType::GreaterThanOrEqual
71        )
72    }
73}
74
/// Column-index parameters describing one input side of the join.
pub struct JoinParams {
    /// Indices of the join keys
    pub join_key_indices: Vec<usize>,
    /// Indices of the input pk after dedup
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Bundles the join-key indices and deduplicated pk indices for one side.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        JoinParams {
            join_key_indices,
            deduped_pk_indices,
        }
    }
}
90
/// Per-side state and metadata of the hash join: the hash table holding this
/// side's rows, plus the index mappings used to build output and handle
/// watermarks/inequalities.
struct JoinSide<K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Store all data from a one side stream
    ht: JoinHashMap<K, S, E>,
    /// Indices of the join key columns
    join_key_indices: Vec<usize>,
    /// The data type of all columns without degree.
    all_data_types: Vec<DataType>,
    /// The start position for the side in output new columns
    start_pos: usize,
    /// The mapping from input indices of a side to output columns.
    i2o_mapping: Vec<(usize, usize)>,
    /// Same mapping as `i2o_mapping`, but indexed by input column for lookup.
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// The first field of the ith element indicates that when a watermark at the ith column of
    /// this side comes, what band join conditions should be updated in order to possibly
    /// generate a new watermark at that column or the corresponding column in the counterpart
    /// join side.
    ///
    /// The second field indicates whether the column is required less than the
    /// corresponding column in the counterpart join side in the band join condition.
    input2inequality_index: Vec<Vec<(usize, bool)>>,
    /// Some fields which are required non null to match due to inequalities.
    non_null_fields: Vec<usize>,
    /// (i, j) in this `Vec` means that state data in this join side can be cleaned if the value of
    /// its ith column is less than the synthetic watermark of the jth band join condition.
    state_clean_columns: Vec<(usize, usize)>,
    /// Whether degree table is needed for this side.
    need_degree_table: bool,
    _marker: std::marker::PhantomData<E>,
}
120
121impl<K: HashKey, S: StateStore, E: JoinEncoding> std::fmt::Debug for JoinSide<K, S, E> {
122    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123        f.debug_struct("JoinSide")
124            .field("join_key_indices", &self.join_key_indices)
125            .field("col_types", &self.all_data_types)
126            .field("start_pos", &self.start_pos)
127            .field("i2o_mapping", &self.i2o_mapping)
128            .field("need_degree_table", &self.need_degree_table)
129            .finish()
130    }
131}
132
impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinSide<K, S, E> {
    // WARNING: Please do not call this until we implement it.
    /// Whether this side has un-flushed state changes. Not implemented yet;
    /// calling this always panics.
    fn is_dirty(&self) -> bool {
        unimplemented!()
    }

    #[expect(dead_code)]
    fn clear_cache(&mut self) {
        // Clearing while dirty would silently drop buffered state changes,
        // hence the guard (which currently always panics via `is_dirty`).
        assert!(
            !self.is_dirty(),
            "cannot clear cache while states of hash join are dirty"
        );

        // TODO: not working with rearranged chain
        // self.ht.clear();
    }

    /// Initializes this side's join hash map with the first barrier's epoch.
    /// Must be called before any chunk is processed.
    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.ht.init(epoch).await
    }
}
154
/// `HashJoinExecutor` takes two input streams and runs equal hash join on them.
/// The output columns are the concatenation of left and right columns.
pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
{
    /// Actor context (ids, config) shared with the runtime.
    ctx: ActorContextRef,
    /// Executor identity, schema and stream key.
    info: ExecutorInfo,

    /// Left input executor
    input_l: Option<Executor>,
    /// Right input executor
    input_r: Option<Executor>,
    /// The data types of the formed new columns
    actual_output_data_types: Vec<DataType>,
    /// The parameters of the left join executor
    side_l: JoinSide<K, S, E>,
    /// The parameters of the right join executor
    side_r: JoinSide<K, S, E>,
    /// Optional non-equi join conditions
    cond: Option<NonStrictExpression>,
    /// Inequality pairs with output column indices.
    /// Each entry contains: (`output_indices`, `InequalityPairInfo`).
    inequality_pairs: Vec<(Vec<usize>, InequalityPairInfo)>,
    /// The output watermark of each inequality condition and its value is the minimum of the
    /// watermarks from both sides. It will be used to generate watermark into downstream
    /// and do state cleaning based on `clean_left_state`/`clean_right_state` fields.
    inequality_watermarks: Vec<Option<Watermark>>,
    /// Join-key positions and cleaning flags for watermark handling.
    /// Each entry: (join-key position, `do_state_cleaning`).
    watermark_indices_in_jk: Vec<(usize, bool)>,

    /// Whether the logic can be optimized for append-only stream
    append_only_optimize: bool,

    /// Streaming metrics registry for this actor.
    metrics: Arc<StreamingMetrics>,
    /// The maximum size of the chunk produced by executor at a time
    chunk_size: usize,
    /// Count the messages received, clear to 0 when counted to `EVICT_EVERY_N_MESSAGES`
    cnt_rows_received: u32,

    /// watermark column index -> `BufferedWatermarks`
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// When to alert high join amplification
    high_join_amplification_threshold: usize,

    /// Max number of rows that will be cached in the entry state.
    entry_state_max_rows: usize,
    /// Number of processed rows between periodic manual evictions of the join cache.
    join_cache_evict_interval_rows: u32,
}
205
206impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> std::fmt::Debug
207    for HashJoinExecutor<K, S, T, E>
208{
209    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210        f.debug_struct("HashJoinExecutor")
211            .field("join_type", &T)
212            .field("input_left", &self.input_l.as_ref().unwrap().identity())
213            .field("input_right", &self.input_r.as_ref().unwrap().identity())
214            .field("side_l", &self.side_l)
215            .field("side_r", &self.side_r)
216            .field("stream_key", &self.info.stream_key)
217            .field("schema", &self.info.schema)
218            .field("actual_output_data_types", &self.actual_output_data_types)
219            .field(
220                "join_cache_evict_interval_rows",
221                &self.join_cache_evict_interval_rows,
222            )
223            .finish()
224    }
225}
226
impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> Execute
    for HashJoinExecutor<K, S, T, E>
{
    /// Consumes the executor and returns its boxed message stream
    /// (see `into_stream` for the actual join loop).
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}
234
/// Borrowed argument bundle passed to the per-chunk equi-join routines
/// (`eq_join_left` / `eq_join_right`), grouping the executor state they need
/// without borrowing the whole executor.
struct EqJoinArgs<'a, K: HashKey, S: StateStore, E: JoinEncoding> {
    ctx: &'a ActorContextRef,
    /// Left-side join state.
    side_l: &'a mut JoinSide<K, S, E>,
    /// Right-side join state.
    side_r: &'a mut JoinSide<K, S, E>,
    actual_output_data_types: &'a [DataType],
    /// Optional non-equi condition evaluated per matched row pair.
    cond: &'a mut Option<NonStrictExpression>,
    inequality_watermarks: &'a [Option<Watermark>],
    /// The input chunk to process.
    chunk: StreamChunk,
    append_only_optimize: bool,
    chunk_size: usize,
    cnt_rows_received: &'a mut u32,
    high_join_amplification_threshold: usize,
    entry_state_max_rows: usize,
    join_cache_evict_interval_rows: u32,
}
250
251impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
252    HashJoinExecutor<K, S, T, E>
253{
    /// Creates a new `HashJoinExecutor` using the default entry-state cache
    /// size from the actor config. Thin wrapper around
    /// [`Self::new_with_cache_size`] with `entry_state_max_rows = None`; see
    /// that constructor for parameter semantics.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<InequalityPairInfo>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        watermark_indices_in_jk: Vec<(usize, bool)>,
    ) -> Self {
        Self::new_with_cache_size(
            ctx,
            info,
            input_l,
            input_r,
            params_l,
            params_r,
            null_safe,
            output_indices,
            cond,
            inequality_pairs,
            state_table_l,
            degree_state_table_l,
            state_table_r,
            degree_state_table_r,
            watermark_epoch,
            is_append_only,
            metrics,
            chunk_size,
            high_join_amplification_threshold,
            None,
            watermark_indices_in_jk,
        )
    }
301
    /// Full constructor. Derives the output schema from the join type,
    /// computes input→output and inequality index mappings, decides whether
    /// degree tables and the append-only optimization apply, and assembles
    /// both [`JoinSide`]s.
    ///
    /// `entry_state_max_rows = None` falls back to the config default.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_cache_size(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<InequalityPairInfo>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        entry_state_max_rows: Option<usize>,
        watermark_indices_in_jk: Vec<(usize, bool)>,
    ) -> Self {
        let entry_state_max_rows = match entry_state_max_rows {
            None => ctx.config.developer.hash_join_entry_state_max_rows,
            Some(entry_state_max_rows) => entry_state_max_rows,
        };
        // Clamp to at least 1 so the periodic eviction interval is never zero.
        let join_cache_evict_interval_rows = ctx
            .config
            .developer
            .join_hash_map_evict_interval_rows
            .max(1);
        let side_l_column_n = input_l.schema().len();

        // Semi/anti joins output only one side; all other types concatenate both.
        let schema_fields = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
            JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
            _ => [
                input_l.schema().fields.clone(),
                input_r.schema().fields.clone(),
            ]
            .concat(),
        };

        let original_output_data_types = schema_fields
            .iter()
            .map(|field| field.data_type())
            .collect_vec();
        let actual_output_data_types = output_indices
            .iter()
            .map(|&idx| original_output_data_types[idx].clone())
            .collect_vec();

        // Data types of hash join state.
        let state_all_data_types_l = input_l.schema().data_types();
        let state_all_data_types_r = input_r.schema().data_types();

        let state_pk_indices_l = input_l.stream_key().to_vec();
        let state_pk_indices_r = input_r.stream_key().to_vec();

        let state_join_key_indices_l = params_l.join_key_indices;
        let state_join_key_indices_r = params_r.join_key_indices;

        let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
        let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();

        // In the degree table layout, the deduped pk columns follow the join key.
        let degree_pk_indices_l = (state_join_key_indices_l.len()
            ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
            .collect_vec();
        let degree_pk_indices_r = (state_join_key_indices_r.len()
            ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
            .collect_vec();

        // If pk is contained in join key.
        let pk_contained_in_jk_l = is_subset(state_pk_indices_l, state_join_key_indices_l.clone());
        let pk_contained_in_jk_r = is_subset(state_pk_indices_r, state_join_key_indices_r.clone());

        // check whether join key contains pk in both side
        let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;

        let join_key_data_types_l = state_join_key_indices_l
            .iter()
            .map(|idx| state_all_data_types_l[*idx].clone())
            .collect_vec();

        let join_key_data_types_r = state_join_key_indices_r
            .iter()
            .map(|idx| state_all_data_types_r[*idx].clone())
            .collect_vec();

        // Equi-join requires matching key types on both sides.
        assert_eq!(join_key_data_types_l, join_key_data_types_r);

        let null_matched = K::Bitmap::from_bool_vec(null_safe);

        // A side can skip its degree table when the other side's pk is
        // contained in the join key (degrees are then derivable).
        let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
        let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;

        let (left_to_output, right_to_output) = {
            let (left_len, right_len) = if is_left_semi_or_anti(T) {
                (state_all_data_types_l.len(), 0usize)
            } else if is_right_semi_or_anti(T) {
                (0usize, state_all_data_types_r.len())
            } else {
                (state_all_data_types_l.len(), state_all_data_types_r.len())
            };
            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
        };

        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());

        let left_input_len = input_l.schema().len();
        let right_input_len = input_r.schema().len();
        let mut l2inequality_index = vec![vec![]; left_input_len];
        let mut r2inequality_index = vec![vec![]; right_input_len];
        let mut l_inequal_state_clean_columns = vec![];
        let mut r_inequal_state_clean_columns = vec![];
        let inequality_pairs = inequality_pairs
            .into_iter()
            .enumerate()
            .map(|(index, pair)| {
                // Map input columns to inequality index
                // The second field indicates whether this column is required to be less than
                // the corresponding column on the other side.
                // For `left >= right`: left is NOT less than right, right IS less than left
                // For `left <= right`: left IS less than right, right is NOT less than left
                let left_is_larger = pair.left_side_is_larger();
                l2inequality_index[pair.left_idx].push((index, !left_is_larger));
                r2inequality_index[pair.right_idx].push((index, left_is_larger));

                // Determine state cleanup columns
                if pair.clean_left_state {
                    l_inequal_state_clean_columns.push((pair.left_idx, index));
                }
                if pair.clean_right_state {
                    r_inequal_state_clean_columns.push((pair.right_idx, index));
                }

                // Get output indices for watermark emission
                // We only emit watermarks for the LARGER side's output columns
                let output_indices = if pair.left_side_is_larger() {
                    // Left is larger, emit for left output columns
                    l2o_indexed
                        .get_vec(&pair.left_idx)
                        .cloned()
                        .unwrap_or_default()
                } else {
                    // Right is larger, emit for right output columns
                    r2o_indexed
                        .get_vec(&pair.right_idx)
                        .cloned()
                        .unwrap_or_default()
                };

                (output_indices, pair)
            })
            .collect_vec();

        // Columns referenced by any inequality must be non-null to match.
        let mut l_non_null_fields = l2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();
        let mut r_non_null_fields = r2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();

        // Under the append-only optimization, inequality-based state cleaning
        // and non-null requirements are disabled.
        if append_only_optimize {
            l_inequal_state_clean_columns.clear();
            r_inequal_state_clean_columns.clear();
            l_non_null_fields.clear();
            r_non_null_fields.clear();
        }

        // Create degree tables with inequality column index for watermark-based cleaning.
        // The degree_inequality_idx is the column index in the input row that should be
        // stored in the degree table for inequality-based watermark cleaning.
        let l_inequality_idx = l_inequal_state_clean_columns
            .first()
            .map(|(col_idx, _)| *col_idx);
        let r_inequality_idx = r_inequal_state_clean_columns
            .first()
            .map(|(col_idx, _)| *col_idx);

        let degree_state_l = need_degree_table_l.then(|| {
            TableInner::new(
                degree_pk_indices_l,
                degree_join_key_indices_l,
                degree_state_table_l,
                l_inequality_idx,
            )
        });
        let degree_state_r = need_degree_table_r.then(|| {
            TableInner::new(
                degree_pk_indices_r,
                degree_join_key_indices_r,
                degree_state_table_r,
                r_inequality_idx,
            )
        });

        let inequality_watermarks = vec![None; inequality_pairs.len()];
        let watermark_buffers = BTreeMap::new();
        Self {
            ctx: ctx.clone(),
            info,
            input_l: Some(input_l),
            input_r: Some(input_r),
            actual_output_data_types,
            side_l: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch.clone(),
                    join_key_data_types_l,
                    state_join_key_indices_l.clone(),
                    state_all_data_types_l.clone(),
                    state_table_l,
                    params_l.deduped_pk_indices,
                    degree_state_l,
                    null_matched.clone(),
                    pk_contained_in_jk_l,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "left",
                ),
                join_key_indices: state_join_key_indices_l,
                all_data_types: state_all_data_types_l,
                i2o_mapping: left_to_output,
                i2o_mapping_indexed: l2o_indexed,
                input2inequality_index: l2inequality_index,
                non_null_fields: l_non_null_fields,
                state_clean_columns: l_inequal_state_clean_columns,
                start_pos: 0,
                need_degree_table: need_degree_table_l,
                _marker: PhantomData,
            },
            side_r: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch,
                    join_key_data_types_r,
                    state_join_key_indices_r.clone(),
                    state_all_data_types_r.clone(),
                    state_table_r,
                    params_r.deduped_pk_indices,
                    degree_state_r,
                    null_matched,
                    pk_contained_in_jk_r,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "right",
                ),
                join_key_indices: state_join_key_indices_r,
                all_data_types: state_all_data_types_r,
                // Right-side columns start after all left-side columns.
                start_pos: side_l_column_n,
                i2o_mapping: right_to_output,
                i2o_mapping_indexed: r2o_indexed,
                input2inequality_index: r2inequality_index,
                non_null_fields: r_non_null_fields,
                state_clean_columns: r_inequal_state_clean_columns,
                need_degree_table: need_degree_table_r,
                _marker: PhantomData,
            },
            cond,
            inequality_pairs,
            inequality_watermarks,
            watermark_indices_in_jk,
            append_only_optimize,
            metrics,
            chunk_size,
            cnt_rows_received: 0,
            watermark_buffers,
            high_join_amplification_threshold,
            entry_state_max_rows,
            join_cache_evict_interval_rows,
        }
    }
580
    /// Main join loop: aligns both inputs on barriers, then for each aligned
    /// message either forwards watermarks (via `handle_watermark`), joins a
    /// chunk from one side against the other (via `eq_join_left`/`eq_join_right`),
    /// or commits state and yields the barrier. Also records per-phase timing
    /// and cache-size metrics.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let input_l = self.input_l.take().unwrap();
        let input_r = self.input_r.take().unwrap();
        let aligned_stream = barrier_align(
            input_l.execute(),
            input_r.execute(),
            self.ctx.id,
            self.ctx.fragment_id,
            self.metrics.clone(),
            "Join",
        );
        pin_mut!(aligned_stream);

        let actor_id = self.ctx.id;

        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
        let first_epoch = barrier.epoch;
        // The first barrier message should be propagated.
        yield Message::Barrier(barrier);
        self.side_l.init(first_epoch).await?;
        self.side_r.init(first_epoch).await?;

        let actor_id_str = self.ctx.id.to_string();
        let fragment_id_str = self.ctx.fragment_id.to_string();

        // Initialize metric handles (guarded label values) up front.
        let join_actor_input_waiting_duration_ns = self
            .metrics
            .join_actor_input_waiting_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let left_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
        let right_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let barrier_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[
                actor_id_str.as_str(),
                fragment_id_str.as_str(),
                "barrier",
            ]);

        let left_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);

        let right_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let mut start_time = Instant::now();

        while let Some(msg) = aligned_stream
            .next()
            .instrument_await("hash_join_barrier_align")
            .await
        {
            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
            match msg? {
                AlignedMessage::WatermarkLeft(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Left, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::WatermarkRight(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Right, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::Left(chunk) => {
                    // Accumulate only the time spent joining, excluding the time
                    // downstream takes to consume the yielded chunks.
                    let mut left_time = Duration::from_nanos(0);
                    let mut left_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_left(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                        join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
                    }) {
                        left_time += left_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        left_start_time = Instant::now();
                    }
                    left_time += left_start_time.elapsed();
                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Right(chunk) => {
                    // Mirror of the left-chunk arm above.
                    let mut right_time = Duration::from_nanos(0);
                    let mut right_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_right(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                        join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
                    }) {
                        right_time += right_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        right_start_time = Instant::now();
                    }
                    right_time += right_start_time.elapsed();
                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Barrier(barrier) => {
                    let barrier_start_time = Instant::now();
                    let (left_post_commit, right_post_commit) =
                        self.flush_data(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);

                    // We don't include the time post yielding barrier because the vnode update
                    // is a one-off and rare operation.
                    barrier_join_match_duration_ns
                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
                    yield Message::Barrier(barrier);

                    // Update the vnode bitmap for state tables of both sides if asked.
                    right_post_commit
                        .post_yield_barrier(update_vnode_bitmap.clone())
                        .await?;
                    if left_post_commit
                        .post_yield_barrier(update_vnode_bitmap)
                        .await?
                        .unwrap_or(false)
                    {
                        // Vnode bitmap changed: buffered watermarks may no longer
                        // be valid for the new key ranges, so reset them.
                        self.watermark_buffers
                            .values_mut()
                            .for_each(|buffers| buffers.clear());
                        self.inequality_watermarks.fill(None);
                    }

                    // Report metrics of cached join rows/entries
                    for (join_cached_entry_count, ht) in [
                        (&left_join_cached_entry_count, &self.side_l.ht),
                        (&right_join_cached_entry_count, &self.side_r.ht),
                    ] {
                        join_cached_entry_count.set(ht.entry_count() as i64);
                    }
                }
            }
            start_time = Instant::now();
        }
    }
753
754    async fn flush_data(
755        &mut self,
756        epoch: EpochPair,
757    ) -> StreamExecutorResult<(
758        JoinHashMapPostCommit<'_, K, S, E>,
759        JoinHashMapPostCommit<'_, K, S, E>,
760    )> {
761        // All changes to the state has been buffered in the mem-table of the state table. Just
762        // `commit` them here.
763        let left = self.side_l.ht.flush(epoch).await?;
764        let right = self.side_r.ht.flush(epoch).await?;
765        Ok((left, right))
766    }
767
768    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
769        // All changes to the state has been buffered in the mem-table of the state table. Just
770        // `commit` them here.
771        self.side_l.ht.try_flush().await?;
772        self.side_r.ht.try_flush().await?;
773        Ok(())
774    }
775
776    // We need to manually evict the cache.
777    fn evict_cache(
778        side_update: &mut JoinSide<K, S, E>,
779        side_match: &mut JoinSide<K, S, E>,
780        cnt_rows_received: &mut u32,
781        join_cache_evict_interval_rows: u32,
782    ) {
783        *cnt_rows_received += 1;
784        if *cnt_rows_received >= join_cache_evict_interval_rows {
785            side_update.ht.evict();
786            side_match.ht.evict();
787            *cnt_rows_received = 0;
788        }
789    }
790
    /// Handle a watermark arriving from `side`, returning the watermarks (if
    /// any) to emit downstream on the output columns.
    ///
    /// Two kinds of watermark are processed:
    /// - join-key watermarks: buffered and aligned across both input sides,
    ///   then re-emitted on every output column mapped from that join-key
    ///   column, optionally cleaning state on both sides;
    /// - inequality-condition watermarks: only the LARGER side's watermark is
    ///   emitted, and it may trigger state cleaning on that side.
    fn handle_watermark(
        &mut self,
        side: SideTypePrimitive,
        watermark: Watermark,
    ) -> StreamExecutorResult<Vec<Watermark>> {
        // `side_update` is the side the watermark arrived on; `side_match` is the other side.
        let (side_update, side_match) = if side == SideType::Left {
            (&mut self.side_l, &mut self.side_r)
        } else {
            (&mut self.side_r, &mut self.side_l)
        };

        // Process join-key watermarks.
        // A single input column may appear at several positions in the join
        // key, hence `positions` (all occurrences) rather than `position`.
        let wm_in_jk = side_update
            .join_key_indices
            .iter()
            .positions(|idx| *idx == watermark.col_idx);
        let mut watermarks_to_emit = vec![];
        for idx in wm_in_jk {
            // One watermark buffer per join-key position, fed by both sides;
            // `handle_watermark` yields a watermark once it can be advanced
            // given what both sides have reported.
            let buffers = self
                .watermark_buffers
                .entry(idx)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
                // Clean state on BOTH sides, but only if this join-key
                // position is flagged for cleaning.
                if self
                    .watermark_indices_in_jk
                    .iter()
                    .any(|(jk_pos, do_clean)| *jk_pos == idx && *do_clean)
                {
                    side_match
                        .ht
                        .update_watermark(selected_watermark.val.clone());
                    side_update
                        .ht
                        .update_watermark(selected_watermark.val.clone());
                }

                // Re-emit the selected watermark on every output column this
                // join-key column maps to, from either side's i2o mapping.
                let empty_indices = vec![];
                let output_indices = side_update
                    .i2o_mapping_indexed
                    .get_vec(&side_update.join_key_indices[idx])
                    .unwrap_or(&empty_indices)
                    .iter()
                    .chain(
                        side_match
                            .i2o_mapping_indexed
                            .get_vec(&side_match.join_key_indices[idx])
                            .unwrap_or(&empty_indices),
                    );
                for output_idx in output_indices {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
            };
        }

        // Process inequality watermarks
        // We can only yield the LARGER side's watermark downstream.
        // For `left >= right`: left is larger, emit for left output columns
        // For `left <= right`: right is larger, emit for right output columns
        let mut update_left_watermark = None;
        let mut update_right_watermark = None;
        if let Some(watermark_indices) = side_update.input2inequality_index.get(watermark.col_idx) {
            for (inequality_index, _) in watermark_indices {
                // Inequality buffers share `watermark_buffers` with the
                // join-key buffers; they live after them, hence the offset by
                // `join_key_indices.len()`.
                let buffers = self
                    .watermark_buffers
                    .entry(side_update.join_key_indices.len() + inequality_index)
                    .or_insert_with(|| {
                        BufferedWatermarks::with_ids([SideType::Left, SideType::Right])
                    });
                if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone())
                {
                    let (output_indices, pair_info) = &self.inequality_pairs[*inequality_index];
                    let left_is_larger = pair_info.left_side_is_larger();

                    // Emit watermark for the larger side's output columns only
                    for output_idx in output_indices {
                        watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                    }
                    // Store watermark for state cleaning (via useful_state_clean_columns)
                    self.inequality_watermarks[*inequality_index] =
                        Some(selected_watermark.clone());

                    // Mark which side needs state cleaning (only if the clean flag is set)
                    if left_is_larger && pair_info.clean_left_state {
                        update_left_watermark = Some(selected_watermark.val.clone());
                    } else if !left_is_larger && pair_info.clean_right_state {
                        update_right_watermark = Some(selected_watermark.val.clone());
                    }
                }
            }
            // Do state cleaning on the larger side.
            // Now that degree tables include the inequality column, we can clean both
            // state and degree tables using `update_watermark`.
            if let Some(val) = update_left_watermark {
                self.side_l.ht.update_watermark(val);
            }
            if let Some(val) = update_right_watermark {
                self.side_r.ht.update_watermark(val);
            }
        }
        Ok(watermarks_to_emit)
    }
892
893    fn row_concat(
894        row_update: impl Row,
895        update_start_pos: usize,
896        row_matched: impl Row,
897        matched_start_pos: usize,
898    ) -> OwnedRow {
899        let mut new_row = vec![None; row_update.len() + row_matched.len()];
900
901        for (i, datum_ref) in row_update.iter().enumerate() {
902            new_row[i + update_start_pos] = datum_ref.to_owned_datum();
903        }
904        for (i, datum_ref) in row_matched.iter().enumerate() {
905            new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
906        }
907        OwnedRow::new(new_row)
908    }
909
    /// Used to forward `eq_join_oneside` to show join side in stack.
    ///
    /// Monomorphizes the probe path with `SIDE = Left`, so stack traces and
    /// profiles make it clear which input side is being processed.
    fn eq_join_left(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Left }>(args)
    }
916
    /// Used to forward `eq_join_oneside` to show join side in stack.
    ///
    /// Monomorphizes the probe path with `SIDE = Right`, so stack traces and
    /// profiles make it clear which input side is being processed.
    fn eq_join_right(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Right }>(args)
    }
923
    /// Process one input chunk arriving on side `SIDE`: probe the opposite
    /// side's hash table for each row, stream out joined chunks, and update
    /// both sides' state accordingly.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S, E>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            actual_output_data_types,
            cond,
            inequality_watermarks,
            chunk,
            append_only_optimize,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
            entry_state_max_rows,
            join_cache_evict_interval_rows,
            ..
        } = args;

        // `side_update` is the side this chunk belongs to; `side_match` is probed.
        let (side_update, side_match) = if SIDE == SideType::Left {
            (side_l, side_r)
        } else {
            (side_r, side_l)
        };

        // Match-side state-clean columns that already have an inequality
        // watermark available; used in `handle_match_row` to mark matched
        // rows below the watermark for cleaning.
        let useful_state_clean_columns = side_match
            .state_clean_columns
            .iter()
            .filter_map(|(column_idx, inequality_index)| {
                inequality_watermarks[*inequality_index]
                    .as_ref()
                    .map(|watermark| (*column_idx, watermark))
            })
            .collect_vec();

        let mut hashjoin_chunk_builder =
            JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        // Histogram of matched-row counts per probe key, labelled by actor,
        // fragment and the update side's table id.
        let join_matched_join_keys = ctx
            .streaming_metrics
            .join_matched_join_keys
            .with_guarded_label_values(&[
                &ctx.id.to_string(),
                &ctx.fragment_id.to_string(),
                &side_update.ht.table_id().to_string(),
            ]);

        // One hash key per row; `rows_with_holes` yields `None` for holes so
        // the zip stays aligned with `keys`.
        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(
                side_update,
                side_match,
                cnt_rows_received,
                join_cache_evict_interval_rows,
            );

            // Decide whether this key can possibly match before probing:
            // a NULL in a non-null-safe position means `NeverMatch`.
            let cache_lookup_result = {
                let probe_non_null_requirement_satisfied = side_update
                    .non_null_fields
                    .iter()
                    .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
                let build_non_null_requirement_satisfied =
                    key.null_bitmap().is_subset(side_match.ht.null_matched());
                if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
                    side_match.ht.take_state_opt(key)
                } else {
                    CacheResult::NeverMatch
                }
            };
            let mut total_matches = 0;

            // Dispatch to `handle_match_rows` with this row's op as a const
            // generic join op.
            macro_rules! match_rows {
                ($op:ident) => {
                    Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
                        cache_lookup_result,
                        row,
                        key,
                        &mut hashjoin_chunk_builder,
                        side_match,
                        side_update,
                        &useful_state_clean_columns,
                        cond,
                        append_only_optimize,
                        entry_state_max_rows,
                    )
                };
            }

            match op {
                Op::Insert | Op::UpdateInsert =>
                {
                    #[for_await]
                    for chunk in match_rows!(Insert) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
                Op::Delete | Op::UpdateDelete =>
                {
                    #[for_await]
                    for chunk in match_rows!(Delete) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
            };

            // Record amplification per key, and log keys whose match count
            // exceeds the configured threshold (typically a skewed or
            // low-cardinality join key).
            join_matched_join_keys.observe(total_matches as _);
            if total_matches > high_join_amplification_threshold {
                let join_key_data_types = side_update.ht.join_key_data_types();
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = total_matches,
                    update_table_id = %side_update.ht.table_id(),
                    match_table_id = %side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = %ctx.id,
                    fragment_id = %ctx.fragment_id,
                    "large rows matched for join key"
                );
            }
        }
        // NOTE(kwannoel): We don't track metrics for this last chunk.
        if let Some(chunk) = hashjoin_chunk_builder.take() {
            yield chunk;
        }
    }
1061
    /// For the probe-side row, we need to check if it has values in cache, if not, we need to
    /// fetch the matched rows from the state table.
    ///
    /// Every matched build-side row being processed needs to go through the following phases:
    /// 1. Handle join condition evaluation.
    /// 2. Always do cache refill, if the state count is good.
    /// 3. Handle state cleaning.
    /// 4. Handle degree table update.
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn handle_match_rows<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
    >(
        cached_lookup_result: CacheResult<E>,
        row: RowRef<'a>,
        key: &'a K,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        side_match: &'a mut JoinSide<K, S, E>,
        side_update: &'a mut JoinSide<K, S, E>,
        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
        cond: &'a mut Option<NonStrictExpression>,
        append_only_optimize: bool,
        entry_state_max_rows: usize,
    ) {
        let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
        // Fresh cache entry built up while streaming matched rows from
        // storage on a cache miss; unused on a hit.
        let mut entry_state: JoinEntryState<E> = JoinEntryState::default();
        let mut entry_state_count = 0;

        // Number of matched rows that satisfied the join condition.
        let mut degree = 0;
        let mut append_only_matched_row = None;
        let mut matched_rows_to_clean = vec![];

        // Forward one matched row to `handle_match_row`, binding the
        // degree-table handle, whether the row came from cache, and the
        // row-type conversion for the clean/append-only bookkeeping.
        macro_rules! match_row {
            (
                $match_order_key_indices:expr,
                $degree_table:expr,
                $matched_row:expr,
                $matched_row_ref:expr,
                $from_cache:literal,
                $map_output:expr,
            ) => {
                Self::handle_match_row::<_, _, SIDE, { JOIN_OP }, { $from_cache }>(
                    row,
                    $matched_row,
                    $matched_row_ref,
                    hashjoin_chunk_builder,
                    $match_order_key_indices,
                    $degree_table,
                    side_update.start_pos,
                    side_match.start_pos,
                    cond,
                    &mut degree,
                    useful_state_clean_columns,
                    append_only_optimize,
                    &mut append_only_matched_row,
                    &mut matched_rows_to_clean,
                    $map_output,
                )
            };
        }

        // Probe. The rebound `entry_state` below is what gets written back
        // into the cache by `update_state` after the match loop.
        let entry_state = match cached_lookup_result {
            CacheResult::NeverMatch => {
                // Null-safety rules guarantee no match: just forward the row
                // according to the join type and stop.
                let op = match JOIN_OP {
                    JoinOp::Insert => Op::Insert,
                    JoinOp::Delete => Op::Delete,
                };
                if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                    yield chunk;
                }
                return Ok(());
            }
            CacheResult::Hit(mut cached_rows) => {
                let (match_order_key_indices, match_degree_state) =
                    side_match.ht.get_degree_state_mut_ref();
                // Handle cached rows which match the probe-side row.
                for (matched_row_ref, matched_row) in
                    cached_rows.values_mut(&side_match.all_data_types)
                {
                    let matched_row = matched_row?;
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        match_degree_state,
                        matched_row,
                        Some(matched_row_ref),
                        true,
                        Either::Left,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }

                cached_rows
            }
            CacheResult::Miss => {
                // Handle rows which are not in cache.
                let (matched_rows, match_order_key_indices, degree_table) = side_match
                    .ht
                    .fetch_matched_rows_and_get_degree_table_ref(key)
                    .await?;

                #[for_await]
                for matched_row in matched_rows {
                    let (encoded_pk, matched_row) = matched_row?;

                    let mut matched_row_ref = None;

                    // cache refill
                    // NOTE(review): `<=` admits up to `entry_state_max_rows + 1`
                    // inserts; such an over-full entry then fails the `<=` check
                    // at the end and is discarded — confirm this is intended.
                    if entry_state_count <= entry_state_max_rows {
                        let row_ref = entry_state
                            .insert(encoded_pk, E::encode(&matched_row))
                            .with_context(|| format!("row: {}", row.display(),))?;
                        matched_row_ref = Some(row_ref);
                        entry_state_count += 1;
                    }
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        degree_table,
                        matched_row,
                        matched_row_ref,
                        false,
                        Either::Right,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }
                Box::new(entry_state)
            }
        };

        // forward rows depending on join types
        let op = match JOIN_OP {
            JoinOp::Insert => Op::Insert,
            JoinOp::Delete => Op::Delete,
        };
        if degree == 0 {
            if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                yield chunk;
            }
        } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
        {
            yield chunk;
        }

        // cache refill: write the entry back. A hit was taken out of the
        // cache by `take_state_opt`, so it must be reinserted; a miss-built
        // entry is inserted only if it didn't overflow `entry_state_max_rows`.
        if cache_hit || entry_state_count <= entry_state_max_rows {
            side_match.ht.update_state(key, entry_state);
        }

        // watermark state cleaning
        for matched_row in matched_rows_to_clean {
            // The committed table watermark is responsible for reclaiming rows in the state table.
            side_match.ht.delete_row_in_mem(key, &matched_row.row)?;
        }

        // apply append_only optimization to clean matched_rows which have been persisted
        if append_only_optimize && let Some(row) = append_only_matched_row {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            side_match.ht.delete_handle_degree(key, row)?;
            return Ok(());
        }

        // no append_only optimization, update state table(s).
        match JOIN_OP {
            JoinOp::Insert => {
                side_update
                    .ht
                    .insert_handle_degree(key, JoinRow::new(row, degree))?;
            }
            JoinOp::Delete => {
                side_update
                    .ht
                    .delete_handle_degree(key, JoinRow::new(row, degree))?;
            }
        }
    }
1244
    /// Process a single matched build-side row against the probe-side row:
    /// evaluate the non-equi join condition, emit the joined rows via the
    /// chunk builder, keep the degree consistent (degree table and cached
    /// row), and record rows for append-only cleanup or watermark-based
    /// state cleaning.
    ///
    /// Returns the output chunk, if the chunk builder completed one while
    /// handling this row.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    async fn handle_match_row<
        'a,
        R: Row,  // input row type
        RO: Row, // output row type
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
        const MATCHED_ROWS_FROM_CACHE: bool,
    >(
        update_row: RowRef<'a>,
        mut matched_row: JoinRow<R>,
        mut matched_row_cache_ref: Option<&mut E::EncodedRow>,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        match_order_key_indices: &[usize],
        match_degree_table: &mut Option<TableInner<S>>,
        side_update_start_pos: usize,
        side_match_start_pos: usize,
        cond: &Option<NonStrictExpression>,
        update_row_degree: &mut u64,
        useful_state_clean_columns: &[(usize, &'a Watermark)],
        append_only_optimize: bool,
        append_only_matched_row: &mut Option<JoinRow<RO>>,
        matched_rows_to_clean: &mut Vec<JoinRow<RO>>,
        map_output: impl Fn(R) -> RO,
    ) -> Option<StreamChunk> {
        let mut need_state_clean = false;
        let mut chunk_opt = None;
        // TODO(kwannoel): Instead of evaluating this every loop,
        // we can call this only if there's a non-equi expression.
        // check join cond
        let join_condition_satisfied = Self::check_join_condition(
            update_row,
            side_update_start_pos,
            &matched_row.row,
            side_match_start_pos,
            cond,
        )
        .await;

        if join_condition_satisfied {
            // update degree
            *update_row_degree += 1;
            // send matched row downstream

            // Inserts must happen before degrees are updated,
            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
            if matches!(JOIN_OP, JoinOp::Insert)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
            // TODO(kwannoel): We can actually statically decide this, using join side + join type.
            if let Some(degree_table) = match_degree_table {
                update_degree::<S, { JOIN_OP }>(
                    match_order_key_indices,
                    degree_table,
                    &mut matched_row,
                );
                if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
                    // Keep the cached copy's degree in sync with the degree
                    // table update above.
                    match JOIN_OP {
                        JoinOp::Insert => matched_row_cache_ref.as_mut().unwrap().increase_degree(),
                        JoinOp::Delete => matched_row_cache_ref.as_mut().unwrap().decrease_degree(),
                    }
                }
            }

            // Deletes must happen after degree table is updated.
            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
            if matches!(JOIN_OP, JoinOp::Delete)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
        } else {
            // check if need state cleaning: the matched row can be cleaned if
            // any state-clean column's value is strictly below its watermark.
            for (column_idx, watermark) in useful_state_clean_columns {
                if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
                    scalar
                        .default_cmp(&watermark.val.as_scalar_ref_impl())
                        .is_lt()
                }) {
                    need_state_clean = true;
                    break;
                }
            }
        }
        // If the stream is append-only and the join key covers pk in both side,
        // then we can remove matched rows since pk is unique and will not be
        // inserted again
        if append_only_optimize {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            // Since join key contains pk and pk is unique, there should be only
            // one row if matched.
            assert!(append_only_matched_row.is_none());
            *append_only_matched_row = Some(matched_row.map(map_output));
        } else if need_state_clean {
            debug_assert!(
                !append_only_optimize,
                "`append_only_optimize` and `need_state_clean` must not both be true"
            );
            matched_rows_to_clean.push(matched_row.map(map_output));
        }

        chunk_opt
    }
1356
1357    // TODO(yuhao-su): We should find a better way to eval the expression
1358    // without concat two rows.
1359    // if there are non-equi expressions
1360    // NOTE(kwannoel): We can probably let `eval` use `impl Row` instead of `OwnedRow`.
1361    #[inline]
1362    async fn check_join_condition(
1363        row: impl Row,
1364        side_update_start_pos: usize,
1365        matched_row: impl Row,
1366        side_match_start_pos: usize,
1367        join_condition: &Option<NonStrictExpression>,
1368    ) -> bool {
1369        if let Some(join_condition) = join_condition {
1370            let new_row = Self::row_concat(
1371                row,
1372                side_update_start_pos,
1373                matched_row,
1374                side_match_start_pos,
1375            );
1376            join_condition
1377                .eval_row_infallible(&new_row)
1378                .await
1379                .map(|s| *s.as_bool())
1380                .unwrap_or(false)
1381        } else {
1382            true
1383        }
1384    }
1385}
1386
1387#[cfg(test)]
1388mod tests {
1389    use std::sync::atomic::AtomicU64;
1390
1391    use pretty_assertions::assert_eq;
1392    use risingwave_common::array::*;
1393    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1394    use risingwave_common::config::StreamingConfig;
1395    use risingwave_common::hash::{Key64, Key128};
1396    use risingwave_common::util::epoch::test_epoch;
1397    use risingwave_common::util::sort_util::OrderType;
1398    use risingwave_storage::memory::MemoryStateStore;
1399
1400    use super::*;
1401    use crate::common::table::test_utils::gen_pbtable;
1402    use crate::executor::MemoryEncoding;
1403    use crate::executor::test_utils::expr::build_from_pretty;
1404    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1405
    /// Test helper: build a `(state table, degree table)` pair over the given
    /// in-memory store, with no inequality column in the degree table.
    async fn create_in_memory_state_table(
        mem_state: MemoryStateStore,
        data_types: &[DataType],
        order_types: &[OrderType],
        pk_indices: &[usize],
        table_id: u32,
    ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
        create_in_memory_state_table_with_inequality(
            mem_state,
            data_types,
            order_types,
            pk_indices,
            table_id,
            None,
        )
        .await
    }
1423
    /// Test helper: like `create_in_memory_state_table_with_watermark`, but
    /// with no clean-watermark columns on either table.
    async fn create_in_memory_state_table_with_inequality(
        mem_state: MemoryStateStore,
        data_types: &[DataType],
        order_types: &[OrderType],
        pk_indices: &[usize],
        table_id: u32,
        degree_inequality_type: Option<DataType>,
    ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
        create_in_memory_state_table_with_watermark(
            mem_state,
            data_types,
            order_types,
            pk_indices,
            table_id,
            degree_inequality_type,
            vec![],
            vec![],
        )
        .await
    }
1444
1445    #[allow(clippy::too_many_arguments)]
1446    async fn create_in_memory_state_table_with_watermark(
1447        mem_state: MemoryStateStore,
1448        data_types: &[DataType],
1449        order_types: &[OrderType],
1450        pk_indices: &[usize],
1451        table_id: u32,
1452        degree_inequality_type: Option<DataType>,
1453        state_clean_watermark_indices: Vec<usize>,
1454        degree_clean_watermark_indices: Vec<usize>,
1455    ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1456        let column_descs = data_types
1457            .iter()
1458            .enumerate()
1459            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1460            .collect_vec();
1461        let mut state_table_catalog = gen_pbtable(
1462            TableId::new(table_id),
1463            column_descs,
1464            order_types.to_vec(),
1465            pk_indices.to_vec(),
1466            0,
1467        );
1468        state_table_catalog.clean_watermark_indices = state_clean_watermark_indices
1469            .into_iter()
1470            .map(|idx| idx as u32)
1471            .collect();
1472        let state_table =
1473            StateTable::from_table_catalog(&state_table_catalog, mem_state.clone(), None).await;
1474
1475        // Create degree table with schema: [pk..., _degree, inequality?]
1476        let mut degree_table_column_descs = vec![];
1477        pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1478            degree_table_column_descs.push(ColumnDesc::unnamed(
1479                ColumnId::new(pk_id as i32),
1480                data_types[*idx].clone(),
1481            ))
1482        });
1483        // Add _degree column
1484        degree_table_column_descs.push(ColumnDesc::unnamed(
1485            ColumnId::new(pk_indices.len() as i32),
1486            DataType::Int64,
1487        ));
1488        // Add inequality column if present
1489        if let Some(ineq_type) = degree_inequality_type {
1490            degree_table_column_descs.push(ColumnDesc::unnamed(
1491                ColumnId::new((pk_indices.len() + 1) as i32),
1492                ineq_type,
1493            ));
1494        }
1495        let mut degree_table_catalog = gen_pbtable(
1496            TableId::new(table_id + 1),
1497            degree_table_column_descs,
1498            order_types.to_vec(),
1499            pk_indices.to_vec(),
1500            0,
1501        );
1502        degree_table_catalog.clean_watermark_indices = degree_clean_watermark_indices
1503            .into_iter()
1504            .map(|idx| idx as u32)
1505            .collect();
1506        let degree_state_table =
1507            StateTable::from_table_catalog(&degree_table_catalog, mem_state, None).await;
1508        (state_table, degree_state_table)
1509    }
1510
1511    fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1512        build_from_pretty(
1513            condition_text
1514                .as_deref()
1515                .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1516        )
1517    }
1518
    /// Builds a hash-join executor over two mocked sources for tests.
    ///
    /// Each side has schema `(col0: Int64 join key, col1: Int64)`; the join key
    /// is column 0 and column 1 is the extra "matched" column. Left-side state
    /// tables use ids 0/1 and right-side tables use 2/3 (the helper allocates
    /// `table_id + 1` for the degree table), all sharing one `MemoryStateStore`.
    ///
    /// * `with_condition` - attach the non-equi condition from [`create_cond`].
    /// * `null_safe` - whether NULL join keys compare equal.
    /// * `condition_text` - optional condition override; defaults to `$1 < $3`.
    /// * `inequality_pairs` - inequality metadata driving watermark-based state
    ///   cleanup; also determines the degree tables' extra inequality column.
    async fn create_executor<const T: JoinTypePrimitive>(
        with_condition: bool,
        null_safe: bool,
        condition_text: Option<String>,
        inequality_pairs: Vec<InequalityPairInfo>,
    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64), // join key
                Field::unnamed(DataType::Int64),
            ],
        };
        let (tx_l, source_l) = MockSource::channel();
        let source_l = source_l.into_executor(schema.clone(), vec![1]);
        let (tx_r, source_r) = MockSource::channel();
        let source_r = source_r.into_executor(schema, vec![1]);
        // Equi-join on column 0; column 1 is carried as the deduped pk column.
        let params_l = JoinParams::new(vec![0], vec![1]);
        let params_r = JoinParams::new(vec![0], vec![1]);
        let cond = with_condition.then(|| create_cond(condition_text));

        let mem_state = MemoryStateStore::new();

        // Determine inequality column types for degree tables based on inequality_pairs
        let l_degree_ineq_type = inequality_pairs
            .iter()
            .find(|pair| pair.clean_left_state)
            .map(|_| DataType::Int64); // Column 1 is Int64
        let r_degree_ineq_type = inequality_pairs
            .iter()
            .find(|pair| pair.clean_right_state)
            .map(|_| DataType::Int64); // Column 1 is Int64
        // State tables clean on the inequality column of the side being cleaned.
        let l_clean_watermark_indices = inequality_pairs
            .iter()
            .find(|pair| pair.clean_left_state)
            .map(|pair| vec![pair.left_idx])
            .unwrap_or_default();
        let r_clean_watermark_indices = inequality_pairs
            .iter()
            .find(|pair| pair.clean_right_state)
            .map(|pair| vec![pair.right_idx])
            .unwrap_or_default();
        // Degree table layout is [pk0, pk1, _degree, inequality] (two pk
        // columns here), so the optional inequality column sits at index 3.
        let degree_inequality_column_idx = 3;
        let l_degree_clean_watermark_indices = l_degree_ineq_type
            .as_ref()
            .map(|_| vec![degree_inequality_column_idx])
            .unwrap_or_default();
        let r_degree_clean_watermark_indices = r_degree_ineq_type
            .as_ref()
            .map(|_| vec![degree_inequality_column_idx])
            .unwrap_or_default();

        let (state_l, degree_state_l) = create_in_memory_state_table_with_watermark(
            mem_state.clone(),
            &[DataType::Int64, DataType::Int64],
            &[OrderType::ascending(), OrderType::ascending()],
            &[0, 1],
            0, // left: state table 0, degree table 1
            l_degree_ineq_type,
            l_clean_watermark_indices,
            l_degree_clean_watermark_indices,
        )
        .await;

        let (state_r, degree_state_r) = create_in_memory_state_table_with_watermark(
            mem_state,
            &[DataType::Int64, DataType::Int64],
            &[OrderType::ascending(), OrderType::ascending()],
            &[0, 1],
            2, // right: state table 2, degree table 3
            r_degree_ineq_type,
            r_clean_watermark_indices,
            r_degree_clean_watermark_indices,
        )
        .await;

        // Semi/anti joins output only one side's columns; other join types
        // output the concatenation of both schemas.
        let schema = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
            _ => [source_l.schema().fields(), source_r.schema().fields()]
                .concat()
                .into_iter()
                .collect(),
        };
        let schema_len = schema.len();
        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);

        let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
            ActorContext::for_test(123),
            info,
            source_l,
            source_r,
            params_l,
            params_r,
            vec![null_safe],
            (0..schema_len).collect_vec(), // output all columns, unprojected
            cond,
            inequality_pairs,
            state_l,
            degree_state_l,
            state_r,
            degree_state_r,
            Arc::new(AtomicU64::new(0)),
            false, // not append-only (cf. create_append_only_executor)
            Arc::new(StreamingMetrics::unused()),
            1024,
            2048,
            // NOTE(review): the meaning of `(0, true)` is not visible in this
            // file — confirm against `HashJoinExecutor::new`'s signature.
            vec![(0, true)],
        );
        (tx_l, tx_r, executor.boxed().execute())
    }
1629
    /// Convenience wrapper around [`create_executor`] for the "classical"
    /// equi-join setup with no inequality pairs (no watermark-based cleanup).
    async fn create_classical_executor<const T: JoinTypePrimitive>(
        with_condition: bool,
        null_safe: bool,
        condition_text: Option<String>,
    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
        create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
    }
1637
1638    async fn create_append_only_executor<const T: JoinTypePrimitive>(
1639        with_condition: bool,
1640    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1641        let schema = Schema {
1642            fields: vec![
1643                Field::unnamed(DataType::Int64),
1644                Field::unnamed(DataType::Int64),
1645                Field::unnamed(DataType::Int64),
1646            ],
1647        };
1648        let (tx_l, source_l) = MockSource::channel();
1649        let source_l = source_l.into_executor(schema.clone(), vec![0]);
1650        let (tx_r, source_r) = MockSource::channel();
1651        let source_r = source_r.into_executor(schema, vec![0]);
1652        let params_l = JoinParams::new(vec![0, 1], vec![]);
1653        let params_r = JoinParams::new(vec![0, 1], vec![]);
1654        let cond = with_condition.then(|| create_cond(None));
1655
1656        let mem_state = MemoryStateStore::new();
1657
1658        let (state_l, degree_state_l) = create_in_memory_state_table(
1659            mem_state.clone(),
1660            &[DataType::Int64, DataType::Int64, DataType::Int64],
1661            &[
1662                OrderType::ascending(),
1663                OrderType::ascending(),
1664                OrderType::ascending(),
1665            ],
1666            &[0, 1, 0],
1667            0,
1668        )
1669        .await;
1670
1671        let (state_r, degree_state_r) = create_in_memory_state_table(
1672            mem_state,
1673            &[DataType::Int64, DataType::Int64, DataType::Int64],
1674            &[
1675                OrderType::ascending(),
1676                OrderType::ascending(),
1677                OrderType::ascending(),
1678            ],
1679            &[0, 1, 1],
1680            1,
1681        )
1682        .await;
1683
1684        let schema = match T {
1685            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1686            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1687            _ => [source_l.schema().fields(), source_r.schema().fields()]
1688                .concat()
1689                .into_iter()
1690                .collect(),
1691        };
1692        let schema_len = schema.len();
1693        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1694
1695        let executor = HashJoinExecutor::<Key128, MemoryStateStore, T, MemoryEncoding>::new(
1696            ActorContext::for_test(123),
1697            info,
1698            source_l,
1699            source_r,
1700            params_l,
1701            params_r,
1702            vec![false],
1703            (0..schema_len).collect_vec(),
1704            cond,
1705            vec![],
1706            state_l,
1707            degree_state_l,
1708            state_r,
1709            degree_state_r,
1710            Arc::new(AtomicU64::new(0)),
1711            true,
1712            Arc::new(StreamingMetrics::unused()),
1713            1024,
1714            2048,
1715            vec![(0, true)],
1716        );
1717        (tx_l, tx_r, executor.boxed().execute())
1718    }
1719
    /// Inner join with an inequality pair `left.col1 >= right.col1`: a right
    /// watermark on col1 must clean left-state rows whose col1 falls below it,
    /// so later right-side probes cannot match the cleaned rows.
    #[tokio::test]
    async fn test_inequality_join_watermark() -> StreamExecutorResult<()> {
        // Test inequality join with watermark-based state cleanup
        // Condition: left.col1 >= right.col1 (left side is larger, clean left state)
        // Left state rows with col1 < watermark will be cleaned
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 2 4
             + 2 7
             + 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 6",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 2 3",
        );
        // Test with condition: left.col1 >= right.col1
        let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
            true,
            false,
            Some(String::from(
                "(greater_than_or_equal:boolean $1:int8 $3:int8)",
            )),
            vec![InequalityPairInfo {
                left_idx: 1,
                right_idx: 1,
                clean_left_state: true, // left >= right, left is larger
                clean_right_state: false,
                op: InequalityType::GreaterThanOrEqual,
            }],
        )
        .await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the left chunk: (2, 4), (2, 7), (3, 8)
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push watermarks: left=10, right=6
        // Output watermark is min(10, 6) = 6 for both output columns
        tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
        hash_join.next_unwrap_pending();

        tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
        assert_eq!(
            output_watermark,
            Watermark::new(1, DataType::Int64, ScalarImpl::Int64(6))
        );

        // After watermark, left state rows with col1 < 6 are cleaned
        // Row (2, 4) should be cleaned (4 < 6)
        // Row (2, 7) should remain (7 >= 6)
        // Row (3, 8) should remain (8 >= 6)

        // push right chunk (2, 6)
        // Only (2, 7) can match: 7 >= 6 is TRUE
        // (2, 4) was cleaned, so it won't match
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 7 2 6"
            )
        );

        // push right chunk (2, 3)
        // (2, 7) can match: 7 >= 3 is TRUE
        // (2, 4) was cleaned, won't match
        // (without cleanup, (2, 4) would have matched too since 4 >= 3)
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 7 2 3"
            )
        );

        Ok(())
    }
1810
    /// Basic inner equi-join on column 0 without a condition: output rows
    /// appear only for keys present on both sides (keys 2 and 3 here).
    #[tokio::test]
    async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        // insert + delete of the same row: net no-op for key 3's state
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk (no right rows yet, so no output)
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; only key 2 matches the left state
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 7"
            )
        );

        // push the 2nd right chunk; only key 3 matches (left (3, 8) was retracted)
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 6 3 10"
            )
        );

        Ok(())
    }
1880
    /// Inner equi-join with `null_safe = true`: NULL join keys on both sides
    /// compare equal and therefore produce a match.
    #[tokio::test]
    async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + . 6",
        );
        // insert + delete of the same NULL-keyed row: net no-op
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + . 8
             - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + . 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk (no right rows yet, so no output)
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; only key 2 matches the left state
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 7"
            )
        );

        // push the 2nd right chunk; the NULL keys match under null-safe equality
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . 6 . 10"
            )
        );

        Ok(())
    }
1950
    /// Left semi join: a left row is emitted once when its first right match
    /// arrives, and retracted only when its last right match is deleted.
    #[tokio::test]
    async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        // insert + delete of the same row: net no-op for key 3's state
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             + 6 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I  I
             - 6 11",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I  I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk (no right rows yet, so no output)
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; key 2 gains its first match
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        // push the 2nd right chunk; key 3 gains its first match
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 3 6"
            )
        );

        // push the 3rd left chunk (tests forward_exactly_once)
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // push the 3rd right chunk
        // (tests that no change if there are still matches: (6, 9) remains)
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // push the 4th right chunk
        // (tests that deletion occurs when there are no more matches)
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
2060
    /// Left semi join with `null_safe = true`: NULL keys match, and the emit /
    /// retract semantics follow the last-match rule of the semi join.
    #[tokio::test]
    async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + . 6",
        );
        // insert + delete of the same NULL-keyed row: net no-op
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + . 8
             - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + . 10
             + 6 11",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             + 6 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I  I
             - 6 11",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I  I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk (no right rows yet, so no output)
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; key 2 gains its first match
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        // push the 2nd right chunk; the NULL key gains its first (null-safe) match
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + . 6"
            )
        );

        // push the 3rd left chunk (tests forward_exactly_once)
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // push the 3rd right chunk
        // (tests that no change if there are still matches: (6, 9) remains)
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // push the 4th right chunk
        // (tests that deletion occurs when there are no more matches)
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
2170
    /// Inner join over append-only inputs joining on the composite key
    /// `[col0, col1]` (see `create_append_only_executor`).
    #[tokio::test]
    async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::Inner }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk (no right rows yet, so no output)
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; composite keys (2,5) and (4,9) match
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 2 5 2 2 5 1
                + 4 9 4 4 9 2"
            )
        );

        // push the 2nd right chunk; composite keys (1,4) and (3,6) match
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 1 4 1 1 4 4
                + 3 6 3 3 6 5"
            )
        );

        Ok(())
    }
2243
    /// Left semi join over append-only inputs on the composite key
    /// `[col0, col1]`: only the left side's columns are emitted.
    #[tokio::test]
    async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk (no right rows yet, so no output)
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; left rows (2,5,2) and (4,9,4) gain matches
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 2 5 2
                + 4 9 4"
            )
        );

        // push the 2nd right chunk; left rows (1,4,1) and (3,6,3) gain matches
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 1 4 1
                + 3 6 3"
            )
        );

        Ok(())
    }
2316
    /// Right semi join over append-only inputs on the composite key
    /// `[col0, col1]`: only the right side's columns are emitted.
    #[tokio::test]
    async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::RightSemi }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk (no right rows yet, so no output)
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; right rows (2,5,1) and (4,9,2) find matches
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 2 5 1
                + 4 9 2"
            )
        );

        // push the 2nd right chunk; right rows (1,4,4) and (3,6,5) find matches
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 1 4 4
                + 3 6 5"
            )
        );

        Ok(())
    }
2389
    /// `RightSemi` join: a right-side row is forwarded once it has at least one
    /// match on the left, and retracted only when its last left match is deleted.
    #[tokio::test]
    async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I
             + 6 10",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I  I
             - 6 11",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            "  I  I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st right chunk (no left matches yet, so no output)
        tx_r.push_chunk(chunk_r1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier (epoch 2) for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd right chunk (insert + delete of the same unmatched row: no output)
        tx_r.push_chunk(chunk_r2);
        hash_join.next_unwrap_pending();

        // push the 1st left chunk
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        // push the 2nd left chunk
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 3 6"
            )
        );

        // push the 3rd right chunk (tests forward_exactly_once)
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // push the 3rd left chunk
        // (tests that no change if there are still matches)
        tx_l.push_chunk(chunk_l3);
        hash_join.next_unwrap_pending();

        // push the 4th left chunk
        // (tests that deletion occurs when there are no more matches)
        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
2499
    /// `LeftAnti` join: a left-side row is forwarded while it has NO match on the
    /// right, retracted when its first right match arrives, and re-emitted once its
    /// last right match is deleted.
    #[tokio::test]
    async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11
             + 1 2
             + 1 3",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             + 9 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I
             - 1 2",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I I
             - 1 3",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk (right side is empty, so all rows are anti-matches)
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4
                + 2 5
                + 3 6",
            )
        );

        // push the 2nd barrier (epoch 2) for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk (insert + delete of the same row cancel out; D flag)
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I
                 + 3 8 D
                 - 3 8 D",
            )
        );

        // push the 1st right chunk (key 2 gains a match, so its left row is retracted)
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 2 5"
            )
        );

        // push the 2nd right chunk (keys 3 and 1 gain matches)
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 3 6
                - 1 4"
            )
        );

        // push the 3rd left chunk (tests forward_exactly_once)
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 9 10"
            )
        );

        // push the 3rd right chunk
        // (tests that no change if there are still matches)
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // push the 4th right chunk
        // (tests that insertion occurs when there are no more matches)
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4"
            )
        );

        Ok(())
    }
2629
    /// `RightAnti` join, exercised by running a `LeftAnti` executor with the two
    /// input handles swapped (the "right" chunks below feed the executor's left
    /// input and vice versa).
    #[tokio::test]
    async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11
             + 1 2
             + 1 3",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I
             + 9 10",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             - 1 2",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            "  I I
             - 1 3",
        );
        // NOTE: tx_r/tx_l are deliberately swapped relative to the constructor's
        // (left, right) order — see the function-level doc comment.
        let (mut tx_r, mut tx_l, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;

        // push the init barrier for left and right
        tx_r.push_barrier(test_epoch(1), false);
        tx_l.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st right chunk (left side is empty, so all rows are anti-matches)
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4
                + 2 5
                + 3 6",
            )
        );

        // push the 2nd barrier (epoch 2) for left and right
        tx_r.push_barrier(test_epoch(2), false);
        tx_l.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd right chunk (insert + delete of the same row cancel out; D flag)
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I
                 + 3 8 D
                 - 3 8 D",
            )
        );

        // push the 1st left chunk (key 2 gains a match, so its right row is retracted)
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 2 5"
            )
        );

        // push the 2nd left chunk (keys 3 and 1 gain matches)
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 3 6
                - 1 4"
            )
        );

        // push the 3rd right chunk (tests forward_exactly_once)
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 9 10"
            )
        );

        // push the 3rd left chunk
        // (tests that no change if there are still matches)
        tx_l.push_chunk(chunk_l3);
        hash_join.next_unwrap_pending();

        // push the 4th left chunk
        // (tests that insertion occurs when there are no more matches)
        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4"
            )
        );

        Ok(())
    }
2759
2760    #[tokio::test]
2761    async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
2762        let chunk_l1 = StreamChunk::from_pretty(
2763            "  I I
2764             + 1 4
2765             + 2 5
2766             + 3 6",
2767        );
2768        let chunk_l2 = StreamChunk::from_pretty(
2769            "  I I
2770             + 6 8
2771             + 3 8",
2772        );
2773        let chunk_r1 = StreamChunk::from_pretty(
2774            "  I I
2775             + 2 7
2776             + 4 8
2777             + 6 9",
2778        );
2779        let chunk_r2 = StreamChunk::from_pretty(
2780            "  I  I
2781             + 3 10
2782             + 6 11",
2783        );
2784        let (mut tx_l, mut tx_r, mut hash_join) =
2785            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2786
2787        // push the init barrier for left and right
2788        tx_l.push_barrier(test_epoch(1), false);
2789        tx_r.push_barrier(test_epoch(1), false);
2790        hash_join.next_unwrap_ready_barrier()?;
2791
2792        // push the 1st left chunk
2793        tx_l.push_chunk(chunk_l1);
2794        hash_join.next_unwrap_pending();
2795
2796        // push a barrier to left side
2797        tx_l.push_barrier(test_epoch(2), false);
2798
2799        // push the 2nd left chunk
2800        tx_l.push_chunk(chunk_l2);
2801
2802        // join the first right chunk
2803        tx_r.push_chunk(chunk_r1);
2804
2805        // Consume stream chunk
2806        let chunk = hash_join.next_unwrap_ready_chunk()?;
2807        assert_eq!(
2808            chunk,
2809            StreamChunk::from_pretty(
2810                " I I I I
2811                + 2 5 2 7"
2812            )
2813        );
2814
2815        // push a barrier to right side
2816        tx_r.push_barrier(test_epoch(2), false);
2817
2818        // get the aligned barrier here
2819        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2820        assert!(matches!(
2821            hash_join.next_unwrap_ready_barrier()?,
2822            Barrier {
2823                epoch,
2824                mutation: None,
2825                ..
2826            } if epoch == expected_epoch
2827        ));
2828
2829        // join the 2nd left chunk
2830        let chunk = hash_join.next_unwrap_ready_chunk()?;
2831        assert_eq!(
2832            chunk,
2833            StreamChunk::from_pretty(
2834                " I I I I
2835                + 6 8 6 9"
2836            )
2837        );
2838
2839        // push the 2nd right chunk
2840        tx_r.push_chunk(chunk_r2);
2841        let chunk = hash_join.next_unwrap_ready_chunk()?;
2842        assert_eq!(
2843            chunk,
2844            StreamChunk::from_pretty(
2845                " I I I I
2846                + 3 6 3 10
2847                + 3 8 3 10
2848                + 6 8 6 11"
2849            )
2850        );
2851
2852        Ok(())
2853    }
2854
2855    #[tokio::test]
2856    async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
2857        let chunk_l1 = StreamChunk::from_pretty(
2858            "  I I
2859             + 1 4
2860             + 2 .
2861             + 3 .",
2862        );
2863        let chunk_l2 = StreamChunk::from_pretty(
2864            "  I I
2865             + 6 .
2866             + 3 8",
2867        );
2868        let chunk_r1 = StreamChunk::from_pretty(
2869            "  I I
2870             + 2 7
2871             + 4 8
2872             + 6 9",
2873        );
2874        let chunk_r2 = StreamChunk::from_pretty(
2875            "  I  I
2876             + 3 10
2877             + 6 11",
2878        );
2879        let (mut tx_l, mut tx_r, mut hash_join) =
2880            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2881
2882        // push the init barrier for left and right
2883        tx_l.push_barrier(test_epoch(1), false);
2884        tx_r.push_barrier(test_epoch(1), false);
2885        hash_join.next_unwrap_ready_barrier()?;
2886
2887        // push the 1st left chunk
2888        tx_l.push_chunk(chunk_l1);
2889        hash_join.next_unwrap_pending();
2890
2891        // push a barrier to left side
2892        tx_l.push_barrier(test_epoch(2), false);
2893
2894        // push the 2nd left chunk
2895        tx_l.push_chunk(chunk_l2);
2896
2897        // join the first right chunk
2898        tx_r.push_chunk(chunk_r1);
2899
2900        // Consume stream chunk
2901        let chunk = hash_join.next_unwrap_ready_chunk()?;
2902        assert_eq!(
2903            chunk,
2904            StreamChunk::from_pretty(
2905                " I I I I
2906                + 2 . 2 7"
2907            )
2908        );
2909
2910        // push a barrier to right side
2911        tx_r.push_barrier(test_epoch(2), false);
2912
2913        // get the aligned barrier here
2914        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2915        assert!(matches!(
2916            hash_join.next_unwrap_ready_barrier()?,
2917            Barrier {
2918                epoch,
2919                mutation: None,
2920                ..
2921            } if epoch == expected_epoch
2922        ));
2923
2924        // join the 2nd left chunk
2925        let chunk = hash_join.next_unwrap_ready_chunk()?;
2926        assert_eq!(
2927            chunk,
2928            StreamChunk::from_pretty(
2929                " I I I I
2930                + 6 . 6 9"
2931            )
2932        );
2933
2934        // push the 2nd right chunk
2935        tx_r.push_chunk(chunk_r2);
2936        let chunk = hash_join.next_unwrap_ready_chunk()?;
2937        assert_eq!(
2938            chunk,
2939            StreamChunk::from_pretty(
2940                " I I I I
2941                + 3 8 3 10
2942                + 3 . 3 10
2943                + 6 . 6 11"
2944            )
2945        );
2946
2947        Ok(())
2948    }
2949
2950    #[tokio::test]
2951    async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
2952        let chunk_l1 = StreamChunk::from_pretty(
2953            "  I I
2954             + 1 4
2955             + 2 5
2956             + 3 6",
2957        );
2958        let chunk_l2 = StreamChunk::from_pretty(
2959            "  I I
2960             + 3 8
2961             - 3 8",
2962        );
2963        let chunk_r1 = StreamChunk::from_pretty(
2964            "  I I
2965             + 2 7
2966             + 4 8
2967             + 6 9",
2968        );
2969        let chunk_r2 = StreamChunk::from_pretty(
2970            "  I  I
2971             + 3 10
2972             + 6 11",
2973        );
2974        let (mut tx_l, mut tx_r, mut hash_join) =
2975            create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;
2976
2977        // push the init barrier for left and right
2978        tx_l.push_barrier(test_epoch(1), false);
2979        tx_r.push_barrier(test_epoch(1), false);
2980        hash_join.next_unwrap_ready_barrier()?;
2981
2982        // push the 1st left chunk
2983        tx_l.push_chunk(chunk_l1);
2984        let chunk = hash_join.next_unwrap_ready_chunk()?;
2985        assert_eq!(
2986            chunk,
2987            StreamChunk::from_pretty(
2988                " I I I I
2989                + 1 4 . .
2990                + 2 5 . .
2991                + 3 6 . ."
2992            )
2993        );
2994
2995        // push the 2nd left chunk
2996        tx_l.push_chunk(chunk_l2);
2997        let chunk = hash_join.next_unwrap_ready_chunk()?;
2998        assert_eq!(
2999            chunk,
3000            StreamChunk::from_pretty(
3001                " I I I I
3002                + 3 8 . . D
3003                - 3 8 . . D"
3004            )
3005        );
3006
3007        // push the 1st right chunk
3008        tx_r.push_chunk(chunk_r1);
3009        let chunk = hash_join.next_unwrap_ready_chunk()?;
3010        assert_eq!(
3011            chunk,
3012            StreamChunk::from_pretty(
3013                " I I I I
3014                - 2 5 . .
3015                + 2 5 2 7"
3016            )
3017        );
3018
3019        // push the 2nd right chunk
3020        tx_r.push_chunk(chunk_r2);
3021        let chunk = hash_join.next_unwrap_ready_chunk()?;
3022        assert_eq!(
3023            chunk,
3024            StreamChunk::from_pretty(
3025                " I I I I
3026                - 3 6 . .
3027                + 3 6 3 10"
3028            )
3029        );
3030
3031        Ok(())
3032    }
3033
    /// `LeftOuter` join with null-safe key comparison enabled (the `true` argument):
    /// NULL join keys match each other — see the final assertion, where the left
    /// row with key NULL joins the right row with key NULL.
    #[tokio::test]
    async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + . 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + . 8
             - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + . 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk (no right matches yet: all rows NULL-padded)
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 4 . .
                + 2 5 . .
                + . 6 . ."
            )
        );

        // push the 2nd left chunk (insert + delete of the same NULL-keyed row; D flag)
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . 8 . . D
                - . 8 . . D"
            )
        );

        // push the 1st right chunk (key 2 gains a match)
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 2 5 . .
                + 2 5 2 7"
            )
        );

        // push the 2nd right chunk (NULL key matches NULL key under null-safe compare)
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - . 6 . .
                + . 6 . 10"
            )
        );

        Ok(())
    }
3117
3118    #[tokio::test]
3119    async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
3120        let chunk_l1 = StreamChunk::from_pretty(
3121            "  I I
3122             + 1 4
3123             + 2 5
3124             + 3 6",
3125        );
3126        let chunk_l2 = StreamChunk::from_pretty(
3127            "  I I
3128             + 3 8
3129             - 3 8",
3130        );
3131        let chunk_r1 = StreamChunk::from_pretty(
3132            "  I I
3133             + 2 7
3134             + 4 8
3135             + 6 9",
3136        );
3137        let chunk_r2 = StreamChunk::from_pretty(
3138            "  I  I
3139             + 5 10
3140             - 5 10",
3141        );
3142        let (mut tx_l, mut tx_r, mut hash_join) =
3143            create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;
3144
3145        // push the init barrier for left and right
3146        tx_l.push_barrier(test_epoch(1), false);
3147        tx_r.push_barrier(test_epoch(1), false);
3148        hash_join.next_unwrap_ready_barrier()?;
3149
3150        // push the 1st left chunk
3151        tx_l.push_chunk(chunk_l1);
3152        hash_join.next_unwrap_pending();
3153
3154        // push the 2nd left chunk
3155        tx_l.push_chunk(chunk_l2);
3156        hash_join.next_unwrap_pending();
3157
3158        // push the 1st right chunk
3159        tx_r.push_chunk(chunk_r1);
3160        let chunk = hash_join.next_unwrap_ready_chunk()?;
3161        assert_eq!(
3162            chunk,
3163            StreamChunk::from_pretty(
3164                " I I I I
3165                + 2 5 2 7
3166                + . . 4 8
3167                + . . 6 9"
3168            )
3169        );
3170
3171        // push the 2nd right chunk
3172        tx_r.push_chunk(chunk_r2);
3173        let chunk = hash_join.next_unwrap_ready_chunk()?;
3174        assert_eq!(
3175            chunk,
3176            StreamChunk::from_pretty(
3177                " I I I I
3178                + . . 5 10 D
3179                - . . 5 10 D"
3180            )
3181        );
3182
3183        Ok(())
3184    }
3185
    /// `LeftOuter` join over append-only inputs (three-column chunks): unmatched
    /// left rows are NULL-padded, then retracted and replaced once a match arrives.
    #[tokio::test]
    async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk (no right matches yet: all rows NULL-padded)
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 1 4 1 . . .
                + 2 5 2 . . .
                + 3 6 3 . . ."
            )
        );

        // push the 2nd left chunk (still no matches)
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 4 9 4 . . .
                + 5 10 5 . . ."
            )
        );

        // push the 1st right chunk (keys 2 and 4 gain matches: retract + re-emit)
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 2 . . .
                + 2 5 2 2 5 1
                - 4 9 4 . . .
                + 4 9 4 4 9 2"
            )
        );

        // push the 2nd right chunk (keys 1 and 3 gain matches)
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 1 4 1 . . .
                + 1 4 1 1 4 4
                - 3 6 3 . . .
                + 3 6 3 3 6 5"
            )
        );

        Ok(())
    }
3274
    /// `RightOuter` join over append-only inputs (three-column chunks): left chunks
    /// alone produce no output; right rows are joined when matched, NULL-padded
    /// on the left otherwise.
    #[tokio::test]
    async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5
             + 7 7 6",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::RightOuter }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk (no output for a right outer join)
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd left chunk (still no output)
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk (keys 2 and 4 match; key 6 is NULL-padded)
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                + 2 5 2 2 5 1
                + 4 9 4 4 9 2
                + . . . 6 9 3"
            )
        );

        // push the 2nd right chunk (keys 1 and 3 match; key 7 is NULL-padded)
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                + 1 4 1 1 4 4
                + 3 6 3 3 6 5
                + . . . 7 7 6"
            )
        );

        Ok(())
    }
3345
3346    #[tokio::test]
3347    async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
3348        let chunk_l1 = StreamChunk::from_pretty(
3349            "  I I
3350             + 1 4
3351             + 2 5
3352             + 3 6",
3353        );
3354        let chunk_l2 = StreamChunk::from_pretty(
3355            "  I I
3356             + 3 8
3357             - 3 8",
3358        );
3359        let chunk_r1 = StreamChunk::from_pretty(
3360            "  I I
3361             + 2 7
3362             + 4 8
3363             + 6 9",
3364        );
3365        let chunk_r2 = StreamChunk::from_pretty(
3366            "  I  I
3367             + 5 10
3368             - 5 10",
3369        );
3370        let (mut tx_l, mut tx_r, mut hash_join) =
3371            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3372
3373        // push the init barrier for left and right
3374        tx_l.push_barrier(test_epoch(1), false);
3375        tx_r.push_barrier(test_epoch(1), false);
3376        hash_join.next_unwrap_ready_barrier()?;
3377
3378        // push the 1st left chunk
3379        tx_l.push_chunk(chunk_l1);
3380        let chunk = hash_join.next_unwrap_ready_chunk()?;
3381        assert_eq!(
3382            chunk,
3383            StreamChunk::from_pretty(
3384                " I I I I
3385                + 1 4 . .
3386                + 2 5 . .
3387                + 3 6 . ."
3388            )
3389        );
3390
3391        // push the 2nd left chunk
3392        tx_l.push_chunk(chunk_l2);
3393        let chunk = hash_join.next_unwrap_ready_chunk()?;
3394        assert_eq!(
3395            chunk,
3396            StreamChunk::from_pretty(
3397                " I I I I
3398                + 3 8 . . D
3399                - 3 8 . . D"
3400            )
3401        );
3402
3403        // push the 1st right chunk
3404        tx_r.push_chunk(chunk_r1);
3405        let chunk = hash_join.next_unwrap_ready_chunk()?;
3406        assert_eq!(
3407            chunk,
3408            StreamChunk::from_pretty(
3409                " I I I I
3410                - 2 5 . .
3411                + 2 5 2 7
3412                + . . 4 8
3413                + . . 6 9"
3414            )
3415        );
3416
3417        // push the 2nd right chunk
3418        tx_r.push_chunk(chunk_r2);
3419        let chunk = hash_join.next_unwrap_ready_chunk()?;
3420        assert_eq!(
3421            chunk,
3422            StreamChunk::from_pretty(
3423                " I I I I
3424                + . . 5 10 D
3425                - . . 5 10 D"
3426            )
3427        );
3428
3429        Ok(())
3430    }
3431
3432    #[tokio::test]
3433    async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
3434        let (mut tx_l, mut tx_r, mut hash_join) =
3435            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3436
3437        // push the init barrier for left and right
3438        tx_l.push_barrier(test_epoch(1), false);
3439        tx_r.push_barrier(test_epoch(1), false);
3440        hash_join.next_unwrap_ready_barrier()?;
3441
3442        tx_l.push_chunk(StreamChunk::from_pretty(
3443            "  I I
3444             + 1 1
3445            ",
3446        ));
3447        let chunk = hash_join.next_unwrap_ready_chunk()?;
3448        assert_eq!(
3449            chunk,
3450            StreamChunk::from_pretty(
3451                " I I I I
3452                + 1 1 . ."
3453            )
3454        );
3455
3456        tx_r.push_chunk(StreamChunk::from_pretty(
3457            "  I I
3458             + 1 1
3459            ",
3460        ));
3461        let chunk = hash_join.next_unwrap_ready_chunk()?;
3462
3463        assert_eq!(
3464            chunk,
3465            StreamChunk::from_pretty(
3466                " I I I I
3467                - 1 1 . .
3468                + 1 1 1 1"
3469            )
3470        );
3471
3472        tx_l.push_chunk(StreamChunk::from_pretty(
3473            "   I I
3474              - 1 1
3475              + 1 2
3476            ",
3477        ));
3478        let chunk = hash_join.next_unwrap_ready_chunk()?;
3479        let chunk = chunk.compact_vis();
3480        assert_eq!(
3481            chunk,
3482            StreamChunk::from_pretty(
3483                " I I I I
3484                - 1 1 1 1
3485                + 1 2 1 1
3486                "
3487            )
3488        );
3489
3490        Ok(())
3491    }
3492
3493    #[tokio::test]
3494    async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
3495    {
3496        let chunk_l1 = StreamChunk::from_pretty(
3497            "  I I
3498             + 1 4
3499             + 2 5
3500             + 3 6
3501             + 3 7",
3502        );
3503        let chunk_l2 = StreamChunk::from_pretty(
3504            "  I I
3505             + 3 8
3506             - 3 8
3507             - 1 4", // delete row to cause an empty JoinHashEntry
3508        );
3509        let chunk_r1 = StreamChunk::from_pretty(
3510            "  I I
3511             + 2 6
3512             + 4 8
3513             + 3 4",
3514        );
3515        let chunk_r2 = StreamChunk::from_pretty(
3516            "  I  I
3517             + 5 10
3518             - 5 10
3519             + 1 2",
3520        );
3521        let (mut tx_l, mut tx_r, mut hash_join) =
3522            create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;
3523
3524        // push the init barrier for left and right
3525        tx_l.push_barrier(test_epoch(1), false);
3526        tx_r.push_barrier(test_epoch(1), false);
3527        hash_join.next_unwrap_ready_barrier()?;
3528
3529        // push the 1st left chunk
3530        tx_l.push_chunk(chunk_l1);
3531        let chunk = hash_join.next_unwrap_ready_chunk()?;
3532        assert_eq!(
3533            chunk,
3534            StreamChunk::from_pretty(
3535                " I I I I
3536                + 1 4 . .
3537                + 2 5 . .
3538                + 3 6 . .
3539                + 3 7 . ."
3540            )
3541        );
3542
3543        // push the 2nd left chunk
3544        tx_l.push_chunk(chunk_l2);
3545        let chunk = hash_join.next_unwrap_ready_chunk()?;
3546        assert_eq!(
3547            chunk,
3548            StreamChunk::from_pretty(
3549                " I I I I
3550                + 3 8 . . D
3551                - 3 8 . . D
3552                - 1 4 . ."
3553            )
3554        );
3555
3556        // push the 1st right chunk
3557        tx_r.push_chunk(chunk_r1);
3558        let chunk = hash_join.next_unwrap_ready_chunk()?;
3559        assert_eq!(
3560            chunk,
3561            StreamChunk::from_pretty(
3562                " I I I I
3563                - 2 5 . .
3564                + 2 5 2 6
3565                + . . 4 8
3566                + . . 3 4" /* regression test (#2420): 3 4 should be forwarded only once
3567                            * despite matching on eq join on 2
3568                            * entries */
3569            )
3570        );
3571
3572        // push the 2nd right chunk
3573        tx_r.push_chunk(chunk_r2);
3574        let chunk = hash_join.next_unwrap_ready_chunk()?;
3575        assert_eq!(
3576            chunk,
3577            StreamChunk::from_pretty(
3578                " I I I I
3579                + . . 5 10 D
3580                - . . 5 10 D
3581                + . . 1 2" /* regression test (#2420): 1 2 forwarded even if matches on an empty
3582                            * join entry */
3583            )
3584        );
3585
3586        Ok(())
3587    }
3588
3589    #[tokio::test]
3590    async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
3591        let chunk_l1 = StreamChunk::from_pretty(
3592            "  I I
3593             + 1 4
3594             + 2 10
3595             + 3 6",
3596        );
3597        let chunk_l2 = StreamChunk::from_pretty(
3598            "  I I
3599             + 3 8
3600             - 3 8",
3601        );
3602        let chunk_r1 = StreamChunk::from_pretty(
3603            "  I I
3604             + 2 7
3605             + 4 8
3606             + 6 9",
3607        );
3608        let chunk_r2 = StreamChunk::from_pretty(
3609            "  I  I
3610             + 3 10
3611             + 6 11",
3612        );
3613        let (mut tx_l, mut tx_r, mut hash_join) =
3614            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3615
3616        // push the init barrier for left and right
3617        tx_l.push_barrier(test_epoch(1), false);
3618        tx_r.push_barrier(test_epoch(1), false);
3619        hash_join.next_unwrap_ready_barrier()?;
3620
3621        // push the 1st left chunk
3622        tx_l.push_chunk(chunk_l1);
3623        hash_join.next_unwrap_pending();
3624
3625        // push the 2nd left chunk
3626        tx_l.push_chunk(chunk_l2);
3627        hash_join.next_unwrap_pending();
3628
3629        // push the 1st right chunk
3630        tx_r.push_chunk(chunk_r1);
3631        hash_join.next_unwrap_pending();
3632
3633        // push the 2nd right chunk
3634        tx_r.push_chunk(chunk_r2);
3635        let chunk = hash_join.next_unwrap_ready_chunk()?;
3636        assert_eq!(
3637            chunk,
3638            StreamChunk::from_pretty(
3639                " I I I I
3640                + 3 6 3 10"
3641            )
3642        );
3643
3644        Ok(())
3645    }
3646
3647    #[tokio::test]
3648    async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3649        let (mut tx_l, mut tx_r, mut hash_join) =
3650            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3651
3652        // push the init barrier for left and right
3653        tx_l.push_barrier(test_epoch(1), false);
3654        tx_r.push_barrier(test_epoch(1), false);
3655        hash_join.next_unwrap_ready_barrier()?;
3656
3657        tx_l.push_int64_watermark(0, 100);
3658
3659        tx_l.push_int64_watermark(0, 200);
3660
3661        tx_l.push_barrier(test_epoch(2), false);
3662        tx_r.push_barrier(test_epoch(2), false);
3663        hash_join.next_unwrap_ready_barrier()?;
3664
3665        tx_r.push_int64_watermark(0, 50);
3666
3667        let w1 = hash_join.next().await.unwrap().unwrap();
3668        let w1 = w1.as_watermark().unwrap();
3669
3670        let w2 = hash_join.next().await.unwrap().unwrap();
3671        let w2 = w2.as_watermark().unwrap();
3672
3673        tx_r.push_int64_watermark(0, 100);
3674
3675        let w3 = hash_join.next().await.unwrap().unwrap();
3676        let w3 = w3.as_watermark().unwrap();
3677
3678        let w4 = hash_join.next().await.unwrap().unwrap();
3679        let w4 = w4.as_watermark().unwrap();
3680
3681        assert_eq!(
3682            w1,
3683            &Watermark {
3684                col_idx: 2,
3685                data_type: DataType::Int64,
3686                val: ScalarImpl::Int64(50)
3687            }
3688        );
3689
3690        assert_eq!(
3691            w2,
3692            &Watermark {
3693                col_idx: 0,
3694                data_type: DataType::Int64,
3695                val: ScalarImpl::Int64(50)
3696            }
3697        );
3698
3699        assert_eq!(
3700            w3,
3701            &Watermark {
3702                col_idx: 2,
3703                data_type: DataType::Int64,
3704                val: ScalarImpl::Int64(100)
3705            }
3706        );
3707
3708        assert_eq!(
3709            w4,
3710            &Watermark {
3711                col_idx: 0,
3712                data_type: DataType::Int64,
3713                val: ScalarImpl::Int64(100)
3714            }
3715        );
3716
3717        Ok(())
3718    }
3719
    /// Builds a hash-join executor of join type `T` over two in-memory mock
    /// sources, with the periodic cache-eviction interval
    /// (`join_hash_map_evict_interval_rows`) overridden to `evict_interval`
    /// in the actor's streaming config (0 disables periodic eviction).
    /// Returns the left/right input senders and the executor's output stream.
    async fn create_executor_with_evict_interval<const T: JoinTypePrimitive>(
        evict_interval: u32,
    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
        // Both sides share the same two-column Int64 schema.
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64), // join key
                Field::unnamed(DataType::Int64),
            ],
        };
        let (tx_l, source_l) = MockSource::channel();
        let source_l = source_l.into_executor(schema.clone(), vec![1]);
        let (tx_r, source_r) = MockSource::channel();
        let source_r = source_r.into_executor(schema, vec![1]);
        // Equi-join on column 0; the second vec is presumably the pk/dedup
        // indices — confirm against `JoinParams::new`.
        let params_l = JoinParams::new(vec![0], vec![1]);
        let params_r = JoinParams::new(vec![0], vec![1]);

        let mem_state = MemoryStateStore::new();

        // State + degree tables; the trailing integer (0 for left, 2 for
        // right) is presumably a table-id base — confirm against the helper.
        let (state_l, degree_state_l) = create_in_memory_state_table(
            mem_state.clone(),
            &[DataType::Int64, DataType::Int64],
            &[OrderType::ascending(), OrderType::ascending()],
            &[0, 1],
            0,
        )
        .await;

        let (state_r, degree_state_r) = create_in_memory_state_table(
            mem_state,
            &[DataType::Int64, DataType::Int64],
            &[OrderType::ascending(), OrderType::ascending()],
            &[0, 1],
            2,
        )
        .await;

        // Output schema depends on the join type: semi/anti joins emit only
        // one side; every other type concatenates both input schemas.
        let schema = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
            _ => [source_l.schema().fields(), source_r.schema().fields()]
                .concat()
                .into_iter()
                .collect(),
        };
        let schema_len = schema.len();
        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);

        // The point of this helper: override the eviction interval in the
        // actor's streaming config before constructing the executor.
        let mut streaming_config = StreamingConfig::default();
        streaming_config.developer.join_hash_map_evict_interval_rows = evict_interval;

        let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
            ActorContext::for_test_with_config(123, streaming_config),
            info,
            source_l,
            source_r,
            params_l,
            params_r,
            vec![false], // NOTE(review): presumably per-key null-safe flags — confirm
            (0..schema_len).collect_vec(), // output indices: forward every column
            None, // no non-equi join condition
            vec![], // NOTE(review): presumably inequality conditions — confirm against ctor
            state_l,
            degree_state_l,
            state_r,
            degree_state_r,
            Arc::new(AtomicU64::new(0)),
            false,
            Arc::new(StreamingMetrics::unused()),
            1024, // NOTE(review): looks like a chunk/batch size — confirm
            2048, // NOTE(review): looks like a cache capacity — confirm
            vec![(0, true)],
        );
        (tx_l, tx_r, executor.boxed().execute())
    }
3794
3795    /// Tests that a hash join executor with `join_hash_map_evict_interval_rows = 0`
3796    /// (periodic eviction disabled) produces correct results.
3797    #[tokio::test]
3798    async fn test_hash_join_evict_interval_disabled() -> StreamExecutorResult<()> {
3799        let chunk_l = StreamChunk::from_pretty(
3800            "  I I
3801             + 1 4
3802             + 2 5
3803             + 3 6",
3804        );
3805        let chunk_r = StreamChunk::from_pretty(
3806            "  I I
3807             + 2 7
3808             + 3 8",
3809        );
3810
3811        // 0 disables periodic eviction
3812        let (mut tx_l, mut tx_r, mut hash_join) =
3813            create_executor_with_evict_interval::<{ JoinType::Inner }>(0).await;
3814
3815        tx_l.push_barrier(test_epoch(1), false);
3816        tx_r.push_barrier(test_epoch(1), false);
3817        hash_join.next_unwrap_ready_barrier()?;
3818
3819        tx_l.push_chunk(chunk_l);
3820        hash_join.next_unwrap_pending();
3821
3822        tx_r.push_chunk(chunk_r);
3823        let chunk = hash_join.next_unwrap_ready_chunk()?;
3824        assert_eq!(
3825            chunk,
3826            StreamChunk::from_pretty(
3827                " I I I I
3828                + 2 5 2 7
3829                + 3 6 3 8"
3830            )
3831        );
3832
3833        Ok(())
3834    }
3835
3836    /// Tests that a hash join executor with `join_hash_map_evict_interval_rows = 1`
3837    /// (evict after every row) produces correct results.
3838    #[tokio::test]
3839    async fn test_hash_join_evict_interval_one() -> StreamExecutorResult<()> {
3840        let chunk_l = StreamChunk::from_pretty(
3841            "  I I
3842             + 1 4
3843             + 2 5
3844             + 3 6",
3845        );
3846        let chunk_r = StreamChunk::from_pretty(
3847            "  I I
3848             + 2 7
3849             + 3 8",
3850        );
3851
3852        // evict after every row
3853        let (mut tx_l, mut tx_r, mut hash_join) =
3854            create_executor_with_evict_interval::<{ JoinType::Inner }>(1).await;
3855
3856        tx_l.push_barrier(test_epoch(1), false);
3857        tx_r.push_barrier(test_epoch(1), false);
3858        hash_join.next_unwrap_ready_barrier()?;
3859
3860        tx_l.push_chunk(chunk_l);
3861        hash_join.next_unwrap_pending();
3862
3863        tx_r.push_chunk(chunk_r);
3864        let chunk = hash_join.next_unwrap_ready_chunk()?;
3865        assert_eq!(
3866            chunk,
3867            StreamChunk::from_pretty(
3868                " I I I I
3869                + 2 5 2 7
3870                + 3 6 3 8"
3871            )
3872        );
3873
3874        Ok(())
3875    }
3876
3877    /// Tests that a hash join executor with a custom `join_hash_map_evict_interval_rows`
3878    /// value produces correct results.
3879    #[tokio::test]
3880    async fn test_hash_join_evict_interval_custom() -> StreamExecutorResult<()> {
3881        let chunk_l = StreamChunk::from_pretty(
3882            "  I I
3883             + 1 4
3884             + 2 5
3885             + 3 6
3886             + 4 7
3887             + 5 8",
3888        );
3889        let chunk_r = StreamChunk::from_pretty(
3890            "  I I
3891             + 1 9
3892             + 3 10
3893             + 5 11",
3894        );
3895
3896        // evict every 2 rows
3897        let (mut tx_l, mut tx_r, mut hash_join) =
3898            create_executor_with_evict_interval::<{ JoinType::Inner }>(2).await;
3899
3900        tx_l.push_barrier(test_epoch(1), false);
3901        tx_r.push_barrier(test_epoch(1), false);
3902        hash_join.next_unwrap_ready_barrier()?;
3903
3904        tx_l.push_chunk(chunk_l);
3905        hash_join.next_unwrap_pending();
3906
3907        tx_r.push_chunk(chunk_r);
3908        let chunk = hash_join.next_unwrap_ready_chunk()?;
3909        assert_eq!(
3910            chunk,
3911            StreamChunk::from_pretty(
3912                " I I I I
3913                + 1 4 1 9
3914                + 3 6 3 10
3915                + 5 8 5 11"
3916            )
3917        );
3918
3919        Ok(())
3920    }
3921}