risingwave_stream/executor/
hash_join.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::{BTreeMap, HashSet};
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use anyhow::Context;
21use either::Either;
22use itertools::Itertools;
23use multimap::MultiMap;
24use risingwave_common::array::Op;
25use risingwave_common::hash::{HashKey, NullBitmap};
26use risingwave_common::row::RowExt;
27use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
28use risingwave_common::util::epoch::EpochPair;
29use risingwave_common::util::iter_util::ZipEqDebug;
30use risingwave_expr::expr::NonStrictExpression;
31use risingwave_pb::stream_plan::InequalityType;
32use tokio::time::Instant;
33
34use self::builder::JoinChunkBuilder;
35use super::barrier_align::*;
36use super::join::hash_join::*;
37use super::join::row::{JoinEncoding, JoinRow};
38use super::join::*;
39use super::watermark::*;
40use crate::executor::CachedJoinRow;
41use crate::executor::join::builder::JoinStreamChunkBuilder;
42use crate::executor::join::hash_join::CacheResult;
43use crate::executor::prelude::*;
44
/// Evict both join-side caches after every `EVICT_EVERY_N_ROWS` input rows
/// have been processed (see `HashJoinExecutor::evict_cache`).
const EVICT_EVERY_N_ROWS: u32 = 16;
47
/// Returns `true` if every element of `vec1` also occurs in `vec2`
/// (set semantics: duplicates and order are irrelevant).
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.iter().all(|idx| superset.contains(idx))
}
51
/// Information about an inequality condition in the join.
/// Represents: `left_col <op> right_col` where op is one of `<`, `<=`, `>`, `>=`.
#[derive(Debug, Clone)]
pub struct InequalityPairInfo {
    /// Index of the left side column (from left input).
    pub left_idx: usize,
    /// Index of the right side column (from right input).
    pub right_idx: usize,
    /// Whether this condition is used to clean left state table.
    pub clean_left_state: bool,
    /// Whether this condition is used to clean right state table.
    pub clean_right_state: bool,
    /// Comparison operator relating `left_idx` to `right_idx`.
    pub op: InequalityType,
}
67
68impl InequalityPairInfo {
69    /// Returns true if the left side has larger values based on the operator.
70    /// For `>` and `>=`, left side is larger.
71    /// State cleanup applies to the side with larger values.
72    pub fn left_side_is_larger(&self) -> bool {
73        matches!(
74            self.op,
75            InequalityType::GreaterThan | InequalityType::GreaterThanOrEqual
76        )
77    }
78}
79
/// Per-side parameters of the hash join: which columns form the join key
/// and which form the deduplicated primary key.
pub struct JoinParams {
    /// Indices of the join keys
    pub join_key_indices: Vec<usize>,
    /// Indices of the input pk after dedup
    pub deduped_pk_indices: Vec<usize>,
}
86
impl JoinParams {
    /// Creates `JoinParams` from one side's join key indices and its
    /// deduplicated primary key indices.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        Self {
            join_key_indices,
            deduped_pk_indices,
        }
    }
}
95
/// All state belonging to one input side of the hash join.
struct JoinSide<K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Store all data from a one side stream
    ht: JoinHashMap<K, S, E>,
    /// Indices of the join key columns
    join_key_indices: Vec<usize>,
    /// The data type of all columns without degree.
    all_data_types: Vec<DataType>,
    /// The start position for the side in output new columns
    start_pos: usize,
    /// The mapping from input indices of a side to output columns.
    i2o_mapping: Vec<(usize, usize)>,
    /// Same as `i2o_mapping`, but indexed by input column for fast lookup.
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// The first field of the ith element indicates that when a watermark at the ith column of
    /// this side comes, what band join conditions should be updated in order to possibly
    /// generate a new watermark at that column or the corresponding column in the counterpart
    /// join side.
    ///
    /// The second field indicates whether the column is required to be less than the
    /// corresponding column in the counterpart join side in the band join condition.
    input2inequality_index: Vec<Vec<(usize, bool)>>,
    /// Some fields which are required non null to match due to inequalities.
    non_null_fields: Vec<usize>,
    /// (i, j) in this `Vec` means that state data in this join side can be cleaned if the value of
    /// its ith column is less than the synthetic watermark of the jth band join condition.
    state_clean_columns: Vec<(usize, usize)>,
    /// Whether degree table is needed for this side.
    need_degree_table: bool,
    _marker: std::marker::PhantomData<E>,
}
125
126impl<K: HashKey, S: StateStore, E: JoinEncoding> std::fmt::Debug for JoinSide<K, S, E> {
127    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128        f.debug_struct("JoinSide")
129            .field("join_key_indices", &self.join_key_indices)
130            .field("col_types", &self.all_data_types)
131            .field("start_pos", &self.start_pos)
132            .field("i2o_mapping", &self.i2o_mapping)
133            .field("need_degree_table", &self.need_degree_table)
134            .finish()
135    }
136}
137
impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinSide<K, S, E> {
    // WARNING: Please do not call this until we implement it.
    /// Whether this side has uncommitted state changes.
    /// Currently always panics (`unimplemented!`).
    fn is_dirty(&self) -> bool {
        unimplemented!()
    }

    #[expect(dead_code)]
    fn clear_cache(&mut self) {
        // Clearing while dirty would drop buffered state changes; note that
        // `is_dirty` is currently unimplemented, so this will panic if called.
        assert!(
            !self.is_dirty(),
            "cannot clear cache while states of hash join are dirty"
        );

        // TODO: not working with rearranged chain
        // self.ht.clear();
    }

    /// Initializes the side's hash table state at the given (first) epoch.
    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.ht.init(epoch).await
    }
}
159
/// `HashJoinExecutor` takes two input streams and runs equal hash join on them.
/// The output columns are the concatenation of left and right columns.
pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
{
    ctx: ActorContextRef,
    info: ExecutorInfo,

    /// Left input executor
    input_l: Option<Executor>,
    /// Right input executor
    input_r: Option<Executor>,
    /// The data types of the formed new columns
    actual_output_data_types: Vec<DataType>,
    /// The parameters of the left join executor
    side_l: JoinSide<K, S, E>,
    /// The parameters of the right join executor
    side_r: JoinSide<K, S, E>,
    /// Optional non-equi join conditions
    cond: Option<NonStrictExpression>,
    /// Inequality pairs with output column indices.
    /// Each entry contains: (`output_indices`, `InequalityPairInfo`).
    inequality_pairs: Vec<(Vec<usize>, InequalityPairInfo)>,
    /// The output watermark of each inequality condition and its value is the minimum of the
    /// watermarks from both sides. It will be used to generate watermark into downstream
    /// and do state cleaning based on `clean_left_state`/`clean_right_state` fields.
    inequality_watermarks: Vec<Option<Watermark>>,

    /// Whether the logic can be optimized for append-only stream
    append_only_optimize: bool,

    metrics: Arc<StreamingMetrics>,
    /// The maximum size of the chunk produced by executor at a time
    chunk_size: usize,
    /// Count the rows received, cleared to 0 when it reaches `EVICT_EVERY_N_ROWS`
    cnt_rows_received: u32,

    /// watermark column index -> `BufferedWatermarks`
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// When to alert high join amplification
    high_join_amplification_threshold: usize,

    /// Max number of rows that will be cached in the entry state.
    entry_state_max_rows: usize,
}
205
206impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> std::fmt::Debug
207    for HashJoinExecutor<K, S, T, E>
208{
209    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210        f.debug_struct("HashJoinExecutor")
211            .field("join_type", &T)
212            .field("input_left", &self.input_l.as_ref().unwrap().identity())
213            .field("input_right", &self.input_r.as_ref().unwrap().identity())
214            .field("side_l", &self.side_l)
215            .field("side_r", &self.side_r)
216            .field("stream_key", &self.info.stream_key)
217            .field("schema", &self.info.schema)
218            .field("actual_output_data_types", &self.actual_output_data_types)
219            .finish()
220    }
221}
222
impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> Execute
    for HashJoinExecutor<K, S, T, E>
{
    /// Consumes the executor and exposes the join as a boxed message stream.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}
230
/// Borrowed executor state bundled for one equi-join step over a single input
/// chunk (shared by `eq_join_left` and `eq_join_right`).
struct EqJoinArgs<'a, K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Actor context (ids, config, ...).
    ctx: &'a ActorContextRef,
    /// Mutable state of the left join side.
    side_l: &'a mut JoinSide<K, S, E>,
    /// Mutable state of the right join side.
    side_r: &'a mut JoinSide<K, S, E>,
    /// Data types of the (projected) output columns.
    actual_output_data_types: &'a [DataType],
    /// Optional non-equi join condition.
    cond: &'a mut Option<NonStrictExpression>,
    /// Current synthetic watermark of each inequality condition.
    inequality_watermarks: &'a [Option<Watermark>],
    /// The input chunk to process.
    chunk: StreamChunk,
    /// Whether the append-only optimization is enabled.
    append_only_optimize: bool,
    /// Maximum number of rows per output chunk.
    chunk_size: usize,
    /// Rows received since the last cache eviction (see `EVICT_EVERY_N_ROWS`).
    cnt_rows_received: &'a mut u32,
    /// Threshold above which high join amplification is reported.
    high_join_amplification_threshold: usize,
    /// Max number of rows cached in the entry state.
    entry_state_max_rows: usize,
}
245
246impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
247    HashJoinExecutor<K, S, T, E>
248{
    /// Creates a hash join executor with the default entry-state cache size
    /// (taken from the actor's developer config).
    ///
    /// Thin wrapper over [`Self::new_with_cache_size`] passing
    /// `entry_state_max_rows = None`.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<InequalityPairInfo>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
    ) -> Self {
        Self::new_with_cache_size(
            ctx,
            info,
            input_l,
            input_r,
            params_l,
            params_r,
            null_safe,
            output_indices,
            cond,
            inequality_pairs,
            state_table_l,
            degree_state_table_l,
            state_table_r,
            degree_state_table_r,
            watermark_epoch,
            is_append_only,
            metrics,
            chunk_size,
            high_join_amplification_threshold,
            None,
        )
    }
294
    /// Creates a hash join executor with an explicit entry-state cache size.
    ///
    /// `entry_state_max_rows = None` falls back to the developer config value
    /// `hash_join_entry_state_max_rows`. See the struct fields of
    /// [`HashJoinExecutor`] for the meaning of the stored parameters.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_cache_size(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<InequalityPairInfo>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        entry_state_max_rows: Option<usize>,
    ) -> Self {
        let entry_state_max_rows = match entry_state_max_rows {
            None => ctx.config.developer.hash_join_entry_state_max_rows,
            Some(entry_state_max_rows) => entry_state_max_rows,
        };
        let side_l_column_n = input_l.schema().len();

        // Semi/anti joins output only one side's columns; all other join
        // types output the concatenation of both sides.
        let schema_fields = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
            JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
            _ => [
                input_l.schema().fields.clone(),
                input_r.schema().fields.clone(),
            ]
            .concat(),
        };

        let original_output_data_types = schema_fields
            .iter()
            .map(|field| field.data_type())
            .collect_vec();
        let actual_output_data_types = output_indices
            .iter()
            .map(|&idx| original_output_data_types[idx].clone())
            .collect_vec();

        // Data types of hash join state.
        let state_all_data_types_l = input_l.schema().data_types();
        let state_all_data_types_r = input_r.schema().data_types();

        let state_pk_indices_l = input_l.stream_key().to_vec();
        let state_pk_indices_r = input_r.stream_key().to_vec();

        let state_join_key_indices_l = params_l.join_key_indices;
        let state_join_key_indices_r = params_r.join_key_indices;

        // In the degree table layout, the join key columns come first,
        // followed by the deduplicated pk columns.
        let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
        let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();

        let degree_pk_indices_l = (state_join_key_indices_l.len()
            ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
            .collect_vec();
        let degree_pk_indices_r = (state_join_key_indices_r.len()
            ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
            .collect_vec();

        // If pk is contained in join key.
        let pk_contained_in_jk_l = is_subset(state_pk_indices_l, state_join_key_indices_l.clone());
        let pk_contained_in_jk_r = is_subset(state_pk_indices_r, state_join_key_indices_r.clone());

        // check whether join key contains pk in both side
        let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;

        let join_key_data_types_l = state_join_key_indices_l
            .iter()
            .map(|idx| state_all_data_types_l[*idx].clone())
            .collect_vec();

        let join_key_data_types_r = state_join_key_indices_r
            .iter()
            .map(|idx| state_all_data_types_r[*idx].clone())
            .collect_vec();

        // Equi-join requires both sides' join keys to have identical types.
        assert_eq!(join_key_data_types_l, join_key_data_types_r);

        let null_matched = K::Bitmap::from_bool_vec(null_safe);

        // A side's degree table is skipped when the *other* side's pk is
        // contained in its join key.
        let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
        let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;

        let degree_state_l = need_degree_table_l.then(|| {
            TableInner::new(
                degree_pk_indices_l,
                degree_join_key_indices_l,
                degree_state_table_l,
            )
        });
        let degree_state_r = need_degree_table_r.then(|| {
            TableInner::new(
                degree_pk_indices_r,
                degree_join_key_indices_r,
                degree_state_table_r,
            )
        });

        // Build input-to-output column mappings; semi/anti joins contribute
        // zero columns from the non-output side.
        let (left_to_output, right_to_output) = {
            let (left_len, right_len) = if is_left_semi_or_anti(T) {
                (state_all_data_types_l.len(), 0usize)
            } else if is_right_semi_or_anti(T) {
                (0usize, state_all_data_types_r.len())
            } else {
                (state_all_data_types_l.len(), state_all_data_types_r.len())
            };
            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
        };

        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());

        let left_input_len = input_l.schema().len();
        let right_input_len = input_r.schema().len();
        let mut l2inequality_index = vec![vec![]; left_input_len];
        let mut r2inequality_index = vec![vec![]; right_input_len];
        let mut l_state_clean_columns = vec![];
        let mut r_state_clean_columns = vec![];
        let inequality_pairs: Vec<(Vec<usize>, InequalityPairInfo)> = inequality_pairs
            .into_iter()
            .enumerate()
            .map(|(index, pair)| {
                // Map input columns to inequality index
                // The second field indicates whether this column is required to be less than
                // the corresponding column on the other side.
                // For `left >= right`: left is NOT less than right, right IS less than left
                // For `left <= right`: left IS less than right, right is NOT less than left
                let left_is_larger = pair.left_side_is_larger();
                l2inequality_index[pair.left_idx].push((index, !left_is_larger));
                r2inequality_index[pair.right_idx].push((index, left_is_larger));

                // Determine state cleanup columns
                if pair.clean_left_state {
                    l_state_clean_columns.push((pair.left_idx, index));
                }
                if pair.clean_right_state {
                    r_state_clean_columns.push((pair.right_idx, index));
                }

                // Get output indices for watermark emission
                // We only emit watermarks for the LARGER side's output columns
                let output_indices = if pair.left_side_is_larger() {
                    // Left is larger, emit for left output columns
                    l2o_indexed
                        .get_vec(&pair.left_idx)
                        .cloned()
                        .unwrap_or_default()
                } else {
                    // Right is larger, emit for right output columns
                    r2o_indexed
                        .get_vec(&pair.right_idx)
                        .cloned()
                        .unwrap_or_default()
                };

                (output_indices, pair)
            })
            .collect_vec();

        // Any column participating in an inequality must be non-null to match.
        let mut l_non_null_fields = l2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();
        let mut r_non_null_fields = r2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();

        // Under the append-only optimization, inequality-driven state
        // cleaning and non-null requirements are disabled.
        if append_only_optimize {
            l_state_clean_columns.clear();
            r_state_clean_columns.clear();
            l_non_null_fields.clear();
            r_non_null_fields.clear();
        }

        let inequality_watermarks = vec![None; inequality_pairs.len()];
        let watermark_buffers = BTreeMap::new();

        Self {
            ctx: ctx.clone(),
            info,
            input_l: Some(input_l),
            input_r: Some(input_r),
            actual_output_data_types,
            side_l: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch.clone(),
                    join_key_data_types_l,
                    state_join_key_indices_l.clone(),
                    state_all_data_types_l.clone(),
                    state_table_l,
                    params_l.deduped_pk_indices,
                    degree_state_l,
                    null_matched.clone(),
                    pk_contained_in_jk_l,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "left",
                ),
                join_key_indices: state_join_key_indices_l,
                all_data_types: state_all_data_types_l,
                i2o_mapping: left_to_output,
                i2o_mapping_indexed: l2o_indexed,
                input2inequality_index: l2inequality_index,
                non_null_fields: l_non_null_fields,
                state_clean_columns: l_state_clean_columns,
                start_pos: 0,
                need_degree_table: need_degree_table_l,
                _marker: PhantomData,
            },
            side_r: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch,
                    join_key_data_types_r,
                    state_join_key_indices_r.clone(),
                    state_all_data_types_r.clone(),
                    state_table_r,
                    params_r.deduped_pk_indices,
                    degree_state_r,
                    null_matched,
                    pk_contained_in_jk_r,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "right",
                ),
                join_key_indices: state_join_key_indices_r,
                all_data_types: state_all_data_types_r,
                // Right-side columns start after all left-side columns.
                start_pos: side_l_column_n,
                i2o_mapping: right_to_output,
                i2o_mapping_indexed: r2o_indexed,
                input2inequality_index: r2inequality_index,
                non_null_fields: r_non_null_fields,
                state_clean_columns: r_state_clean_columns,
                need_degree_table: need_degree_table_r,
                _marker: PhantomData,
            },
            cond,
            inequality_pairs,
            inequality_watermarks,
            append_only_optimize,
            metrics,
            chunk_size,
            cnt_rows_received: 0,
            watermark_buffers,
            high_join_amplification_threshold,
            entry_state_max_rows,
        }
    }
556
    /// Drives the join: aligns the two inputs on barriers and yields the
    /// joined output messages (chunks, watermarks, barriers).
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let input_l = self.input_l.take().unwrap();
        let input_r = self.input_r.take().unwrap();
        let aligned_stream = barrier_align(
            input_l.execute(),
            input_r.execute(),
            self.ctx.id,
            self.ctx.fragment_id,
            self.metrics.clone(),
            "Join",
        );
        pin_mut!(aligned_stream);

        let actor_id = self.ctx.id;

        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
        let first_epoch = barrier.epoch;
        // The first barrier message should be propagated.
        yield Message::Barrier(barrier);
        // Both sides' state is initialized with the first barrier's epoch.
        self.side_l.init(first_epoch).await?;
        self.side_r.init(first_epoch).await?;

        let actor_id_str = self.ctx.id.to_string();
        let fragment_id_str = self.ctx.fragment_id.to_string();

        // initialized some metrics
        let join_actor_input_waiting_duration_ns = self
            .metrics
            .join_actor_input_waiting_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let left_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
        let right_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let barrier_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[
                actor_id_str.as_str(),
                fragment_id_str.as_str(),
                "barrier",
            ]);

        let left_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);

        let right_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let mut start_time = Instant::now();

        while let Some(msg) = aligned_stream
            .next()
            .instrument_await("hash_join_barrier_align")
            .await
        {
            // Time spent waiting for the next aligned message is accounted
            // separately from match time.
            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
            match msg? {
                AlignedMessage::WatermarkLeft(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Left, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::WatermarkRight(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Right, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::Left(chunk) => {
                    // Only the time spent joining is measured; the time spent
                    // yielding chunks downstream is excluded via re-timestamping.
                    let mut left_time = Duration::from_nanos(0);
                    let mut left_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_left(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        left_time += left_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        left_start_time = Instant::now();
                    }
                    left_time += left_start_time.elapsed();
                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Right(chunk) => {
                    let mut right_time = Duration::from_nanos(0);
                    let mut right_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_right(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        right_time += right_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        right_start_time = Instant::now();
                    }
                    right_time += right_start_time.elapsed();
                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Barrier(barrier) => {
                    let barrier_start_time = Instant::now();
                    // Commit both sides' buffered state for this epoch before
                    // propagating the barrier.
                    let (left_post_commit, right_post_commit) =
                        self.flush_data(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);

                    // We don't include the time post yielding barrier because the vnode update
                    // is a one-off and rare operation.
                    barrier_join_match_duration_ns
                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
                    yield Message::Barrier(barrier);

                    // Update the vnode bitmap for state tables of both sides if asked.
                    right_post_commit
                        .post_yield_barrier(update_vnode_bitmap.clone())
                        .await?;
                    if left_post_commit
                        .post_yield_barrier(update_vnode_bitmap)
                        .await?
                        .unwrap_or(false)
                    {
                        // Cache was cleared by the vnode update: buffered and
                        // synthetic watermarks are stale, reset them.
                        self.watermark_buffers
                            .values_mut()
                            .for_each(|buffers| buffers.clear());
                        self.inequality_watermarks.fill(None);
                    }

                    // Report metrics of cached join rows/entries
                    for (join_cached_entry_count, ht) in [
                        (&left_join_cached_entry_count, &self.side_l.ht),
                        (&right_join_cached_entry_count, &self.side_r.ht),
                    ] {
                        join_cached_entry_count.set(ht.entry_count() as i64);
                    }
                }
            }
            start_time = Instant::now();
        }
    }
727
728    async fn flush_data(
729        &mut self,
730        epoch: EpochPair,
731    ) -> StreamExecutorResult<(
732        JoinHashMapPostCommit<'_, K, S, E>,
733        JoinHashMapPostCommit<'_, K, S, E>,
734    )> {
735        // All changes to the state has been buffered in the mem-table of the state table. Just
736        // `commit` them here.
737        let left = self.side_l.ht.flush(epoch).await?;
738        let right = self.side_r.ht.flush(epoch).await?;
739        Ok((left, right))
740    }
741
742    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
743        // All changes to the state has been buffered in the mem-table of the state table. Just
744        // `commit` them here.
745        self.side_l.ht.try_flush().await?;
746        self.side_r.ht.try_flush().await?;
747        Ok(())
748    }
749
750    // We need to manually evict the cache.
751    fn evict_cache(
752        side_update: &mut JoinSide<K, S, E>,
753        side_match: &mut JoinSide<K, S, E>,
754        cnt_rows_received: &mut u32,
755    ) {
756        *cnt_rows_received += 1;
757        if *cnt_rows_received == EVICT_EVERY_N_ROWS {
758            side_update.ht.evict();
759            side_match.ht.evict();
760            *cnt_rows_received = 0;
761        }
762    }
763
    /// Handle a watermark arriving on input `side`.
    ///
    /// Returns the watermarks (remapped to output column indices) that may be
    /// emitted downstream after aligning this side's watermark with the one
    /// buffered for the other side.
    fn handle_watermark(
        &mut self,
        side: SideTypePrimitive,
        watermark: Watermark,
    ) -> StreamExecutorResult<Vec<Watermark>> {
        // `side_update` is the side the watermark arrived on; `side_match` is the other.
        let (side_update, side_match) = if side == SideType::Left {
            (&mut self.side_l, &mut self.side_r)
        } else {
            (&mut self.side_r, &mut self.side_l)
        };

        // State cleaning: because the join keys are equal across sides, a watermark
        // on the first join-key column of the updating side also bounds the state
        // of the matching side.
        if side_update.join_key_indices[0] == watermark.col_idx {
            side_match.ht.update_watermark(watermark.val.clone());
        }

        // Select watermarks to yield.
        // The watermark column may occur at several positions within the join key.
        let wm_in_jk = side_update
            .join_key_indices
            .iter()
            .positions(|idx| *idx == watermark.col_idx);
        let mut watermarks_to_emit = vec![];
        for idx in wm_in_jk {
            // Align with the other side's buffered watermark; a watermark is only
            // released once both sides have advanced.
            let buffers = self
                .watermark_buffers
                .entry(idx)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
                let empty_indices = vec![];
                // Map the join-key column to every output column it feeds, on both sides.
                let output_indices = side_update
                    .i2o_mapping_indexed
                    .get_vec(&side_update.join_key_indices[idx])
                    .unwrap_or(&empty_indices)
                    .iter()
                    .chain(
                        side_match
                            .i2o_mapping_indexed
                            .get_vec(&side_match.join_key_indices[idx])
                            .unwrap_or(&empty_indices),
                    );
                for output_idx in output_indices {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
            };
        }
        // Process inequality watermarks
        // We can only yield the LARGER side's watermark downstream.
        // For `left >= right`: left is larger, emit for left output columns
        // For `left <= right`: right is larger, emit for right output columns
        let mut update_left_watermark = None;
        let mut update_right_watermark = None;
        if let Some(watermark_indices) = side_update.input2inequality_index.get(watermark.col_idx) {
            for (inequality_index, _) in watermark_indices {
                // Inequality buffers are keyed after the join-key buffer slots so the
                // two buffer families never collide.
                let buffers = self
                    .watermark_buffers
                    .entry(side_update.join_key_indices.len() + inequality_index)
                    .or_insert_with(|| {
                        BufferedWatermarks::with_ids([SideType::Left, SideType::Right])
                    });
                if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone())
                {
                    let (output_indices, pair_info) = &self.inequality_pairs[*inequality_index];
                    let left_is_larger = pair_info.left_side_is_larger();

                    // Emit watermark for the larger side's output columns only
                    for output_idx in output_indices {
                        watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                    }
                    // Store watermark for state cleaning (via useful_state_clean_columns)
                    self.inequality_watermarks[*inequality_index] =
                        Some(selected_watermark.clone());

                    // Mark which side needs state cleaning (only if the clean flag is set)
                    if left_is_larger && pair_info.clean_left_state {
                        update_left_watermark = Some(selected_watermark.val.clone());
                    } else if !left_is_larger && pair_info.clean_right_state {
                        update_right_watermark = Some(selected_watermark.val.clone());
                    }
                }
            }
            // Do state cleaning by update_watermark on the larger side (after the loop to avoid borrow issues)
            // TODO(yuhao-su): implement state cleaning
            if let Some(_val) = update_left_watermark {
                // self.side_l.ht.update_watermark(val);
            }
            if let Some(_val) = update_right_watermark {
                // self.side_r.ht.update_watermark(val);
            }
        }
        Ok(watermarks_to_emit)
    }
855
856    fn row_concat(
857        row_update: impl Row,
858        update_start_pos: usize,
859        row_matched: impl Row,
860        matched_start_pos: usize,
861    ) -> OwnedRow {
862        let mut new_row = vec![None; row_update.len() + row_matched.len()];
863
864        for (i, datum_ref) in row_update.iter().enumerate() {
865            new_row[i + update_start_pos] = datum_ref.to_owned_datum();
866        }
867        for (i, datum_ref) in row_matched.iter().enumerate() {
868            new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
869        }
870        OwnedRow::new(new_row)
871    }
872
    /// Forwards to [`Self::eq_join_oneside`] instantiated for the left side.
    /// Exists as a separate function so the join side is visible in stack traces.
    fn eq_join_left(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Left }>(args)
    }
879
    /// Forwards to [`Self::eq_join_oneside`] instantiated for the right side.
    /// Exists as a separate function so the join side is visible in stack traces.
    fn eq_join_right(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Right }>(args)
    }
886
    /// Core equi-join routine for one input chunk arriving on `SIDE`.
    ///
    /// For each visible row of the chunk, probes the other side's hash table
    /// (cache first, state table on miss) via `handle_match_rows` and yields
    /// joined output chunks as the builder fills up.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S, E>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            actual_output_data_types,
            cond,
            inequality_watermarks,
            chunk,
            append_only_optimize,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
            entry_state_max_rows,
            ..
        } = args;

        // `side_update` is the side the chunk arrived on; `side_match` is probed.
        let (side_update, side_match) = if SIDE == SideType::Left {
            (side_l, side_r)
        } else {
            (side_r, side_l)
        };

        // Only state-clean columns whose inequality condition already has a
        // watermark can actually be used for cleaning.
        let useful_state_clean_columns = side_match
            .state_clean_columns
            .iter()
            .filter_map(|(column_idx, inequality_index)| {
                inequality_watermarks[*inequality_index]
                    .as_ref()
                    .map(|watermark| (*column_idx, watermark))
            })
            .collect_vec();

        let mut hashjoin_chunk_builder =
            JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        // Histogram of how many build-side rows each probe key matched.
        let join_matched_join_keys = ctx
            .streaming_metrics
            .join_matched_join_keys
            .with_guarded_label_values(&[
                &ctx.id.to_string(),
                &ctx.fragment_id.to_string(),
                &side_update.ht.table_id().to_string(),
            ]);

        // Build the hash keys for every row of the chunk up front.
        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            // Skip rows masked out by the chunk's visibility bitmap.
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(side_update, side_match, cnt_rows_received);

            let cache_lookup_result = {
                // A row with NULLs in the join key can only match when both the
                // probe side's non-null requirements and the build side's
                // null-safe configuration allow it; otherwise skip lookup entirely.
                let probe_non_null_requirement_satisfied = side_update
                    .non_null_fields
                    .iter()
                    // SAFETY: `non_null_fields` are assumed to be in-bounds column
                    // indices of this input's rows — TODO(review) confirm upstream invariant.
                    .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
                let build_non_null_requirement_satisfied =
                    key.null_bitmap().is_subset(side_match.ht.null_matched());
                if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
                    side_match.ht.take_state_opt(key)
                } else {
                    CacheResult::NeverMatch
                }
            };
            let mut total_matches = 0;

            // Dispatch to `handle_match_rows` with the join op as a const generic.
            macro_rules! match_rows {
                ($op:ident) => {
                    Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
                        cache_lookup_result,
                        row,
                        key,
                        &mut hashjoin_chunk_builder,
                        side_match,
                        side_update,
                        &useful_state_clean_columns,
                        cond,
                        append_only_optimize,
                        entry_state_max_rows,
                    )
                };
            }

            match op {
                Op::Insert | Op::UpdateInsert =>
                {
                    #[for_await]
                    for chunk in match_rows!(Insert) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
                Op::Delete | Op::UpdateDelete =>
                {
                    #[for_await]
                    for chunk in match_rows!(Delete) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
            };

            join_matched_join_keys.observe(total_matches as _);
            // Warn on join amplification: one probe row fanning out to very many
            // matches is a common cause of streaming performance problems.
            if total_matches > high_join_amplification_threshold {
                let join_key_data_types = side_update.ht.join_key_data_types();
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = total_matches,
                    update_table_id = %side_update.ht.table_id(),
                    match_table_id = %side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = %ctx.id,
                    fragment_id = %ctx.fragment_id,
                    "large rows matched for join key"
                );
            }
        }
        // NOTE(kwannoel): We don't track metrics for this last chunk.
        if let Some(chunk) = hashjoin_chunk_builder.take() {
            yield chunk;
        }
    }
1018
    /// For the probe-side row, we need to check if it has values in cache, if not, we need to
    /// fetch the matched rows from the state table.
    ///
    /// Every matched build-side row being processed needs to go through the following phases:
    /// 1. Handle join condition evaluation.
    /// 2. Always do cache refill, if the state count is good.
    /// 3. Handle state cleaning.
    /// 4. Handle degree table update.
    ///
    /// Finally, the probe-side row itself is forwarded (or not) depending on
    /// whether any build-side row matched, and the probe side's own state is
    /// updated with an insert/delete of the row.
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn handle_match_rows<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
    >(
        cached_lookup_result: CacheResult<E>,
        row: RowRef<'a>,
        key: &'a K,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        side_match: &'a mut JoinSide<K, S, E>,
        side_update: &'a mut JoinSide<K, S, E>,
        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
        cond: &'a mut Option<NonStrictExpression>,
        append_only_optimize: bool,
        entry_state_max_rows: usize,
    ) {
        let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
        // Freshly built cache entry, populated on a cache miss (bounded below).
        let mut entry_state: JoinEntryState<E> = JoinEntryState::default();
        let mut entry_state_count = 0;

        // Number of build-side rows that satisfied the join condition for this probe row.
        let mut degree = 0;
        let mut append_only_matched_row = None;
        let mut matched_rows_to_clean = vec![];

        // Invoke `handle_match_row` for one matched build-side row; only the row's
        // origin (cache vs. storage) and its output mapping differ per call site.
        macro_rules! match_row {
            (
                $match_order_key_indices:expr,
                $degree_table:expr,
                $matched_row:expr,
                $matched_row_ref:expr,
                $from_cache:literal,
                $map_output:expr,
            ) => {
                Self::handle_match_row::<_, _, SIDE, { JOIN_OP }, { $from_cache }>(
                    row,
                    $matched_row,
                    $matched_row_ref,
                    hashjoin_chunk_builder,
                    $match_order_key_indices,
                    $degree_table,
                    side_update.start_pos,
                    side_match.start_pos,
                    cond,
                    &mut degree,
                    useful_state_clean_columns,
                    append_only_optimize,
                    &mut append_only_matched_row,
                    &mut matched_rows_to_clean,
                    $map_output,
                )
            };
        }

        let entry_state = match cached_lookup_result {
            CacheResult::NeverMatch => {
                // NULLs in the key make a match impossible: forward the probe row
                // (for outer-like join types) and bail out early.
                let op = match JOIN_OP {
                    JoinOp::Insert => Op::Insert,
                    JoinOp::Delete => Op::Delete,
                };
                if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                    yield chunk;
                }
                return Ok(());
            }
            CacheResult::Hit(mut cached_rows) => {
                let (match_order_key_indices, match_degree_state) =
                    side_match.ht.get_degree_state_mut_ref();
                // Handle cached rows which match the probe-side row.
                for (matched_row_ref, matched_row) in
                    cached_rows.values_mut(&side_match.all_data_types)
                {
                    let matched_row = matched_row?;
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        match_degree_state,
                        matched_row,
                        Some(matched_row_ref),
                        true,
                        Either::Left,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }

                // Re-install the existing entry (degrees were updated in place).
                cached_rows
            }
            CacheResult::Miss => {
                // Handle rows which are not in cache.
                let (matched_rows, match_order_key_indices, degree_table) = side_match
                    .ht
                    .fetch_matched_rows_and_get_degree_table_ref(key)
                    .await?;

                #[for_await]
                for matched_row in matched_rows {
                    let (encoded_pk, matched_row) = matched_row?;

                    let mut matched_row_ref = None;

                    // cache refill
                    // NOTE(review): `<=` lets the entry grow to `entry_state_max_rows + 1`
                    // rows before refill stops — confirm this off-by-one is intended.
                    if entry_state_count <= entry_state_max_rows {
                        let row_ref = entry_state
                            .insert(encoded_pk, E::encode(&matched_row), None) // TODO(kwannoel): handle ineq key for asof join.
                            .with_context(|| format!("row: {}", row.display(),))?;
                        matched_row_ref = Some(row_ref);
                        entry_state_count += 1;
                    }
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        degree_table,
                        matched_row,
                        matched_row_ref,
                        false,
                        Either::Right,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }
                Box::new(entry_state)
            }
        };

        // forward rows depending on join types
        let op = match JOIN_OP {
            JoinOp::Insert => Op::Insert,
            JoinOp::Delete => Op::Delete,
        };
        if degree == 0 {
            // No build-side row satisfied the condition: forward for outer-like joins.
            if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                yield chunk;
            }
        } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
        {
            yield chunk;
        }

        // cache refill
        // Only install the rebuilt entry when it is complete (all matched rows fit),
        // or when it came from the cache in the first place.
        if cache_hit || entry_state_count <= entry_state_max_rows {
            side_match.ht.update_state(key, entry_state);
        }

        // watermark state cleaning
        for matched_row in matched_rows_to_clean {
            side_match.ht.delete_handle_degree(key, matched_row)?;
        }

        // apply append_only optimization to clean matched_rows which have been persisted
        if append_only_optimize && let Some(row) = append_only_matched_row {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            side_match.ht.delete_handle_degree(key, row)?;
            return Ok(());
        }

        // no append_only optimization, update state table(s).
        match JOIN_OP {
            JoinOp::Insert => {
                side_update
                    .ht
                    .insert_handle_degree(key, JoinRow::new(row, degree))?;
            }
            JoinOp::Delete => {
                side_update
                    .ht
                    .delete_handle_degree(key, JoinRow::new(row, degree))?;
            }
        }
    }
1200
    /// Process a single matched build-side row for the current probe row.
    ///
    /// Evaluates the non-equi join condition; on success, increments the probe
    /// row's degree, updates the match side's degree table (and the cached
    /// degree, when present), and feeds the joined row to the chunk builder.
    /// On failure, checks whether the matched row falls below an applicable
    /// state-clean watermark and, if so, schedules it for cleaning.
    ///
    /// Returns an output chunk if the builder happened to fill up.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    async fn handle_match_row<
        'a,
        R: Row,  // input row type
        RO: Row, // output row type
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
        const MATCHED_ROWS_FROM_CACHE: bool,
    >(
        update_row: RowRef<'a>,
        mut matched_row: JoinRow<R>,
        mut matched_row_cache_ref: Option<&mut E::EncodedRow>,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        match_order_key_indices: &[usize],
        match_degree_table: &mut Option<TableInner<S>>,
        side_update_start_pos: usize,
        side_match_start_pos: usize,
        cond: &Option<NonStrictExpression>,
        update_row_degree: &mut u64,
        useful_state_clean_columns: &[(usize, &'a Watermark)],
        append_only_optimize: bool,
        append_only_matched_row: &mut Option<JoinRow<RO>>,
        matched_rows_to_clean: &mut Vec<JoinRow<RO>>,
        map_output: impl Fn(R) -> RO,
    ) -> Option<StreamChunk> {
        let mut need_state_clean = false;
        let mut chunk_opt = None;

        // TODO(kwannoel): Instead of evaluating this every loop,
        // we can call this only if there's a non-equi expression.
        // check join cond
        let join_condition_satisfied = Self::check_join_condition(
            update_row,
            side_update_start_pos,
            &matched_row.row,
            side_match_start_pos,
            cond,
        )
        .await;

        if join_condition_satisfied {
            // update degree
            *update_row_degree += 1;
            // send matched row downstream

            // Inserts must happen before degrees are updated,
            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
            if matches!(JOIN_OP, JoinOp::Insert)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
            // TODO(kwannoel): We can actually statically decide this, using join side + join type.
            if let Some(degree_table) = match_degree_table {
                update_degree::<S, { JOIN_OP }>(
                    match_order_key_indices,
                    degree_table,
                    &mut matched_row,
                );
                if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
                    // update matched row in cache
                    // When the row came from cache, the caller always passes
                    // `Some(cache_ref)`, so these `unwrap`s cannot fail.
                    match JOIN_OP {
                        JoinOp::Insert => matched_row_cache_ref.as_mut().unwrap().increase_degree(),
                        JoinOp::Delete => matched_row_cache_ref.as_mut().unwrap().decrease_degree(),
                    }
                }
            }

            // Deletes must happen after degree table is updated.
            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
            if matches!(JOIN_OP, JoinOp::Delete)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
        } else {
            // check if need state cleaning
            // A matched row strictly below any applicable watermark can never
            // satisfy the condition again and may be dropped from state.
            for (column_idx, watermark) in useful_state_clean_columns {
                if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
                    scalar
                        .default_cmp(&watermark.val.as_scalar_ref_impl())
                        .is_lt()
                }) {
                    need_state_clean = true;
                    break;
                }
            }
        }
        // If the stream is append-only and the join key covers pk in both side,
        // then we can remove matched rows since pk is unique and will not be
        // inserted again
        if append_only_optimize {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            // Since join key contains pk and pk is unique, there should be only
            // one row if matched.
            assert!(append_only_matched_row.is_none());
            *append_only_matched_row = Some(matched_row.map(map_output));
        } else if need_state_clean {
            // `append_only_optimize` and `need_state_clean` won't both be true.
            // 'else' here is only to suppress compiler error, otherwise
            // `matched_row` will be moved twice.
            matched_rows_to_clean.push(matched_row.map(map_output));
        }

        chunk_opt
    }
1312
1313    // TODO(yuhao-su): We should find a better way to eval the expression
1314    // without concat two rows.
1315    // if there are non-equi expressions
1316    // NOTE(kwannoel): We can probably let `eval` use `impl Row` instead of `OwnedRow`.
1317    #[inline]
1318    async fn check_join_condition(
1319        row: impl Row,
1320        side_update_start_pos: usize,
1321        matched_row: impl Row,
1322        side_match_start_pos: usize,
1323        join_condition: &Option<NonStrictExpression>,
1324    ) -> bool {
1325        if let Some(join_condition) = join_condition {
1326            let new_row = Self::row_concat(
1327                row,
1328                side_update_start_pos,
1329                matched_row,
1330                side_match_start_pos,
1331            );
1332            join_condition
1333                .eval_row_infallible(&new_row)
1334                .await
1335                .map(|s| *s.as_bool())
1336                .unwrap_or(false)
1337        } else {
1338            true
1339        }
1340    }
1341}
1342
1343#[cfg(test)]
1344mod tests {
1345    use std::sync::atomic::AtomicU64;
1346
1347    use pretty_assertions::assert_eq;
1348    use risingwave_common::array::*;
1349    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1350    use risingwave_common::hash::{Key64, Key128};
1351    use risingwave_common::util::epoch::test_epoch;
1352    use risingwave_common::util::sort_util::OrderType;
1353    use risingwave_storage::memory::MemoryStateStore;
1354
1355    use super::*;
1356    use crate::common::table::test_utils::gen_pbtable;
1357    use crate::executor::MemoryEncoding;
1358    use crate::executor::test_utils::expr::build_from_pretty;
1359    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1360
1361    async fn create_in_memory_state_table(
1362        mem_state: MemoryStateStore,
1363        data_types: &[DataType],
1364        order_types: &[OrderType],
1365        pk_indices: &[usize],
1366        table_id: u32,
1367    ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1368        let column_descs = data_types
1369            .iter()
1370            .enumerate()
1371            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1372            .collect_vec();
1373        let state_table = StateTable::from_table_catalog(
1374            &gen_pbtable(
1375                TableId::new(table_id),
1376                column_descs,
1377                order_types.to_vec(),
1378                pk_indices.to_vec(),
1379                0,
1380            ),
1381            mem_state.clone(),
1382            None,
1383        )
1384        .await;
1385
1386        // Create degree table
1387        let mut degree_table_column_descs = vec![];
1388        pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1389            degree_table_column_descs.push(ColumnDesc::unnamed(
1390                ColumnId::new(pk_id as i32),
1391                data_types[*idx].clone(),
1392            ))
1393        });
1394        degree_table_column_descs.push(ColumnDesc::unnamed(
1395            ColumnId::new(pk_indices.len() as i32),
1396            DataType::Int64,
1397        ));
1398        let degree_state_table = StateTable::from_table_catalog(
1399            &gen_pbtable(
1400                TableId::new(table_id + 1),
1401                degree_table_column_descs,
1402                order_types.to_vec(),
1403                pk_indices.to_vec(),
1404                0,
1405            ),
1406            mem_state,
1407            None,
1408        )
1409        .await;
1410        (state_table, degree_state_table)
1411    }
1412
1413    fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1414        build_from_pretty(
1415            condition_text
1416                .as_deref()
1417                .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1418        )
1419    }
1420
    /// Construct a `HashJoinExecutor` for testing, wired to two mock sources.
    ///
    /// Both inputs have schema `(int64 join key, int64 value)` and join on
    /// column 0. Returns the message senders for the left/right inputs plus
    /// the boxed output stream of the executor.
    async fn create_executor<const T: JoinTypePrimitive>(
        with_condition: bool,
        null_safe: bool,
        condition_text: Option<String>,
        inequality_pairs: Vec<InequalityPairInfo>,
    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64), // join key
                Field::unnamed(DataType::Int64),
            ],
        };
        let (tx_l, source_l) = MockSource::channel();
        let source_l = source_l.into_executor(schema.clone(), vec![1]);
        let (tx_r, source_r) = MockSource::channel();
        let source_r = source_r.into_executor(schema, vec![1]);
        // Join key is column 0 on both sides; column 1 is part of the deduped pk.
        let params_l = JoinParams::new(vec![0], vec![1]);
        let params_r = JoinParams::new(vec![0], vec![1]);
        let cond = with_condition.then(|| create_cond(condition_text));

        let mem_state = MemoryStateStore::new();

        // Left side uses table ids 0 (data) / 1 (degree); right uses 2 / 3.
        let (state_l, degree_state_l) = create_in_memory_state_table(
            mem_state.clone(),
            &[DataType::Int64, DataType::Int64],
            &[OrderType::ascending(), OrderType::ascending()],
            &[0, 1],
            0,
        )
        .await;

        let (state_r, degree_state_r) = create_in_memory_state_table(
            mem_state,
            &[DataType::Int64, DataType::Int64],
            &[OrderType::ascending(), OrderType::ascending()],
            &[0, 1],
            2,
        )
        .await;

        // Output schema: semi/anti joins output one side only; other join
        // types output the concatenation of both sides.
        let schema = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
            _ => [source_l.schema().fields(), source_r.schema().fields()]
                .concat()
                .into_iter()
                .collect(),
        };
        let schema_len = schema.len();
        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);

        let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
            ActorContext::for_test(123),
            info,
            source_l,
            source_r,
            params_l,
            params_r,
            vec![null_safe],
            (0..schema_len).collect_vec(),
            cond,
            inequality_pairs,
            state_l,
            degree_state_l,
            state_r,
            degree_state_r,
            Arc::new(AtomicU64::new(0)),
            false,
            Arc::new(StreamingMetrics::unused()),
            1024,
            2048,
        );
        (tx_l, tx_r, executor.boxed().execute())
    }
1495
    /// Convenience wrapper around `create_executor` with no inequality pairs,
    /// i.e. a plain equi-join (optionally with a non-equi condition).
    async fn create_classical_executor<const T: JoinTypePrimitive>(
        with_condition: bool,
        null_safe: bool,
        condition_text: Option<String>,
    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
        create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
    }
1503
    /// Builds a hash-join executor for the append-only tests.
    ///
    /// Compared with `create_executor`: each side has three `Int64` columns,
    /// the join key is the composite `[0, 1]` (hence `Key128` rather than
    /// `Key64`), there are no null-safe columns and no inequality pairs, and
    /// the append-only flag passed to `HashJoinExecutor::new` is `true`
    /// (where `create_executor` passes `false`).
    async fn create_append_only_executor<const T: JoinTypePrimitive>(
        with_condition: bool,
    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };
        let (tx_l, source_l) = MockSource::channel();
        let source_l = source_l.into_executor(schema.clone(), vec![0]);
        let (tx_r, source_r) = MockSource::channel();
        let source_r = source_r.into_executor(schema, vec![0]);
        // Join on the first two columns of each side.
        let params_l = JoinParams::new(vec![0, 1], vec![]);
        let params_r = JoinParams::new(vec![0, 1], vec![]);
        let cond = with_condition.then(|| create_cond(None));

        let mem_state = MemoryStateStore::new();

        // NOTE(review): the pk-index lists differ between sides
        // ([0, 1, 0] vs. [0, 1, 1]) while the final (id-like) argument is
        // 0 vs. 1 — presumably join key plus a per-side distinguishing
        // column; confirm against `create_in_memory_state_table` if this
        // ever matters.
        let (state_l, degree_state_l) = create_in_memory_state_table(
            mem_state.clone(),
            &[DataType::Int64, DataType::Int64, DataType::Int64],
            &[
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
            &[0, 1, 0],
            0,
        )
        .await;

        let (state_r, degree_state_r) = create_in_memory_state_table(
            mem_state,
            &[DataType::Int64, DataType::Int64, DataType::Int64],
            &[
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
            &[0, 1, 1],
            1,
        )
        .await;

        // Semi/anti joins emit only one side's columns; other join types
        // concatenate both sides' fields.
        let schema = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
            _ => [source_l.schema().fields(), source_r.schema().fields()]
                .concat()
                .into_iter()
                .collect(),
        };
        let schema_len = schema.len();
        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);

        let executor = HashJoinExecutor::<Key128, MemoryStateStore, T, MemoryEncoding>::new(
            ActorContext::for_test(123),
            info,
            source_l,
            source_r,
            params_l,
            params_r,
            vec![false], // no null-safe join key columns
            (0..schema_len).collect_vec(),
            cond,
            vec![], // no inequality pairs
            state_l,
            degree_state_l,
            state_r,
            degree_state_r,
            Arc::new(AtomicU64::new(0)),
            true, // append-only flag (differs from `create_executor`, which passes `false`)
            Arc::new(StreamingMetrics::unused()),
            1024,
            2048,
        );
        (tx_l, tx_r, executor.boxed().execute())
    }
1584
    /// Inner join with an inequality pair `left.col1 >= right.col1`:
    /// verifies that the output watermark is the min of both inputs and that
    /// left-side state below the right watermark is cleaned and no longer
    /// produces matches.
    #[tokio::test]
    async fn test_inequality_join_watermark() -> StreamExecutorResult<()> {
        // Test inequality join with watermark-based state cleanup
        // Condition: left.col1 >= right.col1 (left side is larger, clean left state)
        // Left state rows with col1 < watermark will be cleaned
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 2 4
             + 2 7
             + 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 6",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 2 3",
        );
        // Test with condition: left.col1 >= right.col1
        let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
            true,
            false,
            Some(String::from(
                "(greater_than_or_equal:boolean $1:int8 $3:int8)",
            )),
            vec![InequalityPairInfo {
                left_idx: 1,
                right_idx: 1,
                clean_left_state: true, // left >= right, left is larger
                clean_right_state: false,
                op: InequalityType::GreaterThanOrEqual,
            }],
        )
        .await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the left chunk: (2, 4), (2, 7), (3, 8)
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push watermarks: left=10, right=6
        // Output watermark is min(10, 6) = 6 for both output columns
        // The left watermark alone yields no output (still waiting on the right).
        tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
        hash_join.next_unwrap_pending();

        tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
        assert_eq!(
            output_watermark,
            Watermark::new(1, DataType::Int64, ScalarImpl::Int64(6))
        );

        // After watermark, left state rows with col1 < 6 are cleaned
        // Row (2, 4) should be cleaned (4 < 6)
        // Row (2, 7) should remain (7 >= 6)
        // Row (3, 8) should remain (8 >= 6)

        // push right chunk (2, 6)
        // Only (2, 7) can match: 7 >= 6 is TRUE
        // (2, 4) was cleaned, so it won't match
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 7 2 6"
            )
        );

        // push right chunk (2, 3)
        // (2, 7) can match: 7 >= 3 is TRUE
        // (2, 4) was cleaned, won't match
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 7 2 3"
            )
        );

        Ok(())
    }
1675
1676    #[tokio::test]
1677    async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
1678        let chunk_l1 = StreamChunk::from_pretty(
1679            "  I I
1680             + 1 4
1681             + 2 5
1682             + 3 6",
1683        );
1684        let chunk_l2 = StreamChunk::from_pretty(
1685            "  I I
1686             + 3 8
1687             - 3 8",
1688        );
1689        let chunk_r1 = StreamChunk::from_pretty(
1690            "  I I
1691             + 2 7
1692             + 4 8
1693             + 6 9",
1694        );
1695        let chunk_r2 = StreamChunk::from_pretty(
1696            "  I  I
1697             + 3 10
1698             + 6 11",
1699        );
1700        let (mut tx_l, mut tx_r, mut hash_join) =
1701            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
1702
1703        // push the init barrier for left and right
1704        tx_l.push_barrier(test_epoch(1), false);
1705        tx_r.push_barrier(test_epoch(1), false);
1706        hash_join.next_unwrap_ready_barrier()?;
1707
1708        // push the 1st left chunk
1709        tx_l.push_chunk(chunk_l1);
1710        hash_join.next_unwrap_pending();
1711
1712        // push the init barrier for left and right
1713        tx_l.push_barrier(test_epoch(2), false);
1714        tx_r.push_barrier(test_epoch(2), false);
1715        hash_join.next_unwrap_ready_barrier()?;
1716
1717        // push the 2nd left chunk
1718        tx_l.push_chunk(chunk_l2);
1719        hash_join.next_unwrap_pending();
1720
1721        // push the 1st right chunk
1722        tx_r.push_chunk(chunk_r1);
1723        let chunk = hash_join.next_unwrap_ready_chunk()?;
1724        assert_eq!(
1725            chunk,
1726            StreamChunk::from_pretty(
1727                " I I I I
1728                + 2 5 2 7"
1729            )
1730        );
1731
1732        // push the 2nd right chunk
1733        tx_r.push_chunk(chunk_r2);
1734        let chunk = hash_join.next_unwrap_ready_chunk()?;
1735        assert_eq!(
1736            chunk,
1737            StreamChunk::from_pretty(
1738                " I I I I
1739                + 3 6 3 10"
1740            )
1741        );
1742
1743        Ok(())
1744    }
1745
    /// Null-safe inner join: with the null-safe flag set, NULL join keys
    /// compare equal to each other, so a NULL-keyed left row joins with a
    /// NULL-keyed right row (last assertion below).
    #[tokio::test]
    async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + . 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + . 8
             - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + . 10
             + 6 11",
        );
        // `null_safe = true` is the only difference from the plain inner-join test.
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk (insert+delete of a NULL key cancels out)
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 7"
            )
        );

        // push the 2nd right chunk: the NULL-keyed rows join (null-safe equality)
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . 6 . 10"
            )
        );

        Ok(())
    }
1815
    /// Left semi join: a left row is emitted once it gains its FIRST match on
    /// the right, and retracted only when its LAST match disappears.
    #[tokio::test]
    async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             + 6 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I  I
             - 6 11",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I  I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        // push the 2nd right chunk
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 3 6"
            )
        );

        // push the 3rd left chunk (tests forward_exactly_once)
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // push the 3rd right chunk
        // (tests that no change if there are still matches)
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // push the 4th right chunk
        // (tests that deletion occurs when there are no more matches)
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
1925
    /// Null-safe left semi join: same scenario as the plain left-semi test,
    /// but NULL join keys compare equal, so the NULL-keyed left row is
    /// emitted when a NULL-keyed right row arrives.
    #[tokio::test]
    async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + . 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + . 8
             - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + . 10
             + 6 11",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             + 6 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I  I
             - 6 11",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I  I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        // push the 2nd right chunk: NULL-keyed left row matches NULL-keyed right row
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + . 6"
            )
        );

        // push the 3rd left chunk (tests forward_exactly_once)
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // push the 3rd right chunk
        // (tests that no change if there are still matches)
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // push the 4th right chunk
        // (tests that deletion occurs when there are no more matches)
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
2035
    /// Inner join on append-only inputs (three columns, composite join key
    /// `[0, 1]`): matched rows are emitted with both sides' columns
    /// concatenated.
    #[tokio::test]
    async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::Inner }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk: keys (2,5) and (4,9) match buffered left rows
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 2 5 2 2 5 1
                + 4 9 4 4 9 2"
            )
        );

        // push the 2nd right chunk: keys (1,4) and (3,6) match
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 1 4 1 1 4 4
                + 3 6 3 3 6 5"
            )
        );

        Ok(())
    }
2108
    /// Left semi join on append-only inputs: matched left rows are emitted
    /// with only the left side's columns.
    #[tokio::test]
    async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk: left rows with keys (2,5) and (4,9) now have matches
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 2 5 2
                + 4 9 4"
            )
        );

        // push the 2nd right chunk: left rows with keys (1,4) and (3,6) now have matches
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 1 4 1
                + 3 6 3"
            )
        );

        Ok(())
    }
2181
    /// Right semi join on append-only inputs: matched RIGHT rows are emitted
    /// with only the right side's columns.
    #[tokio::test]
    async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::RightSemi }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk: right rows with keys (2,5) and (4,9) have left matches
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 2 5 1
                + 4 9 2"
            )
        );

        // push the 2nd right chunk: right rows with keys (1,4) and (3,6) have left matches
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 1 4 4
                + 3 6 5"
            )
        );

        Ok(())
    }
2254
    /// Right semi join: mirror image of the left-semi test — a right row is
    /// emitted once it gains its first LEFT match and retracted when its last
    /// left match disappears. (Note the chunk naming: `chunk_r*` feed `tx_r`,
    /// `chunk_l*` feed `tx_l`.)
    #[tokio::test]
    async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I
             + 6 10",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I  I
             - 6 11",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            "  I  I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st right chunk
        tx_r.push_chunk(chunk_r1);
        hash_join.next_unwrap_pending();

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd right chunk
        tx_r.push_chunk(chunk_r2);
        hash_join.next_unwrap_pending();

        // push the 1st left chunk
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        // push the 2nd left chunk
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 3 6"
            )
        );

        // push the 3rd right chunk (tests forward_exactly_once)
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // push the 3rd left chunk
        // (tests that no change if there are still matches)
        tx_l.push_chunk(chunk_l3);
        hash_join.next_unwrap_pending();

        // push the 4th left chunk
        // (tests that deletion occurs when there are no more matches)
        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
2364
    /// Left anti join: a left row is emitted while it has NO right match,
    /// retracted when its first match appears, and re-emitted when its last
    /// match disappears.
    #[tokio::test]
    async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11
             + 1 2
             + 1 3",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             + 9 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I
             - 1 2",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I I
             - 1 3",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: no right state at all, so every row is output
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4
                + 2 5
                + 3 6",
            )
        );

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk: the +/- pair is forwarded but both rows are
        // marked `D` (hidden by the chunk's visibility bitmap), so the net
        // visible output is empty
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I
                 + 3 8 D
                 - 3 8 D",
            )
        );

        // push the 1st right chunk: key 2 now has a match, so (2, 5) is retracted
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 2 5"
            )
        );

        // push the 2nd right chunk: keys 3 and 1 gain matches, retracting their rows
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 3 6
                - 1 4"
            )
        );

        // push the 3rd left chunk (tests forward_exactly_once)
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 9 10"
            )
        );

        // push the 3rd right chunk
        // (tests that no change if there are still matches)
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // push the 4th right chunk
        // (tests that insertion occurs when there are no more matches)
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4"
            )
        );

        Ok(())
    }
2494
    /// Exercises a RIGHT anti join. There is no dedicated `RightAnti` executor
    /// instantiation here: a `LeftAnti` executor is created and the two senders
    /// are swapped — note the destructuring order `(tx_r, tx_l, ..)` below, so
    /// `tx_r` feeds the executor's first input.
    #[tokio::test]
    async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
        // Anti side: its rows are emitted while unmatched on the other side.
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        // Matching side: its rows suppress anti-side rows with the same key.
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11
             + 1 2
             + 1 3",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I
             + 9 10",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             - 1 2",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            "  I I
             - 1 3",
        );
        // NOTE: senders intentionally swapped relative to the executor's input
        // order to simulate a right anti join with a `LeftAnti` executor.
        let (mut tx_r, mut tx_l, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;

        // push the init barrier for left and right
        tx_r.push_barrier(test_epoch(1), false);
        tx_l.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st right chunk: the matching side is still empty, so all
        // rows are emitted unchanged
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4
                + 2 5
                + 3 6",
            )
        );

        // push the 2nd barrier for left and right
        tx_r.push_barrier(test_epoch(2), false);
        tx_l.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd right chunk: the insert/delete pair cancels out and is
        // forwarded with both rows carrying the `D` mark
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I
                 + 3 8 D
                 - 3 8 D",
            )
        );

        // push the 1st left chunk: key 2 now has a match, so the previously
        // emitted anti row (2, 5) is retracted
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 2 5"
            )
        );

        // push the 2nd left chunk: keys 3 and 1 gain matches, retracting their
        // anti rows (3, 6) and (1, 4)
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 3 6
                - 1 4"
            )
        );

        // push the 3rd right chunk (tests forward_exactly_once)
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 9 10"
            )
        );

        // push the 3rd left chunk
        // (tests that no change if there are still matches: key 1 still has
        // the (1, 3) match left)
        tx_l.push_chunk(chunk_l3);
        hash_join.next_unwrap_pending();

        // push the 4th left chunk
        // (tests that insertion occurs when there are no more matches)
        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4"
            )
        );

        Ok(())
    }
2624
    /// Verifies barrier alignment: a chunk pushed *after* a barrier on one
    /// side is buffered until the matching barrier arrives on the other side,
    /// and is only joined once the barrier has been emitted downstream.
    #[tokio::test]
    async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 6 8
             + 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: inner join, no right rows yet, no output
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push a barrier to left side; anything pushed to the left after this
        // point belongs to the next epoch and must wait for alignment
        tx_l.push_barrier(test_epoch(2), false);

        // push the 2nd left chunk (buffered behind the left barrier)
        tx_l.push_chunk(chunk_l2);

        // join the first right chunk
        tx_r.push_chunk(chunk_r1);

        // Consume stream chunk: chunk_r1 joins only against chunk_l1 state,
        // since chunk_l2 is still buffered — hence only key 2 matches
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 7"
            )
        );

        // push a barrier to right side, completing alignment for epoch 2
        tx_r.push_barrier(test_epoch(2), false);

        // get the aligned barrier here
        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
        assert!(matches!(
            hash_join.next_unwrap_ready_barrier()?,
            Barrier {
                epoch,
                mutation: None,
                ..
            } if epoch == expected_epoch
        ));

        // join the 2nd left chunk: the buffered chunk_l2 is now processed
        // against right state that already contains chunk_r1 — (6, 8) x (6, 9)
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 6 8 6 9"
            )
        );

        // push the 2nd right chunk: key 3 matches two left rows, key 6 one
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 6 3 10
                + 3 8 3 10
                + 6 8 6 11"
            )
        );

        Ok(())
    }
2719
    /// Same barrier-alignment scenario as `test_streaming_hash_inner_join_with_barrier`,
    /// but with NULLs in the second (non-key) left column. The join key is the
    /// first column, so the NULL values simply ride along in the output.
    #[tokio::test]
    async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 .
             + 3 .",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 6 .
             + 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: no right rows yet, so no output
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push a barrier to left side; later left data waits for alignment
        tx_l.push_barrier(test_epoch(2), false);

        // push the 2nd left chunk (buffered behind the left barrier)
        tx_l.push_chunk(chunk_l2);

        // join the first right chunk
        tx_r.push_chunk(chunk_r1);

        // Consume stream chunk: only chunk_l1 is visible to chunk_r1, so only
        // key 2 matches; the NULL in the left value column is preserved
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 . 2 7"
            )
        );

        // push a barrier to right side, completing epoch-2 alignment
        tx_r.push_barrier(test_epoch(2), false);

        // get the aligned barrier here
        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
        assert!(matches!(
            hash_join.next_unwrap_ready_barrier()?,
            Barrier {
                epoch,
                mutation: None,
                ..
            } if epoch == expected_epoch
        ));

        // join the 2nd left chunk: the buffered chunk_l2 now joins against
        // right state containing chunk_r1 — (6, NULL) x (6, 9)
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 6 . 6 9"
            )
        );

        // push the 2nd right chunk: key 3 matches both left rows, key 6 one
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 8 3 10
                + 3 . 3 10
                + 6 . 6 11"
            )
        );

        Ok(())
    }
2814
2815    #[tokio::test]
2816    async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
2817        let chunk_l1 = StreamChunk::from_pretty(
2818            "  I I
2819             + 1 4
2820             + 2 5
2821             + 3 6",
2822        );
2823        let chunk_l2 = StreamChunk::from_pretty(
2824            "  I I
2825             + 3 8
2826             - 3 8",
2827        );
2828        let chunk_r1 = StreamChunk::from_pretty(
2829            "  I I
2830             + 2 7
2831             + 4 8
2832             + 6 9",
2833        );
2834        let chunk_r2 = StreamChunk::from_pretty(
2835            "  I  I
2836             + 3 10
2837             + 6 11",
2838        );
2839        let (mut tx_l, mut tx_r, mut hash_join) =
2840            create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;
2841
2842        // push the init barrier for left and right
2843        tx_l.push_barrier(test_epoch(1), false);
2844        tx_r.push_barrier(test_epoch(1), false);
2845        hash_join.next_unwrap_ready_barrier()?;
2846
2847        // push the 1st left chunk
2848        tx_l.push_chunk(chunk_l1);
2849        let chunk = hash_join.next_unwrap_ready_chunk()?;
2850        assert_eq!(
2851            chunk,
2852            StreamChunk::from_pretty(
2853                " I I I I
2854                + 1 4 . .
2855                + 2 5 . .
2856                + 3 6 . ."
2857            )
2858        );
2859
2860        // push the 2nd left chunk
2861        tx_l.push_chunk(chunk_l2);
2862        let chunk = hash_join.next_unwrap_ready_chunk()?;
2863        assert_eq!(
2864            chunk,
2865            StreamChunk::from_pretty(
2866                " I I I I
2867                + 3 8 . . D
2868                - 3 8 . . D"
2869            )
2870        );
2871
2872        // push the 1st right chunk
2873        tx_r.push_chunk(chunk_r1);
2874        let chunk = hash_join.next_unwrap_ready_chunk()?;
2875        assert_eq!(
2876            chunk,
2877            StreamChunk::from_pretty(
2878                " I I I I
2879                - 2 5 . .
2880                + 2 5 2 7"
2881            )
2882        );
2883
2884        // push the 2nd right chunk
2885        tx_r.push_chunk(chunk_r2);
2886        let chunk = hash_join.next_unwrap_ready_chunk()?;
2887        assert_eq!(
2888            chunk,
2889            StreamChunk::from_pretty(
2890                " I I I I
2891                - 3 6 . .
2892                + 3 6 3 10"
2893            )
2894        );
2895
2896        Ok(())
2897    }
2898
    /// Left-outer join with null-safe equality on the key column (second
    /// boolean argument to `create_classical_executor` is `true`): NULL join
    /// keys match each other, as shown by the final `(. 6) x (. 10)` output.
    #[tokio::test]
    async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + . 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + . 8
             - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + . 10
             + 6 11",
        );
        // `null_safe = true` — the only difference from the plain left join test.
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: no matches yet, rows come out null-padded
        // (including the row whose key itself is NULL)
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 4 . .
                + 2 5 . .
                + . 6 . ."
            )
        );

        // push the 2nd left chunk: insert/delete pair with NULL key cancels
        // out and is forwarded with the `D` mark
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . 8 . . D
                - . 8 . . D"
            )
        );

        // push the 1st right chunk: key 2 matches, its null-padded row is
        // retracted and replaced
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 2 5 . .
                + 2 5 2 7"
            )
        );

        // push the 2nd right chunk: with null-safe equality the NULL-keyed
        // right row (., 10) matches the NULL-keyed left row (., 6)
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - . 6 . .
                + . 6 . 10"
            )
        );

        Ok(())
    }
2982
2983    #[tokio::test]
2984    async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
2985        let chunk_l1 = StreamChunk::from_pretty(
2986            "  I I
2987             + 1 4
2988             + 2 5
2989             + 3 6",
2990        );
2991        let chunk_l2 = StreamChunk::from_pretty(
2992            "  I I
2993             + 3 8
2994             - 3 8",
2995        );
2996        let chunk_r1 = StreamChunk::from_pretty(
2997            "  I I
2998             + 2 7
2999             + 4 8
3000             + 6 9",
3001        );
3002        let chunk_r2 = StreamChunk::from_pretty(
3003            "  I  I
3004             + 5 10
3005             - 5 10",
3006        );
3007        let (mut tx_l, mut tx_r, mut hash_join) =
3008            create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;
3009
3010        // push the init barrier for left and right
3011        tx_l.push_barrier(test_epoch(1), false);
3012        tx_r.push_barrier(test_epoch(1), false);
3013        hash_join.next_unwrap_ready_barrier()?;
3014
3015        // push the 1st left chunk
3016        tx_l.push_chunk(chunk_l1);
3017        hash_join.next_unwrap_pending();
3018
3019        // push the 2nd left chunk
3020        tx_l.push_chunk(chunk_l2);
3021        hash_join.next_unwrap_pending();
3022
3023        // push the 1st right chunk
3024        tx_r.push_chunk(chunk_r1);
3025        let chunk = hash_join.next_unwrap_ready_chunk()?;
3026        assert_eq!(
3027            chunk,
3028            StreamChunk::from_pretty(
3029                " I I I I
3030                + 2 5 2 7
3031                + . . 4 8
3032                + . . 6 9"
3033            )
3034        );
3035
3036        // push the 2nd right chunk
3037        tx_r.push_chunk(chunk_r2);
3038        let chunk = hash_join.next_unwrap_ready_chunk()?;
3039        assert_eq!(
3040            chunk,
3041            StreamChunk::from_pretty(
3042                " I I I I
3043                + . . 5 10 D
3044                - . . 5 10 D"
3045            )
3046        );
3047
3048        Ok(())
3049    }
3050
    /// Left-outer join over append-only inputs (three columns per side).
    /// Unmatched left rows are emitted null-padded and later retracted and
    /// replaced when a matching right row arrives.
    #[tokio::test]
    async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: no right rows yet, everything null-padded
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 1 4 1 . . .
                + 2 5 2 . . .
                + 3 6 3 . . ."
            )
        );

        // push the 2nd left chunk: still no matches
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 4 9 4 . . .
                + 5 10 5 . . ."
            )
        );

        // push the 1st right chunk: keys 2 and 4 match, so their null-padded
        // rows are retracted and replaced with joined rows
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 2 . . .
                + 2 5 2 2 5 1
                - 4 9 4 . . .
                + 4 9 4 4 9 2"
            )
        );

        // push the 2nd right chunk: same retract-and-replace for keys 1 and 3
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 1 4 1 . . .
                + 1 4 1 1 4 4
                - 3 6 3 . . .
                + 3 6 3 3 6 5"
            )
        );

        Ok(())
    }
3139
3140    #[tokio::test]
3141    async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
3142        let chunk_l1 = StreamChunk::from_pretty(
3143            "  I I I
3144             + 1 4 1
3145             + 2 5 2
3146             + 3 6 3",
3147        );
3148        let chunk_l2 = StreamChunk::from_pretty(
3149            "  I I I
3150             + 4 9 4
3151             + 5 10 5",
3152        );
3153        let chunk_r1 = StreamChunk::from_pretty(
3154            "  I I I
3155             + 2 5 1
3156             + 4 9 2
3157             + 6 9 3",
3158        );
3159        let chunk_r2 = StreamChunk::from_pretty(
3160            "  I I I
3161             + 1 4 4
3162             + 3 6 5
3163             + 7 7 6",
3164        );
3165
3166        let (mut tx_l, mut tx_r, mut hash_join) =
3167            create_append_only_executor::<{ JoinType::RightOuter }>(false).await;
3168
3169        // push the init barrier for left and right
3170        tx_l.push_barrier(test_epoch(1), false);
3171        tx_r.push_barrier(test_epoch(1), false);
3172        hash_join.next_unwrap_ready_barrier()?;
3173
3174        // push the 1st left chunk
3175        tx_l.push_chunk(chunk_l1);
3176        hash_join.next_unwrap_pending();
3177
3178        // push the 2nd left chunk
3179        tx_l.push_chunk(chunk_l2);
3180        hash_join.next_unwrap_pending();
3181
3182        // push the 1st right chunk
3183        tx_r.push_chunk(chunk_r1);
3184        let chunk = hash_join.next_unwrap_ready_chunk()?;
3185        assert_eq!(
3186            chunk,
3187            StreamChunk::from_pretty(
3188                "  I I I I I I
3189                + 2 5 2 2 5 1
3190                + 4 9 4 4 9 2
3191                + . . . 6 9 3"
3192            )
3193        );
3194
3195        // push the 2nd right chunk
3196        tx_r.push_chunk(chunk_r2);
3197        let chunk = hash_join.next_unwrap_ready_chunk()?;
3198        assert_eq!(
3199            chunk,
3200            StreamChunk::from_pretty(
3201                "  I I I I I I
3202                + 1 4 1 1 4 4
3203                + 3 6 3 3 6 5
3204                + . . . 7 7 6"
3205            )
3206        );
3207
3208        Ok(())
3209    }
3210
3211    #[tokio::test]
3212    async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
3213        let chunk_l1 = StreamChunk::from_pretty(
3214            "  I I
3215             + 1 4
3216             + 2 5
3217             + 3 6",
3218        );
3219        let chunk_l2 = StreamChunk::from_pretty(
3220            "  I I
3221             + 3 8
3222             - 3 8",
3223        );
3224        let chunk_r1 = StreamChunk::from_pretty(
3225            "  I I
3226             + 2 7
3227             + 4 8
3228             + 6 9",
3229        );
3230        let chunk_r2 = StreamChunk::from_pretty(
3231            "  I  I
3232             + 5 10
3233             - 5 10",
3234        );
3235        let (mut tx_l, mut tx_r, mut hash_join) =
3236            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3237
3238        // push the init barrier for left and right
3239        tx_l.push_barrier(test_epoch(1), false);
3240        tx_r.push_barrier(test_epoch(1), false);
3241        hash_join.next_unwrap_ready_barrier()?;
3242
3243        // push the 1st left chunk
3244        tx_l.push_chunk(chunk_l1);
3245        let chunk = hash_join.next_unwrap_ready_chunk()?;
3246        assert_eq!(
3247            chunk,
3248            StreamChunk::from_pretty(
3249                " I I I I
3250                + 1 4 . .
3251                + 2 5 . .
3252                + 3 6 . ."
3253            )
3254        );
3255
3256        // push the 2nd left chunk
3257        tx_l.push_chunk(chunk_l2);
3258        let chunk = hash_join.next_unwrap_ready_chunk()?;
3259        assert_eq!(
3260            chunk,
3261            StreamChunk::from_pretty(
3262                " I I I I
3263                + 3 8 . . D
3264                - 3 8 . . D"
3265            )
3266        );
3267
3268        // push the 1st right chunk
3269        tx_r.push_chunk(chunk_r1);
3270        let chunk = hash_join.next_unwrap_ready_chunk()?;
3271        assert_eq!(
3272            chunk,
3273            StreamChunk::from_pretty(
3274                " I I I I
3275                - 2 5 . .
3276                + 2 5 2 7
3277                + . . 4 8
3278                + . . 6 9"
3279            )
3280        );
3281
3282        // push the 2nd right chunk
3283        tx_r.push_chunk(chunk_r2);
3284        let chunk = hash_join.next_unwrap_ready_chunk()?;
3285        assert_eq!(
3286            chunk,
3287            StreamChunk::from_pretty(
3288                " I I I I
3289                + . . 5 10 D
3290                - . . 5 10 D"
3291            )
3292        );
3293
3294        Ok(())
3295    }
3296
3297    #[tokio::test]
3298    async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
3299        let (mut tx_l, mut tx_r, mut hash_join) =
3300            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3301
3302        // push the init barrier for left and right
3303        tx_l.push_barrier(test_epoch(1), false);
3304        tx_r.push_barrier(test_epoch(1), false);
3305        hash_join.next_unwrap_ready_barrier()?;
3306
3307        tx_l.push_chunk(StreamChunk::from_pretty(
3308            "  I I
3309             + 1 1
3310            ",
3311        ));
3312        let chunk = hash_join.next_unwrap_ready_chunk()?;
3313        assert_eq!(
3314            chunk,
3315            StreamChunk::from_pretty(
3316                " I I I I
3317                + 1 1 . ."
3318            )
3319        );
3320
3321        tx_r.push_chunk(StreamChunk::from_pretty(
3322            "  I I
3323             + 1 1
3324            ",
3325        ));
3326        let chunk = hash_join.next_unwrap_ready_chunk()?;
3327
3328        assert_eq!(
3329            chunk,
3330            StreamChunk::from_pretty(
3331                " I I I I
3332                - 1 1 . .
3333                + 1 1 1 1"
3334            )
3335        );
3336
3337        tx_l.push_chunk(StreamChunk::from_pretty(
3338            "   I I
3339              - 1 1
3340              + 1 2
3341            ",
3342        ));
3343        let chunk = hash_join.next_unwrap_ready_chunk()?;
3344        let chunk = chunk.compact_vis();
3345        assert_eq!(
3346            chunk,
3347            StreamChunk::from_pretty(
3348                " I I I I
3349                - 1 1 1 1
3350                + 1 2 1 1
3351                "
3352            )
3353        );
3354
3355        Ok(())
3356    }
3357
    /// Full-outer join with a non-equi condition enabled (first boolean
    /// argument to `create_classical_executor` is `true`). Includes the
    /// regression scenarios from #2420: a row matching multiple equi-key
    /// entries must be forwarded exactly once, and a row landing on an
    /// emptied join entry must still be forwarded.
    #[tokio::test]
    async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
    {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6
             + 3 7",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8
             - 1 4", // delete row to cause an empty JoinHashEntry
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 6
             + 4 8
             + 3 4",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 5 10
             - 5 10
             + 1 2",
        );
        // `with_condition = true` — the non-equi condition participates in
        // match decisions below.
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: nothing to match, all rows null-padded
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 4 . .
                + 2 5 . .
                + 3 6 . .
                + 3 7 . ."
            )
        );

        // push the 2nd left chunk: the insert/delete pair is `D`-marked, and
        // the (1, 4) delete empties key 1's hash entry (see #2420 note below)
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 8 . . D
                - 3 8 . . D
                - 1 4 . ."
            )
        );

        // push the 1st right chunk: (2, 6) passes the condition against
        // (2, 5); (4, 8) has no left key; (3, 4) fails the condition for both
        // key-3 left rows and is forwarded once, null-padded
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 2 5 . .
                + 2 5 2 6
                + . . 4 8
                + . . 3 4" /* regression test (#2420): 3 4 should be forwarded only once
                            * despite matching on eq join on 2
                            * entries */
            )
        );

        // push the 2nd right chunk: the insert/delete pair is `D`-marked, and
        // (1, 2) hits the emptied key-1 entry but is still forwarded
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . . 5 10 D
                - . . 5 10 D
                + . . 1 2" /* regression test (#2420): 1 2 forwarded even if matches on an empty
                            * join entry */
            )
        );

        Ok(())
    }
3453
3454    #[tokio::test]
3455    async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
3456        let chunk_l1 = StreamChunk::from_pretty(
3457            "  I I
3458             + 1 4
3459             + 2 10
3460             + 3 6",
3461        );
3462        let chunk_l2 = StreamChunk::from_pretty(
3463            "  I I
3464             + 3 8
3465             - 3 8",
3466        );
3467        let chunk_r1 = StreamChunk::from_pretty(
3468            "  I I
3469             + 2 7
3470             + 4 8
3471             + 6 9",
3472        );
3473        let chunk_r2 = StreamChunk::from_pretty(
3474            "  I  I
3475             + 3 10
3476             + 6 11",
3477        );
3478        let (mut tx_l, mut tx_r, mut hash_join) =
3479            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3480
3481        // push the init barrier for left and right
3482        tx_l.push_barrier(test_epoch(1), false);
3483        tx_r.push_barrier(test_epoch(1), false);
3484        hash_join.next_unwrap_ready_barrier()?;
3485
3486        // push the 1st left chunk
3487        tx_l.push_chunk(chunk_l1);
3488        hash_join.next_unwrap_pending();
3489
3490        // push the 2nd left chunk
3491        tx_l.push_chunk(chunk_l2);
3492        hash_join.next_unwrap_pending();
3493
3494        // push the 1st right chunk
3495        tx_r.push_chunk(chunk_r1);
3496        hash_join.next_unwrap_pending();
3497
3498        // push the 2nd right chunk
3499        tx_r.push_chunk(chunk_r2);
3500        let chunk = hash_join.next_unwrap_ready_chunk()?;
3501        assert_eq!(
3502            chunk,
3503            StreamChunk::from_pretty(
3504                " I I I I
3505                + 3 6 3 10"
3506            )
3507        );
3508
3509        Ok(())
3510    }
3511
3512    #[tokio::test]
3513    async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3514        let (mut tx_l, mut tx_r, mut hash_join) =
3515            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3516
3517        // push the init barrier for left and right
3518        tx_l.push_barrier(test_epoch(1), false);
3519        tx_r.push_barrier(test_epoch(1), false);
3520        hash_join.next_unwrap_ready_barrier()?;
3521
3522        tx_l.push_int64_watermark(0, 100);
3523
3524        tx_l.push_int64_watermark(0, 200);
3525
3526        tx_l.push_barrier(test_epoch(2), false);
3527        tx_r.push_barrier(test_epoch(2), false);
3528        hash_join.next_unwrap_ready_barrier()?;
3529
3530        tx_r.push_int64_watermark(0, 50);
3531
3532        let w1 = hash_join.next().await.unwrap().unwrap();
3533        let w1 = w1.as_watermark().unwrap();
3534
3535        let w2 = hash_join.next().await.unwrap().unwrap();
3536        let w2 = w2.as_watermark().unwrap();
3537
3538        tx_r.push_int64_watermark(0, 100);
3539
3540        let w3 = hash_join.next().await.unwrap().unwrap();
3541        let w3 = w3.as_watermark().unwrap();
3542
3543        let w4 = hash_join.next().await.unwrap().unwrap();
3544        let w4 = w4.as_watermark().unwrap();
3545
3546        assert_eq!(
3547            w1,
3548            &Watermark {
3549                col_idx: 2,
3550                data_type: DataType::Int64,
3551                val: ScalarImpl::Int64(50)
3552            }
3553        );
3554
3555        assert_eq!(
3556            w2,
3557            &Watermark {
3558                col_idx: 0,
3559                data_type: DataType::Int64,
3560                val: ScalarImpl::Int64(50)
3561            }
3562        );
3563
3564        assert_eq!(
3565            w3,
3566            &Watermark {
3567                col_idx: 2,
3568                data_type: DataType::Int64,
3569                val: ScalarImpl::Int64(100)
3570            }
3571        );
3572
3573        assert_eq!(
3574            w4,
3575            &Watermark {
3576                col_idx: 0,
3577                data_type: DataType::Int64,
3578                val: ScalarImpl::Int64(100)
3579            }
3580        );
3581
3582        Ok(())
3583    }
3584}