risingwave_stream/executor/
hash_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use std::assert_matches::assert_matches;
15use std::collections::{BTreeMap, HashSet};
16use std::marker::PhantomData;
17use std::time::Duration;
18
19use anyhow::Context;
20use itertools::Itertools;
21use multimap::MultiMap;
22use risingwave_common::array::Op;
23use risingwave_common::hash::{HashKey, NullBitmap};
24use risingwave_common::row::RowExt;
25use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
26use risingwave_common::util::epoch::EpochPair;
27use risingwave_common::util::iter_util::ZipEqDebug;
28use risingwave_expr::ExprError;
29use risingwave_expr::expr::NonStrictExpression;
30use tokio::time::Instant;
31
32use self::builder::JoinChunkBuilder;
33use super::barrier_align::*;
34use super::join::hash_join::*;
35use super::join::row::{JoinEncoding, JoinRow};
36use super::join::*;
37use super::watermark::*;
38use crate::executor::CachedJoinRow;
39use crate::executor::join::builder::JoinStreamChunkBuilder;
40use crate::executor::join::hash_join::CacheResult;
41use crate::executor::prelude::*;
42
/// The join caches of both sides are evicted once every `EVICT_EVERY_N_ROWS` received rows.
/// The row counter is shared by the left and the right input.
const EVICT_EVERY_N_ROWS: u32 = 16;

/// Returns `true` when every element of `vec1` also occurs in `vec2`.
///
/// Ordering and duplicates are irrelevant; an empty `vec1` is a subset of any `vec2`.
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.iter().all(|candidate| superset.contains(candidate))
}
49
/// Column-position parameters describing one input (left or right) of a hash join.
pub struct JoinParams {
    /// Positions, within this input's schema, of the equi-join key columns.
    pub join_key_indices: Vec<usize>,
    /// Positions of this input's primary-key columns after deduplication.
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Bundles the join-key positions and the deduplicated pk positions of one input.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        JoinParams {
            deduped_pk_indices,
            join_key_indices,
        }
    }
}
65
66struct JoinSide<K: HashKey, S: StateStore, E: JoinEncoding> {
67    /// Store all data from a one side stream
68    ht: JoinHashMap<K, S, E>,
69    /// Indices of the join key columns
70    join_key_indices: Vec<usize>,
71    /// The data type of all columns without degree.
72    all_data_types: Vec<DataType>,
73    /// The start position for the side in output new columns
74    start_pos: usize,
75    /// The mapping from input indices of a side to output columes.
76    i2o_mapping: Vec<(usize, usize)>,
77    i2o_mapping_indexed: MultiMap<usize, usize>,
78    /// The first field of the ith element indicates that when a watermark at the ith column of
79    /// this side comes, what band join conditions should be updated in order to possibly
80    /// generate a new watermark at that column or the corresponding column in the counterpart
81    /// join side.
82    ///
83    /// The second field indicates that whether the column is required less than the
84    /// the corresponding column in the counterpart join side in the band join condition.
85    input2inequality_index: Vec<Vec<(usize, bool)>>,
86    /// Some fields which are required non null to match due to inequalities.
87    non_null_fields: Vec<usize>,
88    /// (i, j) in this `Vec` means that state data in this join side can be cleaned if the value of
89    /// its ith column is less than the synthetic watermark of the jth band join condition.
90    state_clean_columns: Vec<(usize, usize)>,
91    /// Whether degree table is needed for this side.
92    need_degree_table: bool,
93    _marker: std::marker::PhantomData<E>,
94}
95
96impl<K: HashKey, S: StateStore, E: JoinEncoding> std::fmt::Debug for JoinSide<K, S, E> {
97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98        f.debug_struct("JoinSide")
99            .field("join_key_indices", &self.join_key_indices)
100            .field("col_types", &self.all_data_types)
101            .field("start_pos", &self.start_pos)
102            .field("i2o_mapping", &self.i2o_mapping)
103            .field("need_degree_table", &self.need_degree_table)
104            .finish()
105    }
106}
107
108impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinSide<K, S, E> {
109    // WARNING: Please do not call this until we implement it.
110    fn is_dirty(&self) -> bool {
111        unimplemented!()
112    }
113
114    #[expect(dead_code)]
115    fn clear_cache(&mut self) {
116        assert!(
117            !self.is_dirty(),
118            "cannot clear cache while states of hash join are dirty"
119        );
120
121        // TODO: not working with rearranged chain
122        // self.ht.clear();
123    }
124
125    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
126        self.ht.init(epoch).await
127    }
128}
129
130/// `HashJoinExecutor` takes two input streams and runs equal hash join on them.
131/// The output columns are the concatenation of left and right columns.
132pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
133{
134    ctx: ActorContextRef,
135    info: ExecutorInfo,
136
137    /// Left input executor
138    input_l: Option<Executor>,
139    /// Right input executor
140    input_r: Option<Executor>,
141    /// The data types of the formed new columns
142    actual_output_data_types: Vec<DataType>,
143    /// The parameters of the left join executor
144    side_l: JoinSide<K, S, E>,
145    /// The parameters of the right join executor
146    side_r: JoinSide<K, S, E>,
147    /// Optional non-equi join conditions
148    cond: Option<NonStrictExpression>,
149    /// Column indices of watermark output and offset expression of each inequality, respectively.
150    inequality_pairs: Vec<(Vec<usize>, Option<NonStrictExpression>)>,
151    /// The output watermark of each inequality condition and its value is the minimum of the
152    /// calculation result of both side. It will be used to generate watermark into downstream
153    /// and do state cleaning if `clean_state` field of that inequality is `true`.
154    inequality_watermarks: Vec<Option<Watermark>>,
155
156    /// Whether the logic can be optimized for append-only stream
157    append_only_optimize: bool,
158
159    metrics: Arc<StreamingMetrics>,
160    /// The maximum size of the chunk produced by executor at a time
161    chunk_size: usize,
162    /// Count the messages received, clear to 0 when counted to `EVICT_EVERY_N_MESSAGES`
163    cnt_rows_received: u32,
164
165    /// watermark column index -> `BufferedWatermarks`
166    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,
167
168    /// When to alert high join amplification
169    high_join_amplification_threshold: usize,
170
171    /// Max number of rows that will be cached in the entry state.
172    entry_state_max_rows: usize,
173}
174
175impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> std::fmt::Debug
176    for HashJoinExecutor<K, S, T, E>
177{
178    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179        f.debug_struct("HashJoinExecutor")
180            .field("join_type", &T)
181            .field("input_left", &self.input_l.as_ref().unwrap().identity())
182            .field("input_right", &self.input_r.as_ref().unwrap().identity())
183            .field("side_l", &self.side_l)
184            .field("side_r", &self.side_r)
185            .field("pk_indices", &self.info.pk_indices)
186            .field("schema", &self.info.schema)
187            .field("actual_output_data_types", &self.actual_output_data_types)
188            .finish()
189    }
190}
191
192impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> Execute
193    for HashJoinExecutor<K, S, T, E>
194{
195    fn execute(self: Box<Self>) -> BoxedMessageStream {
196        self.into_stream().boxed()
197    }
198}
199
200struct EqJoinArgs<'a, K: HashKey, S: StateStore, E: JoinEncoding> {
201    ctx: &'a ActorContextRef,
202    side_l: &'a mut JoinSide<K, S, E>,
203    side_r: &'a mut JoinSide<K, S, E>,
204    actual_output_data_types: &'a [DataType],
205    cond: &'a mut Option<NonStrictExpression>,
206    inequality_watermarks: &'a [Option<Watermark>],
207    chunk: StreamChunk,
208    append_only_optimize: bool,
209    chunk_size: usize,
210    cnt_rows_received: &'a mut u32,
211    high_join_amplification_threshold: usize,
212    entry_state_max_rows: usize,
213}
214
215impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
216    HashJoinExecutor<K, S, T, E>
217{
218    #[allow(clippy::too_many_arguments)]
219    pub fn new(
220        ctx: ActorContextRef,
221        info: ExecutorInfo,
222        input_l: Executor,
223        input_r: Executor,
224        params_l: JoinParams,
225        params_r: JoinParams,
226        null_safe: Vec<bool>,
227        output_indices: Vec<usize>,
228        cond: Option<NonStrictExpression>,
229        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
230        state_table_l: StateTable<S>,
231        degree_state_table_l: StateTable<S>,
232        state_table_r: StateTable<S>,
233        degree_state_table_r: StateTable<S>,
234        watermark_epoch: AtomicU64Ref,
235        is_append_only: bool,
236        metrics: Arc<StreamingMetrics>,
237        chunk_size: usize,
238        high_join_amplification_threshold: usize,
239    ) -> Self {
240        Self::new_with_cache_size(
241            ctx,
242            info,
243            input_l,
244            input_r,
245            params_l,
246            params_r,
247            null_safe,
248            output_indices,
249            cond,
250            inequality_pairs,
251            state_table_l,
252            degree_state_table_l,
253            state_table_r,
254            degree_state_table_r,
255            watermark_epoch,
256            is_append_only,
257            metrics,
258            chunk_size,
259            high_join_amplification_threshold,
260            None,
261        )
262    }
263
264    #[allow(clippy::too_many_arguments)]
265    pub fn new_with_cache_size(
266        ctx: ActorContextRef,
267        info: ExecutorInfo,
268        input_l: Executor,
269        input_r: Executor,
270        params_l: JoinParams,
271        params_r: JoinParams,
272        null_safe: Vec<bool>,
273        output_indices: Vec<usize>,
274        cond: Option<NonStrictExpression>,
275        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
276        state_table_l: StateTable<S>,
277        degree_state_table_l: StateTable<S>,
278        state_table_r: StateTable<S>,
279        degree_state_table_r: StateTable<S>,
280        watermark_epoch: AtomicU64Ref,
281        is_append_only: bool,
282        metrics: Arc<StreamingMetrics>,
283        chunk_size: usize,
284        high_join_amplification_threshold: usize,
285        entry_state_max_rows: Option<usize>,
286    ) -> Self {
287        let entry_state_max_rows = match entry_state_max_rows {
288            None => {
289                ctx.streaming_config
290                    .developer
291                    .hash_join_entry_state_max_rows
292            }
293            Some(entry_state_max_rows) => entry_state_max_rows,
294        };
295        let side_l_column_n = input_l.schema().len();
296
297        let schema_fields = match T {
298            JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
299            JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
300            _ => [
301                input_l.schema().fields.clone(),
302                input_r.schema().fields.clone(),
303            ]
304            .concat(),
305        };
306
307        let original_output_data_types = schema_fields
308            .iter()
309            .map(|field| field.data_type())
310            .collect_vec();
311        let actual_output_data_types = output_indices
312            .iter()
313            .map(|&idx| original_output_data_types[idx].clone())
314            .collect_vec();
315
316        // Data types of of hash join state.
317        let state_all_data_types_l = input_l.schema().data_types();
318        let state_all_data_types_r = input_r.schema().data_types();
319
320        let state_pk_indices_l = input_l.pk_indices().to_vec();
321        let state_pk_indices_r = input_r.pk_indices().to_vec();
322
323        let state_join_key_indices_l = params_l.join_key_indices;
324        let state_join_key_indices_r = params_r.join_key_indices;
325
326        let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
327        let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();
328
329        let degree_pk_indices_l = (state_join_key_indices_l.len()
330            ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
331            .collect_vec();
332        let degree_pk_indices_r = (state_join_key_indices_r.len()
333            ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
334            .collect_vec();
335
336        // If pk is contained in join key.
337        let pk_contained_in_jk_l =
338            is_subset(state_pk_indices_l.clone(), state_join_key_indices_l.clone());
339        let pk_contained_in_jk_r =
340            is_subset(state_pk_indices_r.clone(), state_join_key_indices_r.clone());
341
342        // check whether join key contains pk in both side
343        let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;
344
345        let join_key_data_types_l = state_join_key_indices_l
346            .iter()
347            .map(|idx| state_all_data_types_l[*idx].clone())
348            .collect_vec();
349
350        let join_key_data_types_r = state_join_key_indices_r
351            .iter()
352            .map(|idx| state_all_data_types_r[*idx].clone())
353            .collect_vec();
354
355        assert_eq!(join_key_data_types_l, join_key_data_types_r);
356
357        let null_matched = K::Bitmap::from_bool_vec(null_safe);
358
359        let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
360        let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;
361
362        let degree_state_l = need_degree_table_l.then(|| {
363            TableInner::new(
364                degree_pk_indices_l,
365                degree_join_key_indices_l,
366                degree_state_table_l,
367            )
368        });
369        let degree_state_r = need_degree_table_r.then(|| {
370            TableInner::new(
371                degree_pk_indices_r,
372                degree_join_key_indices_r,
373                degree_state_table_r,
374            )
375        });
376
377        let (left_to_output, right_to_output) = {
378            let (left_len, right_len) = if is_left_semi_or_anti(T) {
379                (state_all_data_types_l.len(), 0usize)
380            } else if is_right_semi_or_anti(T) {
381                (0usize, state_all_data_types_r.len())
382            } else {
383                (state_all_data_types_l.len(), state_all_data_types_r.len())
384            };
385            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
386        };
387
388        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
389        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());
390
391        let left_input_len = input_l.schema().len();
392        let right_input_len = input_r.schema().len();
393        let mut l2inequality_index = vec![vec![]; left_input_len];
394        let mut r2inequality_index = vec![vec![]; right_input_len];
395        let mut l_state_clean_columns = vec![];
396        let mut r_state_clean_columns = vec![];
397        let inequality_pairs = inequality_pairs
398            .into_iter()
399            .enumerate()
400            .map(
401                |(
402                    index,
403                    (key_required_larger, key_required_smaller, clean_state, delta_expression),
404                )| {
405                    let output_indices = if key_required_larger < key_required_smaller {
406                        if clean_state {
407                            l_state_clean_columns.push((key_required_larger, index));
408                        }
409                        l2inequality_index[key_required_larger].push((index, false));
410                        r2inequality_index[key_required_smaller - left_input_len]
411                            .push((index, true));
412                        l2o_indexed
413                            .get_vec(&key_required_larger)
414                            .cloned()
415                            .unwrap_or_default()
416                    } else {
417                        if clean_state {
418                            r_state_clean_columns
419                                .push((key_required_larger - left_input_len, index));
420                        }
421                        l2inequality_index[key_required_smaller].push((index, true));
422                        r2inequality_index[key_required_larger - left_input_len]
423                            .push((index, false));
424                        r2o_indexed
425                            .get_vec(&(key_required_larger - left_input_len))
426                            .cloned()
427                            .unwrap_or_default()
428                    };
429                    (output_indices, delta_expression)
430                },
431            )
432            .collect_vec();
433
434        let mut l_non_null_fields = l2inequality_index
435            .iter()
436            .positions(|inequalities| !inequalities.is_empty())
437            .collect_vec();
438        let mut r_non_null_fields = r2inequality_index
439            .iter()
440            .positions(|inequalities| !inequalities.is_empty())
441            .collect_vec();
442
443        if append_only_optimize {
444            l_state_clean_columns.clear();
445            r_state_clean_columns.clear();
446            l_non_null_fields.clear();
447            r_non_null_fields.clear();
448        }
449
450        let inequality_watermarks = vec![None; inequality_pairs.len()];
451        let watermark_buffers = BTreeMap::new();
452
453        Self {
454            ctx: ctx.clone(),
455            info,
456            input_l: Some(input_l),
457            input_r: Some(input_r),
458            actual_output_data_types,
459            side_l: JoinSide {
460                ht: JoinHashMap::new(
461                    watermark_epoch.clone(),
462                    join_key_data_types_l,
463                    state_join_key_indices_l.clone(),
464                    state_all_data_types_l.clone(),
465                    state_table_l,
466                    params_l.deduped_pk_indices,
467                    degree_state_l,
468                    null_matched.clone(),
469                    pk_contained_in_jk_l,
470                    None,
471                    metrics.clone(),
472                    ctx.id,
473                    ctx.fragment_id,
474                    "left",
475                ),
476                join_key_indices: state_join_key_indices_l,
477                all_data_types: state_all_data_types_l,
478                i2o_mapping: left_to_output,
479                i2o_mapping_indexed: l2o_indexed,
480                input2inequality_index: l2inequality_index,
481                non_null_fields: l_non_null_fields,
482                state_clean_columns: l_state_clean_columns,
483                start_pos: 0,
484                need_degree_table: need_degree_table_l,
485                _marker: PhantomData,
486            },
487            side_r: JoinSide {
488                ht: JoinHashMap::new(
489                    watermark_epoch,
490                    join_key_data_types_r,
491                    state_join_key_indices_r.clone(),
492                    state_all_data_types_r.clone(),
493                    state_table_r,
494                    params_r.deduped_pk_indices,
495                    degree_state_r,
496                    null_matched,
497                    pk_contained_in_jk_r,
498                    None,
499                    metrics.clone(),
500                    ctx.id,
501                    ctx.fragment_id,
502                    "right",
503                ),
504                join_key_indices: state_join_key_indices_r,
505                all_data_types: state_all_data_types_r,
506                start_pos: side_l_column_n,
507                i2o_mapping: right_to_output,
508                i2o_mapping_indexed: r2o_indexed,
509                input2inequality_index: r2inequality_index,
510                non_null_fields: r_non_null_fields,
511                state_clean_columns: r_state_clean_columns,
512                need_degree_table: need_degree_table_r,
513                _marker: PhantomData,
514            },
515            cond,
516            inequality_pairs,
517            inequality_watermarks,
518            append_only_optimize,
519            metrics,
520            chunk_size,
521            cnt_rows_received: 0,
522            watermark_buffers,
523            high_join_amplification_threshold,
524            entry_state_max_rows,
525        }
526    }
527
528    #[try_stream(ok = Message, error = StreamExecutorError)]
529    async fn into_stream(mut self) {
530        let input_l = self.input_l.take().unwrap();
531        let input_r = self.input_r.take().unwrap();
532        let aligned_stream = barrier_align(
533            input_l.execute(),
534            input_r.execute(),
535            self.ctx.id,
536            self.ctx.fragment_id,
537            self.metrics.clone(),
538            "Join",
539        );
540        pin_mut!(aligned_stream);
541
542        let actor_id = self.ctx.id;
543
544        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
545        let first_epoch = barrier.epoch;
546        // The first barrier message should be propagated.
547        yield Message::Barrier(barrier);
548        self.side_l.init(first_epoch).await?;
549        self.side_r.init(first_epoch).await?;
550
551        let actor_id_str = self.ctx.id.to_string();
552        let fragment_id_str = self.ctx.fragment_id.to_string();
553
554        // initialized some metrics
555        let join_actor_input_waiting_duration_ns = self
556            .metrics
557            .join_actor_input_waiting_duration_ns
558            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
559        let left_join_match_duration_ns = self
560            .metrics
561            .join_match_duration_ns
562            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
563        let right_join_match_duration_ns = self
564            .metrics
565            .join_match_duration_ns
566            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);
567
568        let barrier_join_match_duration_ns = self
569            .metrics
570            .join_match_duration_ns
571            .with_guarded_label_values(&[
572                actor_id_str.as_str(),
573                fragment_id_str.as_str(),
574                "barrier",
575            ]);
576
577        let left_join_cached_entry_count = self
578            .metrics
579            .join_cached_entry_count
580            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
581
582        let right_join_cached_entry_count = self
583            .metrics
584            .join_cached_entry_count
585            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);
586
587        let mut start_time = Instant::now();
588
589        while let Some(msg) = aligned_stream
590            .next()
591            .instrument_await("hash_join_barrier_align")
592            .await
593        {
594            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
595            match msg? {
596                AlignedMessage::WatermarkLeft(watermark) => {
597                    for watermark_to_emit in
598                        self.handle_watermark(SideType::Left, watermark).await?
599                    {
600                        yield Message::Watermark(watermark_to_emit);
601                    }
602                }
603                AlignedMessage::WatermarkRight(watermark) => {
604                    for watermark_to_emit in
605                        self.handle_watermark(SideType::Right, watermark).await?
606                    {
607                        yield Message::Watermark(watermark_to_emit);
608                    }
609                }
610                AlignedMessage::Left(chunk) => {
611                    let mut left_time = Duration::from_nanos(0);
612                    let mut left_start_time = Instant::now();
613                    #[for_await]
614                    for chunk in Self::eq_join_left(EqJoinArgs {
615                        ctx: &self.ctx,
616                        side_l: &mut self.side_l,
617                        side_r: &mut self.side_r,
618                        actual_output_data_types: &self.actual_output_data_types,
619                        cond: &mut self.cond,
620                        inequality_watermarks: &self.inequality_watermarks,
621                        chunk,
622                        append_only_optimize: self.append_only_optimize,
623                        chunk_size: self.chunk_size,
624                        cnt_rows_received: &mut self.cnt_rows_received,
625                        high_join_amplification_threshold: self.high_join_amplification_threshold,
626                        entry_state_max_rows: self.entry_state_max_rows,
627                    }) {
628                        left_time += left_start_time.elapsed();
629                        yield Message::Chunk(chunk?);
630                        left_start_time = Instant::now();
631                    }
632                    left_time += left_start_time.elapsed();
633                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
634                    self.try_flush_data().await?;
635                }
636                AlignedMessage::Right(chunk) => {
637                    let mut right_time = Duration::from_nanos(0);
638                    let mut right_start_time = Instant::now();
639                    #[for_await]
640                    for chunk in Self::eq_join_right(EqJoinArgs {
641                        ctx: &self.ctx,
642                        side_l: &mut self.side_l,
643                        side_r: &mut self.side_r,
644                        actual_output_data_types: &self.actual_output_data_types,
645                        cond: &mut self.cond,
646                        inequality_watermarks: &self.inequality_watermarks,
647                        chunk,
648                        append_only_optimize: self.append_only_optimize,
649                        chunk_size: self.chunk_size,
650                        cnt_rows_received: &mut self.cnt_rows_received,
651                        high_join_amplification_threshold: self.high_join_amplification_threshold,
652                        entry_state_max_rows: self.entry_state_max_rows,
653                    }) {
654                        right_time += right_start_time.elapsed();
655                        yield Message::Chunk(chunk?);
656                        right_start_time = Instant::now();
657                    }
658                    right_time += right_start_time.elapsed();
659                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
660                    self.try_flush_data().await?;
661                }
662                AlignedMessage::Barrier(barrier) => {
663                    let barrier_start_time = Instant::now();
664                    let (left_post_commit, right_post_commit) =
665                        self.flush_data(barrier.epoch).await?;
666
667                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
668
669                    // We don't include the time post yielding barrier because the vnode update
670                    // is a one-off and rare operation.
671                    barrier_join_match_duration_ns
672                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
673                    yield Message::Barrier(barrier);
674
675                    // Update the vnode bitmap for state tables of both sides if asked.
676                    right_post_commit
677                        .post_yield_barrier(update_vnode_bitmap.clone())
678                        .await?;
679                    if left_post_commit
680                        .post_yield_barrier(update_vnode_bitmap)
681                        .await?
682                        .unwrap_or(false)
683                    {
684                        self.watermark_buffers
685                            .values_mut()
686                            .for_each(|buffers| buffers.clear());
687                        self.inequality_watermarks.fill(None);
688                    }
689
690                    // Report metrics of cached join rows/entries
691                    for (join_cached_entry_count, ht) in [
692                        (&left_join_cached_entry_count, &self.side_l.ht),
693                        (&right_join_cached_entry_count, &self.side_r.ht),
694                    ] {
695                        join_cached_entry_count.set(ht.entry_count() as i64);
696                    }
697                }
698            }
699            start_time = Instant::now();
700        }
701    }
702
703    async fn flush_data(
704        &mut self,
705        epoch: EpochPair,
706    ) -> StreamExecutorResult<(
707        JoinHashMapPostCommit<'_, K, S, E>,
708        JoinHashMapPostCommit<'_, K, S, E>,
709    )> {
710        // All changes to the state has been buffered in the mem-table of the state table. Just
711        // `commit` them here.
712        let left = self.side_l.ht.flush(epoch).await?;
713        let right = self.side_r.ht.flush(epoch).await?;
714        Ok((left, right))
715    }
716
717    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
718        // All changes to the state has been buffered in the mem-table of the state table. Just
719        // `commit` them here.
720        self.side_l.ht.try_flush().await?;
721        self.side_r.ht.try_flush().await?;
722        Ok(())
723    }
724
725    // We need to manually evict the cache.
726    fn evict_cache(
727        side_update: &mut JoinSide<K, S, E>,
728        side_match: &mut JoinSide<K, S, E>,
729        cnt_rows_received: &mut u32,
730    ) {
731        *cnt_rows_received += 1;
732        if *cnt_rows_received == EVICT_EVERY_N_ROWS {
733            side_update.ht.evict();
734            side_match.ht.evict();
735            *cnt_rows_received = 0;
736        }
737    }
738
    /// Processes a watermark arriving on input `side` and returns the watermarks to emit.
    ///
    /// Steps:
    /// 1. If the watermark is on the update side's first join-key column, forward its value to
    ///    the match side's hash table via `update_watermark` (state cleaning).
    /// 2. For every join-key position the watermark's column occupies, feed it into the
    ///    per-position `BufferedWatermarks` (shared by both sides). When a watermark is
    ///    selected, emit it on every output column mapped from that join key on either side.
    /// 3. For every inequality pair the column participates in, optionally shift the value by
    ///    the pair's delta expression, feed it into that pair's buffer, and when a watermark
    ///    is selected emit it on the pair's output columns and remember it in
    ///    `self.inequality_watermarks`.
    async fn handle_watermark(
        &mut self,
        side: SideTypePrimitive,
        watermark: Watermark,
    ) -> StreamExecutorResult<Vec<Watermark>> {
        let (side_update, side_match) = if side == SideType::Left {
            (&mut self.side_l, &mut self.side_r)
        } else {
            (&mut self.side_r, &mut self.side_l)
        };

        // State cleaning
        // Only the first join-key column drives state cleaning on the match side.
        if side_update.join_key_indices[0] == watermark.col_idx {
            side_match.ht.update_watermark(watermark.val.clone());
        }

        // Select watermarks to yield.
        // A column may appear at several join-key positions; each position has its own buffer.
        let wm_in_jk = side_update
            .join_key_indices
            .iter()
            .positions(|idx| *idx == watermark.col_idx);
        let mut watermarks_to_emit = vec![];
        for idx in wm_in_jk {
            let buffers = self
                .watermark_buffers
                .entry(idx)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
                let empty_indices = vec![];
                // Output columns derived from this join key on the update side, then on the
                // match side.
                let output_indices = side_update
                    .i2o_mapping_indexed
                    .get_vec(&side_update.join_key_indices[idx])
                    .unwrap_or(&empty_indices)
                    .iter()
                    .chain(
                        side_match
                            .i2o_mapping_indexed
                            .get_vec(&side_match.join_key_indices[idx])
                            .unwrap_or(&empty_indices),
                    );
                for output_idx in output_indices {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
            };
        }
        for (inequality_index, need_offset) in
            &side_update.input2inequality_index[watermark.col_idx]
        {
            // Inequality buffers share `watermark_buffers` with the join-key buffers; they are
            // keyed after them, at `join_key_indices.len() + inequality_index`.
            let buffers = self
                .watermark_buffers
                .entry(side_update.join_key_indices.len() + inequality_index)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            let mut input_watermark = watermark.clone();
            if *need_offset
                && let Some(delta_expression) = self.inequality_pairs[*inequality_index].1.as_ref()
            {
                // allow since we will handle error manually.
                #[allow(clippy::disallowed_methods)]
                let eval_result = delta_expression
                    .inner()
                    .eval_row(&OwnedRow::new(vec![Some(input_watermark.val)]))
                    .await;
                match eval_result {
                    // NOTE(review): `unwrap` panics if the delta expression yields NULL; the
                    // input here is always non-NULL, so this presumably relies on the delta
                    // expression never mapping non-NULL to NULL — confirm.
                    Ok(value) => input_watermark.val = value.unwrap(),
                    Err(err) => {
                        // Overflow is silently skipped; other errors are reported. Either way
                        // this inequality pair gets no watermark from this input.
                        if !matches!(err, ExprError::NumericOutOfRange) {
                            self.ctx.on_compute_error(err, &self.info.identity);
                        }
                        continue;
                    }
                }
            };
            if let Some(selected_watermark) = buffers.handle_watermark(side, input_watermark) {
                for output_idx in &self.inequality_pairs[*inequality_index].0 {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
                self.inequality_watermarks[*inequality_index] = Some(selected_watermark);
            }
        }
        Ok(watermarks_to_emit)
    }
820
821    fn row_concat(
822        row_update: impl Row,
823        update_start_pos: usize,
824        row_matched: impl Row,
825        matched_start_pos: usize,
826    ) -> OwnedRow {
827        let mut new_row = vec![None; row_update.len() + row_matched.len()];
828
829        for (i, datum_ref) in row_update.iter().enumerate() {
830            new_row[i + update_start_pos] = datum_ref.to_owned_datum();
831        }
832        for (i, datum_ref) in row_matched.iter().enumerate() {
833            new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
834        }
835        OwnedRow::new(new_row)
836    }
837
    /// Runs `eq_join_oneside` with the left input as the update (probe) side.
    ///
    /// Thin forwarding wrapper: it exists so the join side appears as a distinctly named
    /// frame in stack traces.
    fn eq_join_left(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Left }>(args)
    }
844
    /// Runs `eq_join_oneside` with the right input as the update (probe) side.
    ///
    /// Thin forwarding wrapper: it exists so the join side appears as a distinctly named
    /// frame in stack traces.
    fn eq_join_right(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Right }>(args)
    }
851
    /// Joins every row of `args.chunk`, which arrived on side `SIDE`, against the other side.
    ///
    /// `SIDE` is the update (probe) side; the other side is the match (build) side. For each
    /// non-hole row (`rows_with_holes` yields `None` for holes), this:
    /// 1. ticks the periodic cache eviction (`evict_cache`);
    /// 2. looks the join key up in the match side's cache. It short-circuits to
    ///    `CacheResult::NeverMatch` when a probe-side non-null column is NULL, or when the
    ///    key's null bitmap is not a subset of the match side's `null_matched()`;
    /// 3. delegates the per-row join and state maintenance to `handle_match_rows`;
    /// 4. records the output cardinality in the `join_matched_join_keys` metric, and logs a
    ///    warning when it exceeds `high_join_amplification_threshold`.
    ///
    /// Chunks are yielded whenever the shared chunk builder (created with `chunk_size`)
    /// returns one. Whatever is still buffered is yielded after the last row.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S, E>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            actual_output_data_types,
            cond,
            inequality_watermarks,
            chunk,
            append_only_optimize,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
            entry_state_max_rows,
            ..
        } = args;

        let (side_update, side_match) = if SIDE == SideType::Left {
            (side_l, side_r)
        } else {
            (side_r, side_l)
        };

        // Match-side columns that can be state-cleaned, paired with the current watermark of
        // their inequality; inequalities without a watermark yet are dropped.
        let useful_state_clean_columns = side_match
            .state_clean_columns
            .iter()
            .filter_map(|(column_idx, inequality_index)| {
                inequality_watermarks[*inequality_index]
                    .as_ref()
                    .map(|watermark| (*column_idx, watermark))
            })
            .collect_vec();

        let mut hashjoin_chunk_builder =
            JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        let join_matched_join_keys = ctx
            .streaming_metrics
            .join_matched_join_keys
            .with_guarded_label_values(&[
                &ctx.id.to_string(),
                &ctx.fragment_id.to_string(),
                &side_update.ht.table_id().to_string(),
            ]);

        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(side_update, side_match, cnt_rows_received);

            let cache_lookup_result = {
                // SAFETY: NOTE(review) — relies on every index in `non_null_fields` being
                // within `row`'s width; nothing here checks it. Confirm where
                // `non_null_fields` is built.
                let probe_non_null_requirement_satisfied = side_update
                    .non_null_fields
                    .iter()
                    .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
                let build_non_null_requirement_satisfied =
                    key.null_bitmap().is_subset(side_match.ht.null_matched());
                if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
                    side_match.ht.take_state_opt(key)
                } else {
                    CacheResult::NeverMatch
                }
            };
            // NOTE(review): this sums the cardinality of chunks yielded while handling this
            // row. The builder is shared across rows, so those chunks may contain output of
            // earlier rows, and output still buffered is not counted. The metric and the
            // amplification threshold are therefore approximate.
            let mut total_matches = 0;

            macro_rules! match_rows {
                ($op:ident) => {
                    Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
                        cache_lookup_result,
                        row,
                        key,
                        &mut hashjoin_chunk_builder,
                        side_match,
                        side_update,
                        &useful_state_clean_columns,
                        cond,
                        append_only_optimize,
                        entry_state_max_rows,
                    )
                };
            }

            match op {
                Op::Insert | Op::UpdateInsert =>
                {
                    #[for_await]
                    for chunk in match_rows!(Insert) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
                Op::Delete | Op::UpdateDelete =>
                {
                    #[for_await]
                    for chunk in match_rows!(Delete) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
            };

            join_matched_join_keys.observe(total_matches as _);
            if total_matches > high_join_amplification_threshold {
                let join_key_data_types = side_update.ht.join_key_data_types();
                // NOTE(review): a deserialization failure here aborts the stream even though
                // the key is only used for the warning below.
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = total_matches,
                    update_table_id = side_update.ht.table_id(),
                    match_table_id = side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = ctx.id,
                    fragment_id = ctx.fragment_id,
                    "large rows matched for join key"
                );
            }
        }
        // NOTE(kwannoel): We don't track metrics for this last chunk.
        if let Some(chunk) = hashjoin_chunk_builder.take() {
            yield chunk;
        }
    }
983
    /// For the probe-side row, we need to check if it has values in cache, if not, we need to
    /// fetch the matched rows from the state table.
    ///
    /// Every matched build-side row being processed needs to go through the following phases:
    /// 1. Handle join condition evaluation.
    /// 2. Always do cache refill, if the state count is good.
    /// 3. Handle state cleaning.
    /// 4. Handle degree table update.
    ///
    /// After all matched rows are processed, the probe row itself is forwarded according to
    /// its accumulated degree. The (possibly refilled) cache entry is installed back, rows
    /// flagged for watermark cleaning are deleted from the match side, and finally the probe
    /// row is inserted into or deleted from the update side with its degree. The last step is
    /// skipped when the append-only optimization removed a matched row.
    ///
    /// `cached_lookup_result` is the outcome of the caller's cache lookup for `key`.
    /// `entry_state_max_rows` bounds how many rows a freshly fetched entry may have and still
    /// be cached.
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn handle_match_rows<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
    >(
        cached_lookup_result: CacheResult<E>,
        row: RowRef<'a>,
        key: &'a K,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        side_match: &'a mut JoinSide<K, S, E>,
        side_update: &'a mut JoinSide<K, S, E>,
        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
        cond: &'a mut Option<NonStrictExpression>,
        append_only_optimize: bool,
        entry_state_max_rows: usize,
    ) {
        let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
        // Entry rebuilt from the state table on a cache miss; unused on a hit.
        let mut entry_state: JoinEntryState<E> = JoinEntryState::default();
        let mut entry_state_count = 0;

        // Number of matched rows satisfying the join condition; accumulated by `handle_match_row`.
        let mut degree = 0;
        let mut append_only_matched_row: Option<JoinRow<OwnedRow>> = None;
        let mut matched_rows_to_clean = vec![];

        macro_rules! match_row {
            (
                $match_order_key_indices:expr,
                $degree_table:expr,
                $matched_row:expr,
                $matched_row_ref:expr,
                $from_cache:literal
            ) => {
                Self::handle_match_row::<SIDE, { JOIN_OP }, { $from_cache }>(
                    row,
                    $matched_row,
                    $matched_row_ref,
                    hashjoin_chunk_builder,
                    $match_order_key_indices,
                    $degree_table,
                    side_update.start_pos,
                    side_match.start_pos,
                    cond,
                    &mut degree,
                    useful_state_clean_columns,
                    append_only_optimize,
                    &mut append_only_matched_row,
                    &mut matched_rows_to_clean,
                )
            };
        }

        let entry_state = match cached_lookup_result {
            CacheResult::NeverMatch => {
                // The key can never match: forward the probe row as unmatched and stop here.
                // Nothing is written to the update side's state on this path.
                let op = match JOIN_OP {
                    JoinOp::Insert => Op::Insert,
                    JoinOp::Delete => Op::Delete,
                };
                if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                    yield chunk;
                }
                return Ok(());
            }
            CacheResult::Hit(mut cached_rows) => {
                let (match_order_key_indices, match_degree_state) =
                    side_match.ht.get_degree_state_mut_ref();
                // Handle cached rows which match the probe-side row.
                for (matched_row_ref, matched_row) in
                    cached_rows.values_mut(&side_match.all_data_types)
                {
                    let matched_row = matched_row?;
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        match_degree_state,
                        matched_row,
                        Some(matched_row_ref),
                        true
                    )
                    .await
                    {
                        yield chunk;
                    }
                }

                cached_rows
            }
            CacheResult::Miss => {
                // Handle rows which are not in cache.
                let (matched_rows, match_order_key_indices, degree_table) = side_match
                    .ht
                    .fetch_matched_rows_and_get_degree_table_ref(key)
                    .await?;

                #[for_await]
                for matched_row in matched_rows {
                    let (encoded_pk, matched_row) = matched_row?;

                    let mut matched_row_ref = None;

                    // cache refill
                    // `<=` (not `<`) deliberately admits one row past the limit. The count
                    // thus reaches `entry_state_max_rows + 1` exactly when the key has too
                    // many rows, and the refill check after the loop then skips caching
                    // this incomplete entry.
                    if entry_state_count <= entry_state_max_rows {
                        let row_ref = entry_state
                            .insert(encoded_pk, E::encode(&matched_row), None) // TODO(kwannoel): handle ineq key for asof join.
                            .with_context(|| format!("row: {}", row.display(),))?;
                        matched_row_ref = Some(row_ref);
                        entry_state_count += 1;
                    }
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        degree_table,
                        matched_row,
                        matched_row_ref,
                        false
                    )
                    .await
                    {
                        yield chunk;
                    }
                }
                Box::new(entry_state)
            }
        };

        // forward rows depending on join types
        let op = match JOIN_OP {
            JoinOp::Insert => Op::Insert,
            JoinOp::Delete => Op::Delete,
        };
        if degree == 0 {
            if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                yield chunk;
            }
        } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
        {
            yield chunk;
        }

        // cache refill
        // A hit entry is always reinstalled (presumably because `take_state_opt` removed it
        // from the cache — confirm). A freshly fetched entry is installed only if it did not
        // overflow `entry_state_max_rows`.
        if cache_hit || entry_state_count <= entry_state_max_rows {
            side_match.ht.update_state(key, entry_state);
        }

        // watermark state cleaning
        for matched_row in matched_rows_to_clean {
            side_match.ht.delete_handle_degree(key, matched_row)?;
        }

        // apply append_only optimization to clean matched_rows which have been persisted
        // The early return also skips persisting the probe row into the update side.
        if append_only_optimize && let Some(row) = append_only_matched_row {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            side_match.ht.delete_handle_degree(key, row)?;
            return Ok(());
        }

        // no append_only optimization, update state table(s).
        match JOIN_OP {
            JoinOp::Insert => {
                side_update
                    .ht
                    .insert_handle_degree(key, JoinRow::new(row, degree))?;
            }
            JoinOp::Delete => {
                side_update
                    .ht
                    .delete_handle_degree(key, JoinRow::new(row, degree))?;
            }
        }
    }
1161
1162    #[allow(clippy::too_many_arguments)]
1163    #[inline]
1164    async fn handle_match_row<
1165        'a,
1166        const SIDE: SideTypePrimitive,
1167        const JOIN_OP: JoinOpPrimitive,
1168        const MATCHED_ROWS_FROM_CACHE: bool,
1169    >(
1170        update_row: RowRef<'a>,
1171        mut matched_row: JoinRow<OwnedRow>,
1172        mut matched_row_cache_ref: Option<&mut E::EncodedRow>,
1173        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
1174        match_order_key_indices: &[usize],
1175        match_degree_table: &mut Option<TableInner<S>>,
1176        side_update_start_pos: usize,
1177        side_match_start_pos: usize,
1178        cond: &Option<NonStrictExpression>,
1179        update_row_degree: &mut u64,
1180        useful_state_clean_columns: &[(usize, &'a Watermark)],
1181        append_only_optimize: bool,
1182        append_only_matched_row: &mut Option<JoinRow<OwnedRow>>,
1183        matched_rows_to_clean: &mut Vec<JoinRow<OwnedRow>>,
1184    ) -> Option<StreamChunk> {
1185        let mut need_state_clean = false;
1186        let mut chunk_opt = None;
1187
1188        // TODO(kwannoel): Instead of evaluating this every loop,
1189        // we can call this only if there's a non-equi expression.
1190        // check join cond
1191        let join_condition_satisfied = Self::check_join_condition(
1192            update_row,
1193            side_update_start_pos,
1194            &matched_row.row,
1195            side_match_start_pos,
1196            cond,
1197        )
1198        .await;
1199
1200        if join_condition_satisfied {
1201            // update degree
1202            *update_row_degree += 1;
1203            // send matched row downstream
1204
1205            // Inserts must happen before degrees are updated,
1206            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
1207            if matches!(JOIN_OP, JoinOp::Insert)
1208                && !forward_exactly_once(T, SIDE)
1209                && let Some(chunk) =
1210                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
1211            {
1212                chunk_opt = Some(chunk);
1213            }
1214            // TODO(kwannoel): We can actually statically decide this, using join side + join type.
1215            if let Some(degree_table) = match_degree_table {
1216                update_degree::<S, { JOIN_OP }>(
1217                    match_order_key_indices,
1218                    degree_table,
1219                    &mut matched_row,
1220                );
1221                if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
1222                    // update matched row in cache
1223                    match JOIN_OP {
1224                        JoinOp::Insert => matched_row_cache_ref.as_mut().unwrap().increase_degree(),
1225                        JoinOp::Delete => matched_row_cache_ref.as_mut().unwrap().decrease_degree(),
1226                    }
1227                }
1228            }
1229
1230            // Deletes must happen after degree table is updated.
1231            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
1232            if matches!(JOIN_OP, JoinOp::Delete)
1233                && !forward_exactly_once(T, SIDE)
1234                && let Some(chunk) =
1235                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
1236            {
1237                chunk_opt = Some(chunk);
1238            }
1239        } else {
1240            // check if need state cleaning
1241            for (column_idx, watermark) in useful_state_clean_columns {
1242                if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
1243                    scalar
1244                        .default_cmp(&watermark.val.as_scalar_ref_impl())
1245                        .is_lt()
1246                }) {
1247                    need_state_clean = true;
1248                    break;
1249                }
1250            }
1251        }
1252        // If the stream is append-only and the join key covers pk in both side,
1253        // then we can remove matched rows since pk is unique and will not be
1254        // inserted again
1255        if append_only_optimize {
1256            assert_matches!(JOIN_OP, JoinOp::Insert);
1257            // Since join key contains pk and pk is unique, there should be only
1258            // one row if matched.
1259            assert!(append_only_matched_row.is_none());
1260            *append_only_matched_row = Some(matched_row);
1261        } else if need_state_clean {
1262            // `append_only_optimize` and `need_state_clean` won't both be true.
1263            // 'else' here is only to suppress compiler error, otherwise
1264            // `matched_row` will be moved twice.
1265            matched_rows_to_clean.push(matched_row);
1266        }
1267
1268        chunk_opt
1269    }
1270
1271    // TODO(yuhao-su): We should find a better way to eval the expression
1272    // without concat two rows.
1273    // if there are non-equi expressions
1274    // NOTE(kwannoel): We can probably let `eval` use `impl Row` instead of `OwnedRow`.
1275    #[inline]
1276    async fn check_join_condition(
1277        row: impl Row,
1278        side_update_start_pos: usize,
1279        matched_row: impl Row,
1280        side_match_start_pos: usize,
1281        join_condition: &Option<NonStrictExpression>,
1282    ) -> bool {
1283        if let Some(join_condition) = join_condition {
1284            let new_row = Self::row_concat(
1285                row,
1286                side_update_start_pos,
1287                matched_row,
1288                side_match_start_pos,
1289            );
1290            join_condition
1291                .eval_row_infallible(&new_row)
1292                .await
1293                .map(|s| *s.as_bool())
1294                .unwrap_or(false)
1295        } else {
1296            true
1297        }
1298    }
1299}
1300
1301#[cfg(test)]
1302mod tests {
1303    use std::sync::atomic::AtomicU64;
1304
1305    use pretty_assertions::assert_eq;
1306    use risingwave_common::array::*;
1307    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1308    use risingwave_common::hash::{Key64, Key128};
1309    use risingwave_common::util::epoch::test_epoch;
1310    use risingwave_common::util::sort_util::OrderType;
1311    use risingwave_storage::memory::MemoryStateStore;
1312
1313    use super::*;
1314    use crate::common::table::test_utils::gen_pbtable;
1315    use crate::executor::MemoryEncoding;
1316    use crate::executor::test_utils::expr::build_from_pretty;
1317    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1318
1319    async fn create_in_memory_state_table(
1320        mem_state: MemoryStateStore,
1321        data_types: &[DataType],
1322        order_types: &[OrderType],
1323        pk_indices: &[usize],
1324        table_id: u32,
1325    ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1326        let column_descs = data_types
1327            .iter()
1328            .enumerate()
1329            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1330            .collect_vec();
1331        let state_table = StateTable::from_table_catalog(
1332            &gen_pbtable(
1333                TableId::new(table_id),
1334                column_descs,
1335                order_types.to_vec(),
1336                pk_indices.to_vec(),
1337                0,
1338            ),
1339            mem_state.clone(),
1340            None,
1341        )
1342        .await;
1343
1344        // Create degree table
1345        let mut degree_table_column_descs = vec![];
1346        pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1347            degree_table_column_descs.push(ColumnDesc::unnamed(
1348                ColumnId::new(pk_id as i32),
1349                data_types[*idx].clone(),
1350            ))
1351        });
1352        degree_table_column_descs.push(ColumnDesc::unnamed(
1353            ColumnId::new(pk_indices.len() as i32),
1354            DataType::Int64,
1355        ));
1356        let degree_state_table = StateTable::from_table_catalog(
1357            &gen_pbtable(
1358                TableId::new(table_id + 1),
1359                degree_table_column_descs,
1360                order_types.to_vec(),
1361                pk_indices.to_vec(),
1362                0,
1363            ),
1364            mem_state,
1365            None,
1366        )
1367        .await;
1368        (state_table, degree_state_table)
1369    }
1370
1371    fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1372        build_from_pretty(
1373            condition_text
1374                .as_deref()
1375                .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1376        )
1377    }
1378
1379    async fn create_executor<const T: JoinTypePrimitive>(
1380        with_condition: bool,
1381        null_safe: bool,
1382        condition_text: Option<String>,
1383        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
1384    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1385        let schema = Schema {
1386            fields: vec![
1387                Field::unnamed(DataType::Int64), // join key
1388                Field::unnamed(DataType::Int64),
1389            ],
1390        };
1391        let (tx_l, source_l) = MockSource::channel();
1392        let source_l = source_l.into_executor(schema.clone(), vec![1]);
1393        let (tx_r, source_r) = MockSource::channel();
1394        let source_r = source_r.into_executor(schema, vec![1]);
1395        let params_l = JoinParams::new(vec![0], vec![1]);
1396        let params_r = JoinParams::new(vec![0], vec![1]);
1397        let cond = with_condition.then(|| create_cond(condition_text));
1398
1399        let mem_state = MemoryStateStore::new();
1400
1401        let (state_l, degree_state_l) = create_in_memory_state_table(
1402            mem_state.clone(),
1403            &[DataType::Int64, DataType::Int64],
1404            &[OrderType::ascending(), OrderType::ascending()],
1405            &[0, 1],
1406            0,
1407        )
1408        .await;
1409
1410        let (state_r, degree_state_r) = create_in_memory_state_table(
1411            mem_state,
1412            &[DataType::Int64, DataType::Int64],
1413            &[OrderType::ascending(), OrderType::ascending()],
1414            &[0, 1],
1415            2,
1416        )
1417        .await;
1418
1419        let schema = match T {
1420            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1421            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1422            _ => [source_l.schema().fields(), source_r.schema().fields()]
1423                .concat()
1424                .into_iter()
1425                .collect(),
1426        };
1427        let schema_len = schema.len();
1428        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1429
1430        let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
1431            ActorContext::for_test(123),
1432            info,
1433            source_l,
1434            source_r,
1435            params_l,
1436            params_r,
1437            vec![null_safe],
1438            (0..schema_len).collect_vec(),
1439            cond,
1440            inequality_pairs,
1441            state_l,
1442            degree_state_l,
1443            state_r,
1444            degree_state_r,
1445            Arc::new(AtomicU64::new(0)),
1446            false,
1447            Arc::new(StreamingMetrics::unused()),
1448            1024,
1449            2048,
1450        );
1451        (tx_l, tx_r, executor.boxed().execute())
1452    }
1453
1454    async fn create_classical_executor<const T: JoinTypePrimitive>(
1455        with_condition: bool,
1456        null_safe: bool,
1457        condition_text: Option<String>,
1458    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1459        create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
1460    }
1461
1462    async fn create_append_only_executor<const T: JoinTypePrimitive>(
1463        with_condition: bool,
1464    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1465        let schema = Schema {
1466            fields: vec![
1467                Field::unnamed(DataType::Int64),
1468                Field::unnamed(DataType::Int64),
1469                Field::unnamed(DataType::Int64),
1470            ],
1471        };
1472        let (tx_l, source_l) = MockSource::channel();
1473        let source_l = source_l.into_executor(schema.clone(), vec![0]);
1474        let (tx_r, source_r) = MockSource::channel();
1475        let source_r = source_r.into_executor(schema, vec![0]);
1476        let params_l = JoinParams::new(vec![0, 1], vec![]);
1477        let params_r = JoinParams::new(vec![0, 1], vec![]);
1478        let cond = with_condition.then(|| create_cond(None));
1479
1480        let mem_state = MemoryStateStore::new();
1481
1482        let (state_l, degree_state_l) = create_in_memory_state_table(
1483            mem_state.clone(),
1484            &[DataType::Int64, DataType::Int64, DataType::Int64],
1485            &[
1486                OrderType::ascending(),
1487                OrderType::ascending(),
1488                OrderType::ascending(),
1489            ],
1490            &[0, 1, 0],
1491            0,
1492        )
1493        .await;
1494
1495        let (state_r, degree_state_r) = create_in_memory_state_table(
1496            mem_state,
1497            &[DataType::Int64, DataType::Int64, DataType::Int64],
1498            &[
1499                OrderType::ascending(),
1500                OrderType::ascending(),
1501                OrderType::ascending(),
1502            ],
1503            &[0, 1, 1],
1504            1,
1505        )
1506        .await;
1507
1508        let schema = match T {
1509            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1510            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1511            _ => [source_l.schema().fields(), source_r.schema().fields()]
1512                .concat()
1513                .into_iter()
1514                .collect(),
1515        };
1516        let schema_len = schema.len();
1517        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1518
1519        let executor = HashJoinExecutor::<Key128, MemoryStateStore, T, MemoryEncoding>::new(
1520            ActorContext::for_test(123),
1521            info,
1522            source_l,
1523            source_r,
1524            params_l,
1525            params_r,
1526            vec![false],
1527            (0..schema_len).collect_vec(),
1528            cond,
1529            vec![],
1530            state_l,
1531            degree_state_l,
1532            state_r,
1533            degree_state_r,
1534            Arc::new(AtomicU64::new(0)),
1535            true,
1536            Arc::new(StreamingMetrics::unused()),
1537            1024,
1538            2048,
1539        );
1540        (tx_l, tx_r, executor.boxed().execute())
1541    }
1542
1543    #[tokio::test]
1544    async fn test_interval_join() -> StreamExecutorResult<()> {
1545        let chunk_l1 = StreamChunk::from_pretty(
1546            "  I I
1547             + 1 4
1548             + 2 3
1549             + 2 5
1550             + 3 6",
1551        );
1552        let chunk_l2 = StreamChunk::from_pretty(
1553            "  I I
1554             + 3 8
1555             - 3 8",
1556        );
1557        let chunk_r1 = StreamChunk::from_pretty(
1558            "  I I
1559             + 2 6
1560             + 4 8
1561             + 6 9",
1562        );
1563        let chunk_r2 = StreamChunk::from_pretty(
1564            "  I  I
1565             + 2 3
1566             + 6 11",
1567        );
1568        let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
1569            true,
1570            false,
1571            Some(String::from("(and:boolean (greater_than:boolean $1:int8 (subtract:int8 $3:int8 2:int8)) (greater_than:boolean $3:int8 (subtract:int8 $1:int8 2:int8)))")),
1572            vec![(1, 3, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)"))), (3, 1, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)")))],
1573        )
1574        .await;
1575
1576        // push the init barrier for left and right
1577        tx_l.push_barrier(test_epoch(1), false);
1578        tx_r.push_barrier(test_epoch(1), false);
1579        hash_join.next_unwrap_ready_barrier()?;
1580
1581        // push the 1st left chunk
1582        tx_l.push_chunk(chunk_l1);
1583        hash_join.next_unwrap_pending();
1584
1585        // push the init barrier for left and right
1586        tx_l.push_barrier(test_epoch(2), false);
1587        tx_r.push_barrier(test_epoch(2), false);
1588        hash_join.next_unwrap_ready_barrier()?;
1589
1590        // push the 2nd left chunk
1591        tx_l.push_chunk(chunk_l2);
1592        hash_join.next_unwrap_pending();
1593
1594        tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
1595        hash_join.next_unwrap_pending();
1596
1597        tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
1598        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1599        assert_eq!(
1600            output_watermark,
1601            Watermark::new(1, DataType::Int64, ScalarImpl::Int64(4))
1602        );
1603        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1604        assert_eq!(
1605            output_watermark,
1606            Watermark::new(3, DataType::Int64, ScalarImpl::Int64(6))
1607        );
1608
1609        // push the 1st right chunk
1610        tx_r.push_chunk(chunk_r1);
1611        let chunk = hash_join.next_unwrap_ready_chunk()?;
1612        // data "2 3" should have been cleaned
1613        assert_eq!(
1614            chunk,
1615            StreamChunk::from_pretty(
1616                " I I I I
1617                + 2 5 2 6"
1618            )
1619        );
1620
1621        // push the 2nd right chunk
1622        tx_r.push_chunk(chunk_r2);
1623        // pending means that state clean is successful, or the executor will yield a chunk "+ 2 3
1624        // 2 3" here.
1625        hash_join.next_unwrap_pending();
1626
1627        Ok(())
1628    }
1629
1630    #[tokio::test]
1631    async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
1632        let chunk_l1 = StreamChunk::from_pretty(
1633            "  I I
1634             + 1 4
1635             + 2 5
1636             + 3 6",
1637        );
1638        let chunk_l2 = StreamChunk::from_pretty(
1639            "  I I
1640             + 3 8
1641             - 3 8",
1642        );
1643        let chunk_r1 = StreamChunk::from_pretty(
1644            "  I I
1645             + 2 7
1646             + 4 8
1647             + 6 9",
1648        );
1649        let chunk_r2 = StreamChunk::from_pretty(
1650            "  I  I
1651             + 3 10
1652             + 6 11",
1653        );
1654        let (mut tx_l, mut tx_r, mut hash_join) =
1655            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
1656
1657        // push the init barrier for left and right
1658        tx_l.push_barrier(test_epoch(1), false);
1659        tx_r.push_barrier(test_epoch(1), false);
1660        hash_join.next_unwrap_ready_barrier()?;
1661
1662        // push the 1st left chunk
1663        tx_l.push_chunk(chunk_l1);
1664        hash_join.next_unwrap_pending();
1665
1666        // push the init barrier for left and right
1667        tx_l.push_barrier(test_epoch(2), false);
1668        tx_r.push_barrier(test_epoch(2), false);
1669        hash_join.next_unwrap_ready_barrier()?;
1670
1671        // push the 2nd left chunk
1672        tx_l.push_chunk(chunk_l2);
1673        hash_join.next_unwrap_pending();
1674
1675        // push the 1st right chunk
1676        tx_r.push_chunk(chunk_r1);
1677        let chunk = hash_join.next_unwrap_ready_chunk()?;
1678        assert_eq!(
1679            chunk,
1680            StreamChunk::from_pretty(
1681                " I I I I
1682                + 2 5 2 7"
1683            )
1684        );
1685
1686        // push the 2nd right chunk
1687        tx_r.push_chunk(chunk_r2);
1688        let chunk = hash_join.next_unwrap_ready_chunk()?;
1689        assert_eq!(
1690            chunk,
1691            StreamChunk::from_pretty(
1692                " I I I I
1693                + 3 6 3 10"
1694            )
1695        );
1696
1697        Ok(())
1698    }
1699
1700    #[tokio::test]
1701    async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
1702        let chunk_l1 = StreamChunk::from_pretty(
1703            "  I I
1704             + 1 4
1705             + 2 5
1706             + . 6",
1707        );
1708        let chunk_l2 = StreamChunk::from_pretty(
1709            "  I I
1710             + . 8
1711             - . 8",
1712        );
1713        let chunk_r1 = StreamChunk::from_pretty(
1714            "  I I
1715             + 2 7
1716             + 4 8
1717             + 6 9",
1718        );
1719        let chunk_r2 = StreamChunk::from_pretty(
1720            "  I  I
1721             + . 10
1722             + 6 11",
1723        );
1724        let (mut tx_l, mut tx_r, mut hash_join) =
1725            create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;
1726
1727        // push the init barrier for left and right
1728        tx_l.push_barrier(test_epoch(1), false);
1729        tx_r.push_barrier(test_epoch(1), false);
1730        hash_join.next_unwrap_ready_barrier()?;
1731
1732        // push the 1st left chunk
1733        tx_l.push_chunk(chunk_l1);
1734        hash_join.next_unwrap_pending();
1735
1736        // push the init barrier for left and right
1737        tx_l.push_barrier(test_epoch(2), false);
1738        tx_r.push_barrier(test_epoch(2), false);
1739        hash_join.next_unwrap_ready_barrier()?;
1740
1741        // push the 2nd left chunk
1742        tx_l.push_chunk(chunk_l2);
1743        hash_join.next_unwrap_pending();
1744
1745        // push the 1st right chunk
1746        tx_r.push_chunk(chunk_r1);
1747        let chunk = hash_join.next_unwrap_ready_chunk()?;
1748        assert_eq!(
1749            chunk,
1750            StreamChunk::from_pretty(
1751                " I I I I
1752                + 2 5 2 7"
1753            )
1754        );
1755
1756        // push the 2nd right chunk
1757        tx_r.push_chunk(chunk_r2);
1758        let chunk = hash_join.next_unwrap_ready_chunk()?;
1759        assert_eq!(
1760            chunk,
1761            StreamChunk::from_pretty(
1762                " I I I I
1763                + . 6 . 10"
1764            )
1765        );
1766
1767        Ok(())
1768    }
1769
1770    #[tokio::test]
1771    async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
1772        let chunk_l1 = StreamChunk::from_pretty(
1773            "  I I
1774             + 1 4
1775             + 2 5
1776             + 3 6",
1777        );
1778        let chunk_l2 = StreamChunk::from_pretty(
1779            "  I I
1780             + 3 8
1781             - 3 8",
1782        );
1783        let chunk_r1 = StreamChunk::from_pretty(
1784            "  I I
1785             + 2 7
1786             + 4 8
1787             + 6 9",
1788        );
1789        let chunk_r2 = StreamChunk::from_pretty(
1790            "  I  I
1791             + 3 10
1792             + 6 11",
1793        );
1794        let chunk_l3 = StreamChunk::from_pretty(
1795            "  I I
1796             + 6 10",
1797        );
1798        let chunk_r3 = StreamChunk::from_pretty(
1799            "  I  I
1800             - 6 11",
1801        );
1802        let chunk_r4 = StreamChunk::from_pretty(
1803            "  I  I
1804             - 6 9",
1805        );
1806        let (mut tx_l, mut tx_r, mut hash_join) =
1807            create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;
1808
1809        // push the init barrier for left and right
1810        tx_l.push_barrier(test_epoch(1), false);
1811        tx_r.push_barrier(test_epoch(1), false);
1812        hash_join.next_unwrap_ready_barrier()?;
1813
1814        // push the 1st left chunk
1815        tx_l.push_chunk(chunk_l1);
1816        hash_join.next_unwrap_pending();
1817
1818        // push the init barrier for left and right
1819        tx_l.push_barrier(test_epoch(2), false);
1820        tx_r.push_barrier(test_epoch(2), false);
1821        hash_join.next_unwrap_ready_barrier()?;
1822
1823        // push the 2nd left chunk
1824        tx_l.push_chunk(chunk_l2);
1825        hash_join.next_unwrap_pending();
1826
1827        // push the 1st right chunk
1828        tx_r.push_chunk(chunk_r1);
1829        let chunk = hash_join.next_unwrap_ready_chunk()?;
1830        assert_eq!(
1831            chunk,
1832            StreamChunk::from_pretty(
1833                " I I
1834                + 2 5"
1835            )
1836        );
1837
1838        // push the 2nd right chunk
1839        tx_r.push_chunk(chunk_r2);
1840        let chunk = hash_join.next_unwrap_ready_chunk()?;
1841        assert_eq!(
1842            chunk,
1843            StreamChunk::from_pretty(
1844                " I I
1845                + 3 6"
1846            )
1847        );
1848
1849        // push the 3rd left chunk (tests forward_exactly_once)
1850        tx_l.push_chunk(chunk_l3);
1851        let chunk = hash_join.next_unwrap_ready_chunk()?;
1852        assert_eq!(
1853            chunk,
1854            StreamChunk::from_pretty(
1855                " I I
1856                + 6 10"
1857            )
1858        );
1859
1860        // push the 3rd right chunk
1861        // (tests that no change if there are still matches)
1862        tx_r.push_chunk(chunk_r3);
1863        hash_join.next_unwrap_pending();
1864
1865        // push the 3rd left chunk
1866        // (tests that deletion occurs when there are no more matches)
1867        tx_r.push_chunk(chunk_r4);
1868        let chunk = hash_join.next_unwrap_ready_chunk()?;
1869        assert_eq!(
1870            chunk,
1871            StreamChunk::from_pretty(
1872                " I I
1873                - 6 10"
1874            )
1875        );
1876
1877        Ok(())
1878    }
1879
1880    #[tokio::test]
1881    async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
1882        let chunk_l1 = StreamChunk::from_pretty(
1883            "  I I
1884             + 1 4
1885             + 2 5
1886             + . 6",
1887        );
1888        let chunk_l2 = StreamChunk::from_pretty(
1889            "  I I
1890             + . 8
1891             - . 8",
1892        );
1893        let chunk_r1 = StreamChunk::from_pretty(
1894            "  I I
1895             + 2 7
1896             + 4 8
1897             + 6 9",
1898        );
1899        let chunk_r2 = StreamChunk::from_pretty(
1900            "  I  I
1901             + . 10
1902             + 6 11",
1903        );
1904        let chunk_l3 = StreamChunk::from_pretty(
1905            "  I I
1906             + 6 10",
1907        );
1908        let chunk_r3 = StreamChunk::from_pretty(
1909            "  I  I
1910             - 6 11",
1911        );
1912        let chunk_r4 = StreamChunk::from_pretty(
1913            "  I  I
1914             - 6 9",
1915        );
1916        let (mut tx_l, mut tx_r, mut hash_join) =
1917            create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;
1918
1919        // push the init barrier for left and right
1920        tx_l.push_barrier(test_epoch(1), false);
1921        tx_r.push_barrier(test_epoch(1), false);
1922        hash_join.next_unwrap_ready_barrier()?;
1923
1924        // push the 1st left chunk
1925        tx_l.push_chunk(chunk_l1);
1926        hash_join.next_unwrap_pending();
1927
1928        // push the init barrier for left and right
1929        tx_l.push_barrier(test_epoch(2), false);
1930        tx_r.push_barrier(test_epoch(2), false);
1931        hash_join.next_unwrap_ready_barrier()?;
1932
1933        // push the 2nd left chunk
1934        tx_l.push_chunk(chunk_l2);
1935        hash_join.next_unwrap_pending();
1936
1937        // push the 1st right chunk
1938        tx_r.push_chunk(chunk_r1);
1939        let chunk = hash_join.next_unwrap_ready_chunk()?;
1940        assert_eq!(
1941            chunk,
1942            StreamChunk::from_pretty(
1943                " I I
1944                + 2 5"
1945            )
1946        );
1947
1948        // push the 2nd right chunk
1949        tx_r.push_chunk(chunk_r2);
1950        let chunk = hash_join.next_unwrap_ready_chunk()?;
1951        assert_eq!(
1952            chunk,
1953            StreamChunk::from_pretty(
1954                " I I
1955                + . 6"
1956            )
1957        );
1958
1959        // push the 3rd left chunk (tests forward_exactly_once)
1960        tx_l.push_chunk(chunk_l3);
1961        let chunk = hash_join.next_unwrap_ready_chunk()?;
1962        assert_eq!(
1963            chunk,
1964            StreamChunk::from_pretty(
1965                " I I
1966                + 6 10"
1967            )
1968        );
1969
1970        // push the 3rd right chunk
1971        // (tests that no change if there are still matches)
1972        tx_r.push_chunk(chunk_r3);
1973        hash_join.next_unwrap_pending();
1974
1975        // push the 3rd left chunk
1976        // (tests that deletion occurs when there are no more matches)
1977        tx_r.push_chunk(chunk_r4);
1978        let chunk = hash_join.next_unwrap_ready_chunk()?;
1979        assert_eq!(
1980            chunk,
1981            StreamChunk::from_pretty(
1982                " I I
1983                - 6 10"
1984            )
1985        );
1986
1987        Ok(())
1988    }
1989
1990    #[tokio::test]
1991    async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
1992        let chunk_l1 = StreamChunk::from_pretty(
1993            "  I I I
1994             + 1 4 1
1995             + 2 5 2
1996             + 3 6 3",
1997        );
1998        let chunk_l2 = StreamChunk::from_pretty(
1999            "  I I I
2000             + 4 9 4
2001             + 5 10 5",
2002        );
2003        let chunk_r1 = StreamChunk::from_pretty(
2004            "  I I I
2005             + 2 5 1
2006             + 4 9 2
2007             + 6 9 3",
2008        );
2009        let chunk_r2 = StreamChunk::from_pretty(
2010            "  I I I
2011             + 1 4 4
2012             + 3 6 5",
2013        );
2014
2015        let (mut tx_l, mut tx_r, mut hash_join) =
2016            create_append_only_executor::<{ JoinType::Inner }>(false).await;
2017
2018        // push the init barrier for left and right
2019        tx_l.push_barrier(test_epoch(1), false);
2020        tx_r.push_barrier(test_epoch(1), false);
2021        hash_join.next_unwrap_ready_barrier()?;
2022
2023        // push the 1st left chunk
2024        tx_l.push_chunk(chunk_l1);
2025        hash_join.next_unwrap_pending();
2026
2027        // push the init barrier for left and right
2028        tx_l.push_barrier(test_epoch(2), false);
2029        tx_r.push_barrier(test_epoch(2), false);
2030        hash_join.next_unwrap_ready_barrier()?;
2031
2032        // push the 2nd left chunk
2033        tx_l.push_chunk(chunk_l2);
2034        hash_join.next_unwrap_pending();
2035
2036        // push the 1st right chunk
2037        tx_r.push_chunk(chunk_r1);
2038        let chunk = hash_join.next_unwrap_ready_chunk()?;
2039        assert_eq!(
2040            chunk,
2041            StreamChunk::from_pretty(
2042                " I I I I I I
2043                + 2 5 2 2 5 1
2044                + 4 9 4 4 9 2"
2045            )
2046        );
2047
2048        // push the 2nd right chunk
2049        tx_r.push_chunk(chunk_r2);
2050        let chunk = hash_join.next_unwrap_ready_chunk()?;
2051        assert_eq!(
2052            chunk,
2053            StreamChunk::from_pretty(
2054                " I I I I I I
2055                + 1 4 1 1 4 4
2056                + 3 6 3 3 6 5"
2057            )
2058        );
2059
2060        Ok(())
2061    }
2062
2063    #[tokio::test]
2064    async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
2065        let chunk_l1 = StreamChunk::from_pretty(
2066            "  I I I
2067             + 1 4 1
2068             + 2 5 2
2069             + 3 6 3",
2070        );
2071        let chunk_l2 = StreamChunk::from_pretty(
2072            "  I I I
2073             + 4 9 4
2074             + 5 10 5",
2075        );
2076        let chunk_r1 = StreamChunk::from_pretty(
2077            "  I I I
2078             + 2 5 1
2079             + 4 9 2
2080             + 6 9 3",
2081        );
2082        let chunk_r2 = StreamChunk::from_pretty(
2083            "  I I I
2084             + 1 4 4
2085             + 3 6 5",
2086        );
2087
2088        let (mut tx_l, mut tx_r, mut hash_join) =
2089            create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;
2090
2091        // push the init barrier for left and right
2092        tx_l.push_barrier(test_epoch(1), false);
2093        tx_r.push_barrier(test_epoch(1), false);
2094        hash_join.next_unwrap_ready_barrier()?;
2095
2096        // push the 1st left chunk
2097        tx_l.push_chunk(chunk_l1);
2098        hash_join.next_unwrap_pending();
2099
2100        // push the init barrier for left and right
2101        tx_l.push_barrier(test_epoch(2), false);
2102        tx_r.push_barrier(test_epoch(2), false);
2103        hash_join.next_unwrap_ready_barrier()?;
2104
2105        // push the 2nd left chunk
2106        tx_l.push_chunk(chunk_l2);
2107        hash_join.next_unwrap_pending();
2108
2109        // push the 1st right chunk
2110        tx_r.push_chunk(chunk_r1);
2111        let chunk = hash_join.next_unwrap_ready_chunk()?;
2112        assert_eq!(
2113            chunk,
2114            StreamChunk::from_pretty(
2115                " I I I
2116                + 2 5 2
2117                + 4 9 4"
2118            )
2119        );
2120
2121        // push the 2nd right chunk
2122        tx_r.push_chunk(chunk_r2);
2123        let chunk = hash_join.next_unwrap_ready_chunk()?;
2124        assert_eq!(
2125            chunk,
2126            StreamChunk::from_pretty(
2127                " I I I
2128                + 1 4 1
2129                + 3 6 3"
2130            )
2131        );
2132
2133        Ok(())
2134    }
2135
2136    #[tokio::test]
2137    async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
2138        let chunk_l1 = StreamChunk::from_pretty(
2139            "  I I I
2140             + 1 4 1
2141             + 2 5 2
2142             + 3 6 3",
2143        );
2144        let chunk_l2 = StreamChunk::from_pretty(
2145            "  I I I
2146             + 4 9 4
2147             + 5 10 5",
2148        );
2149        let chunk_r1 = StreamChunk::from_pretty(
2150            "  I I I
2151             + 2 5 1
2152             + 4 9 2
2153             + 6 9 3",
2154        );
2155        let chunk_r2 = StreamChunk::from_pretty(
2156            "  I I I
2157             + 1 4 4
2158             + 3 6 5",
2159        );
2160
2161        let (mut tx_l, mut tx_r, mut hash_join) =
2162            create_append_only_executor::<{ JoinType::RightSemi }>(false).await;
2163
2164        // push the init barrier for left and right
2165        tx_l.push_barrier(test_epoch(1), false);
2166        tx_r.push_barrier(test_epoch(1), false);
2167        hash_join.next_unwrap_ready_barrier()?;
2168
2169        // push the 1st left chunk
2170        tx_l.push_chunk(chunk_l1);
2171        hash_join.next_unwrap_pending();
2172
2173        // push the init barrier for left and right
2174        tx_l.push_barrier(test_epoch(2), false);
2175        tx_r.push_barrier(test_epoch(2), false);
2176        hash_join.next_unwrap_ready_barrier()?;
2177
2178        // push the 2nd left chunk
2179        tx_l.push_chunk(chunk_l2);
2180        hash_join.next_unwrap_pending();
2181
2182        // push the 1st right chunk
2183        tx_r.push_chunk(chunk_r1);
2184        let chunk = hash_join.next_unwrap_ready_chunk()?;
2185        assert_eq!(
2186            chunk,
2187            StreamChunk::from_pretty(
2188                " I I I
2189                + 2 5 1
2190                + 4 9 2"
2191            )
2192        );
2193
2194        // push the 2nd right chunk
2195        tx_r.push_chunk(chunk_r2);
2196        let chunk = hash_join.next_unwrap_ready_chunk()?;
2197        assert_eq!(
2198            chunk,
2199            StreamChunk::from_pretty(
2200                " I I I
2201                + 1 4 4
2202                + 3 6 5"
2203            )
2204        );
2205
2206        Ok(())
2207    }
2208
2209    #[tokio::test]
2210    async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
2211        let chunk_r1 = StreamChunk::from_pretty(
2212            "  I I
2213             + 1 4
2214             + 2 5
2215             + 3 6",
2216        );
2217        let chunk_r2 = StreamChunk::from_pretty(
2218            "  I I
2219             + 3 8
2220             - 3 8",
2221        );
2222        let chunk_l1 = StreamChunk::from_pretty(
2223            "  I I
2224             + 2 7
2225             + 4 8
2226             + 6 9",
2227        );
2228        let chunk_l2 = StreamChunk::from_pretty(
2229            "  I  I
2230             + 3 10
2231             + 6 11",
2232        );
2233        let chunk_r3 = StreamChunk::from_pretty(
2234            "  I I
2235             + 6 10",
2236        );
2237        let chunk_l3 = StreamChunk::from_pretty(
2238            "  I  I
2239             - 6 11",
2240        );
2241        let chunk_l4 = StreamChunk::from_pretty(
2242            "  I  I
2243             - 6 9",
2244        );
2245        let (mut tx_l, mut tx_r, mut hash_join) =
2246            create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;
2247
2248        // push the init barrier for left and right
2249        tx_l.push_barrier(test_epoch(1), false);
2250        tx_r.push_barrier(test_epoch(1), false);
2251        hash_join.next_unwrap_ready_barrier()?;
2252
2253        // push the 1st right chunk
2254        tx_r.push_chunk(chunk_r1);
2255        hash_join.next_unwrap_pending();
2256
2257        // push the init barrier for left and right
2258        tx_l.push_barrier(test_epoch(2), false);
2259        tx_r.push_barrier(test_epoch(2), false);
2260        hash_join.next_unwrap_ready_barrier()?;
2261
2262        // push the 2nd right chunk
2263        tx_r.push_chunk(chunk_r2);
2264        hash_join.next_unwrap_pending();
2265
2266        // push the 1st left chunk
2267        tx_l.push_chunk(chunk_l1);
2268        let chunk = hash_join.next_unwrap_ready_chunk()?;
2269        assert_eq!(
2270            chunk,
2271            StreamChunk::from_pretty(
2272                " I I
2273                + 2 5"
2274            )
2275        );
2276
2277        // push the 2nd left chunk
2278        tx_l.push_chunk(chunk_l2);
2279        let chunk = hash_join.next_unwrap_ready_chunk()?;
2280        assert_eq!(
2281            chunk,
2282            StreamChunk::from_pretty(
2283                " I I
2284                + 3 6"
2285            )
2286        );
2287
2288        // push the 3rd right chunk (tests forward_exactly_once)
2289        tx_r.push_chunk(chunk_r3);
2290        let chunk = hash_join.next_unwrap_ready_chunk()?;
2291        assert_eq!(
2292            chunk,
2293            StreamChunk::from_pretty(
2294                " I I
2295                + 6 10"
2296            )
2297        );
2298
2299        // push the 3rd left chunk
2300        // (tests that no change if there are still matches)
2301        tx_l.push_chunk(chunk_l3);
2302        hash_join.next_unwrap_pending();
2303
2304        // push the 3rd right chunk
2305        // (tests that deletion occurs when there are no more matches)
2306        tx_l.push_chunk(chunk_l4);
2307        let chunk = hash_join.next_unwrap_ready_chunk()?;
2308        assert_eq!(
2309            chunk,
2310            StreamChunk::from_pretty(
2311                " I I
2312                - 6 10"
2313            )
2314        );
2315
2316        Ok(())
2317    }
2318
2319    #[tokio::test]
2320    async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
2321        let chunk_l1 = StreamChunk::from_pretty(
2322            "  I I
2323             + 1 4
2324             + 2 5
2325             + 3 6",
2326        );
2327        let chunk_l2 = StreamChunk::from_pretty(
2328            "  I I
2329             + 3 8
2330             - 3 8",
2331        );
2332        let chunk_r1 = StreamChunk::from_pretty(
2333            "  I I
2334             + 2 7
2335             + 4 8
2336             + 6 9",
2337        );
2338        let chunk_r2 = StreamChunk::from_pretty(
2339            "  I  I
2340             + 3 10
2341             + 6 11
2342             + 1 2
2343             + 1 3",
2344        );
2345        let chunk_l3 = StreamChunk::from_pretty(
2346            "  I I
2347             + 9 10",
2348        );
2349        let chunk_r3 = StreamChunk::from_pretty(
2350            "  I I
2351             - 1 2",
2352        );
2353        let chunk_r4 = StreamChunk::from_pretty(
2354            "  I I
2355             - 1 3",
2356        );
2357        let (mut tx_l, mut tx_r, mut hash_join) =
2358            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2359
2360        // push the init barrier for left and right
2361        tx_l.push_barrier(test_epoch(1), false);
2362        tx_r.push_barrier(test_epoch(1), false);
2363        hash_join.next_unwrap_ready_barrier()?;
2364
2365        // push the 1st left chunk
2366        tx_l.push_chunk(chunk_l1);
2367        let chunk = hash_join.next_unwrap_ready_chunk()?;
2368        assert_eq!(
2369            chunk,
2370            StreamChunk::from_pretty(
2371                " I I
2372                + 1 4
2373                + 2 5
2374                + 3 6",
2375            )
2376        );
2377
2378        // push the init barrier for left and right
2379        tx_l.push_barrier(test_epoch(2), false);
2380        tx_r.push_barrier(test_epoch(2), false);
2381        hash_join.next_unwrap_ready_barrier()?;
2382
2383        // push the 2nd left chunk
2384        tx_l.push_chunk(chunk_l2);
2385        let chunk = hash_join.next_unwrap_ready_chunk()?;
2386        assert_eq!(
2387            chunk,
2388            StreamChunk::from_pretty(
2389                "  I I
2390                 + 3 8
2391                 - 3 8",
2392            )
2393        );
2394
2395        // push the 1st right chunk
2396        tx_r.push_chunk(chunk_r1);
2397        let chunk = hash_join.next_unwrap_ready_chunk()?;
2398        assert_eq!(
2399            chunk,
2400            StreamChunk::from_pretty(
2401                " I I
2402                - 2 5"
2403            )
2404        );
2405
2406        // push the 2nd right chunk
2407        tx_r.push_chunk(chunk_r2);
2408        let chunk = hash_join.next_unwrap_ready_chunk()?;
2409        assert_eq!(
2410            chunk,
2411            StreamChunk::from_pretty(
2412                " I I
2413                - 3 6
2414                - 1 4"
2415            )
2416        );
2417
2418        // push the 3rd left chunk (tests forward_exactly_once)
2419        tx_l.push_chunk(chunk_l3);
2420        let chunk = hash_join.next_unwrap_ready_chunk()?;
2421        assert_eq!(
2422            chunk,
2423            StreamChunk::from_pretty(
2424                " I I
2425                + 9 10"
2426            )
2427        );
2428
2429        // push the 3rd right chunk
2430        // (tests that no change if there are still matches)
2431        tx_r.push_chunk(chunk_r3);
2432        hash_join.next_unwrap_pending();
2433
2434        // push the 4th right chunk
2435        // (tests that insertion occurs when there are no more matches)
2436        tx_r.push_chunk(chunk_r4);
2437        let chunk = hash_join.next_unwrap_ready_chunk()?;
2438        assert_eq!(
2439            chunk,
2440            StreamChunk::from_pretty(
2441                " I I
2442                + 1 4"
2443            )
2444        );
2445
2446        Ok(())
2447    }
2448
2449    #[tokio::test]
2450    async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
2451        let chunk_r1 = StreamChunk::from_pretty(
2452            "  I I
2453             + 1 4
2454             + 2 5
2455             + 3 6",
2456        );
2457        let chunk_r2 = StreamChunk::from_pretty(
2458            "  I I
2459             + 3 8
2460             - 3 8",
2461        );
2462        let chunk_l1 = StreamChunk::from_pretty(
2463            "  I I
2464             + 2 7
2465             + 4 8
2466             + 6 9",
2467        );
2468        let chunk_l2 = StreamChunk::from_pretty(
2469            "  I  I
2470             + 3 10
2471             + 6 11
2472             + 1 2
2473             + 1 3",
2474        );
2475        let chunk_r3 = StreamChunk::from_pretty(
2476            "  I I
2477             + 9 10",
2478        );
2479        let chunk_l3 = StreamChunk::from_pretty(
2480            "  I I
2481             - 1 2",
2482        );
2483        let chunk_l4 = StreamChunk::from_pretty(
2484            "  I I
2485             - 1 3",
2486        );
2487        let (mut tx_r, mut tx_l, mut hash_join) =
2488            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2489
2490        // push the init barrier for left and right
2491        tx_r.push_barrier(test_epoch(1), false);
2492        tx_l.push_barrier(test_epoch(1), false);
2493        hash_join.next_unwrap_ready_barrier()?;
2494
2495        // push the 1st right chunk
2496        tx_r.push_chunk(chunk_r1);
2497        let chunk = hash_join.next_unwrap_ready_chunk()?;
2498        assert_eq!(
2499            chunk,
2500            StreamChunk::from_pretty(
2501                " I I
2502                + 1 4
2503                + 2 5
2504                + 3 6",
2505            )
2506        );
2507
2508        // push the init barrier for left and right
2509        tx_r.push_barrier(test_epoch(2), false);
2510        tx_l.push_barrier(test_epoch(2), false);
2511        hash_join.next_unwrap_ready_barrier()?;
2512
2513        // push the 2nd right chunk
2514        tx_r.push_chunk(chunk_r2);
2515        let chunk = hash_join.next_unwrap_ready_chunk()?;
2516        assert_eq!(
2517            chunk,
2518            StreamChunk::from_pretty(
2519                "  I I
2520                 + 3 8
2521                 - 3 8",
2522            )
2523        );
2524
2525        // push the 1st left chunk
2526        tx_l.push_chunk(chunk_l1);
2527        let chunk = hash_join.next_unwrap_ready_chunk()?;
2528        assert_eq!(
2529            chunk,
2530            StreamChunk::from_pretty(
2531                " I I
2532                - 2 5"
2533            )
2534        );
2535
2536        // push the 2nd left chunk
2537        tx_l.push_chunk(chunk_l2);
2538        let chunk = hash_join.next_unwrap_ready_chunk()?;
2539        assert_eq!(
2540            chunk,
2541            StreamChunk::from_pretty(
2542                " I I
2543                - 3 6
2544                - 1 4"
2545            )
2546        );
2547
2548        // push the 3rd right chunk (tests forward_exactly_once)
2549        tx_r.push_chunk(chunk_r3);
2550        let chunk = hash_join.next_unwrap_ready_chunk()?;
2551        assert_eq!(
2552            chunk,
2553            StreamChunk::from_pretty(
2554                " I I
2555                + 9 10"
2556            )
2557        );
2558
2559        // push the 3rd left chunk
2560        // (tests that no change if there are still matches)
2561        tx_l.push_chunk(chunk_l3);
2562        hash_join.next_unwrap_pending();
2563
2564        // push the 4th left chunk
2565        // (tests that insertion occurs when there are no more matches)
2566        tx_l.push_chunk(chunk_l4);
2567        let chunk = hash_join.next_unwrap_ready_chunk()?;
2568        assert_eq!(
2569            chunk,
2570            StreamChunk::from_pretty(
2571                " I I
2572                + 1 4"
2573            )
2574        );
2575
2576        Ok(())
2577    }
2578
2579    #[tokio::test]
2580    async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
2581        let chunk_l1 = StreamChunk::from_pretty(
2582            "  I I
2583             + 1 4
2584             + 2 5
2585             + 3 6",
2586        );
2587        let chunk_l2 = StreamChunk::from_pretty(
2588            "  I I
2589             + 6 8
2590             + 3 8",
2591        );
2592        let chunk_r1 = StreamChunk::from_pretty(
2593            "  I I
2594             + 2 7
2595             + 4 8
2596             + 6 9",
2597        );
2598        let chunk_r2 = StreamChunk::from_pretty(
2599            "  I  I
2600             + 3 10
2601             + 6 11",
2602        );
2603        let (mut tx_l, mut tx_r, mut hash_join) =
2604            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2605
2606        // push the init barrier for left and right
2607        tx_l.push_barrier(test_epoch(1), false);
2608        tx_r.push_barrier(test_epoch(1), false);
2609        hash_join.next_unwrap_ready_barrier()?;
2610
2611        // push the 1st left chunk
2612        tx_l.push_chunk(chunk_l1);
2613        hash_join.next_unwrap_pending();
2614
2615        // push a barrier to left side
2616        tx_l.push_barrier(test_epoch(2), false);
2617
2618        // push the 2nd left chunk
2619        tx_l.push_chunk(chunk_l2);
2620
2621        // join the first right chunk
2622        tx_r.push_chunk(chunk_r1);
2623
2624        // Consume stream chunk
2625        let chunk = hash_join.next_unwrap_ready_chunk()?;
2626        assert_eq!(
2627            chunk,
2628            StreamChunk::from_pretty(
2629                " I I I I
2630                + 2 5 2 7"
2631            )
2632        );
2633
2634        // push a barrier to right side
2635        tx_r.push_barrier(test_epoch(2), false);
2636
2637        // get the aligned barrier here
2638        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2639        assert!(matches!(
2640            hash_join.next_unwrap_ready_barrier()?,
2641            Barrier {
2642                epoch,
2643                mutation: None,
2644                ..
2645            } if epoch == expected_epoch
2646        ));
2647
2648        // join the 2nd left chunk
2649        let chunk = hash_join.next_unwrap_ready_chunk()?;
2650        assert_eq!(
2651            chunk,
2652            StreamChunk::from_pretty(
2653                " I I I I
2654                + 6 8 6 9"
2655            )
2656        );
2657
2658        // push the 2nd right chunk
2659        tx_r.push_chunk(chunk_r2);
2660        let chunk = hash_join.next_unwrap_ready_chunk()?;
2661        assert_eq!(
2662            chunk,
2663            StreamChunk::from_pretty(
2664                " I I I I
2665                + 3 6 3 10
2666                + 3 8 3 10
2667                + 6 8 6 11"
2668            )
2669        );
2670
2671        Ok(())
2672    }
2673
2674    #[tokio::test]
2675    async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
2676        let chunk_l1 = StreamChunk::from_pretty(
2677            "  I I
2678             + 1 4
2679             + 2 .
2680             + 3 .",
2681        );
2682        let chunk_l2 = StreamChunk::from_pretty(
2683            "  I I
2684             + 6 .
2685             + 3 8",
2686        );
2687        let chunk_r1 = StreamChunk::from_pretty(
2688            "  I I
2689             + 2 7
2690             + 4 8
2691             + 6 9",
2692        );
2693        let chunk_r2 = StreamChunk::from_pretty(
2694            "  I  I
2695             + 3 10
2696             + 6 11",
2697        );
2698        let (mut tx_l, mut tx_r, mut hash_join) =
2699            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2700
2701        // push the init barrier for left and right
2702        tx_l.push_barrier(test_epoch(1), false);
2703        tx_r.push_barrier(test_epoch(1), false);
2704        hash_join.next_unwrap_ready_barrier()?;
2705
2706        // push the 1st left chunk
2707        tx_l.push_chunk(chunk_l1);
2708        hash_join.next_unwrap_pending();
2709
2710        // push a barrier to left side
2711        tx_l.push_barrier(test_epoch(2), false);
2712
2713        // push the 2nd left chunk
2714        tx_l.push_chunk(chunk_l2);
2715
2716        // join the first right chunk
2717        tx_r.push_chunk(chunk_r1);
2718
2719        // Consume stream chunk
2720        let chunk = hash_join.next_unwrap_ready_chunk()?;
2721        assert_eq!(
2722            chunk,
2723            StreamChunk::from_pretty(
2724                " I I I I
2725                + 2 . 2 7"
2726            )
2727        );
2728
2729        // push a barrier to right side
2730        tx_r.push_barrier(test_epoch(2), false);
2731
2732        // get the aligned barrier here
2733        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2734        assert!(matches!(
2735            hash_join.next_unwrap_ready_barrier()?,
2736            Barrier {
2737                epoch,
2738                mutation: None,
2739                ..
2740            } if epoch == expected_epoch
2741        ));
2742
2743        // join the 2nd left chunk
2744        let chunk = hash_join.next_unwrap_ready_chunk()?;
2745        assert_eq!(
2746            chunk,
2747            StreamChunk::from_pretty(
2748                " I I I I
2749                + 6 . 6 9"
2750            )
2751        );
2752
2753        // push the 2nd right chunk
2754        tx_r.push_chunk(chunk_r2);
2755        let chunk = hash_join.next_unwrap_ready_chunk()?;
2756        assert_eq!(
2757            chunk,
2758            StreamChunk::from_pretty(
2759                " I I I I
2760                + 3 8 3 10
2761                + 3 . 3 10
2762                + 6 . 6 11"
2763            )
2764        );
2765
2766        Ok(())
2767    }
2768
2769    #[tokio::test]
2770    async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
2771        let chunk_l1 = StreamChunk::from_pretty(
2772            "  I I
2773             + 1 4
2774             + 2 5
2775             + 3 6",
2776        );
2777        let chunk_l2 = StreamChunk::from_pretty(
2778            "  I I
2779             + 3 8
2780             - 3 8",
2781        );
2782        let chunk_r1 = StreamChunk::from_pretty(
2783            "  I I
2784             + 2 7
2785             + 4 8
2786             + 6 9",
2787        );
2788        let chunk_r2 = StreamChunk::from_pretty(
2789            "  I  I
2790             + 3 10
2791             + 6 11",
2792        );
2793        let (mut tx_l, mut tx_r, mut hash_join) =
2794            create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;
2795
2796        // push the init barrier for left and right
2797        tx_l.push_barrier(test_epoch(1), false);
2798        tx_r.push_barrier(test_epoch(1), false);
2799        hash_join.next_unwrap_ready_barrier()?;
2800
2801        // push the 1st left chunk
2802        tx_l.push_chunk(chunk_l1);
2803        let chunk = hash_join.next_unwrap_ready_chunk()?;
2804        assert_eq!(
2805            chunk,
2806            StreamChunk::from_pretty(
2807                " I I I I
2808                + 1 4 . .
2809                + 2 5 . .
2810                + 3 6 . ."
2811            )
2812        );
2813
2814        // push the 2nd left chunk
2815        tx_l.push_chunk(chunk_l2);
2816        let chunk = hash_join.next_unwrap_ready_chunk()?;
2817        assert_eq!(
2818            chunk,
2819            StreamChunk::from_pretty(
2820                " I I I I
2821                + 3 8 . .
2822                - 3 8 . ."
2823            )
2824        );
2825
2826        // push the 1st right chunk
2827        tx_r.push_chunk(chunk_r1);
2828        let chunk = hash_join.next_unwrap_ready_chunk()?;
2829        assert_eq!(
2830            chunk,
2831            StreamChunk::from_pretty(
2832                "  I I I I
2833                U- 2 5 . .
2834                U+ 2 5 2 7"
2835            )
2836        );
2837
2838        // push the 2nd right chunk
2839        tx_r.push_chunk(chunk_r2);
2840        let chunk = hash_join.next_unwrap_ready_chunk()?;
2841        assert_eq!(
2842            chunk,
2843            StreamChunk::from_pretty(
2844                "  I I I I
2845                U- 3 6 . .
2846                U+ 3 6 3 10"
2847            )
2848        );
2849
2850        Ok(())
2851    }
2852
2853    #[tokio::test]
2854    async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
2855        let chunk_l1 = StreamChunk::from_pretty(
2856            "  I I
2857             + 1 4
2858             + 2 5
2859             + . 6",
2860        );
2861        let chunk_l2 = StreamChunk::from_pretty(
2862            "  I I
2863             + . 8
2864             - . 8",
2865        );
2866        let chunk_r1 = StreamChunk::from_pretty(
2867            "  I I
2868             + 2 7
2869             + 4 8
2870             + 6 9",
2871        );
2872        let chunk_r2 = StreamChunk::from_pretty(
2873            "  I  I
2874             + . 10
2875             + 6 11",
2876        );
2877        let (mut tx_l, mut tx_r, mut hash_join) =
2878            create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;
2879
2880        // push the init barrier for left and right
2881        tx_l.push_barrier(test_epoch(1), false);
2882        tx_r.push_barrier(test_epoch(1), false);
2883        hash_join.next_unwrap_ready_barrier()?;
2884
2885        // push the 1st left chunk
2886        tx_l.push_chunk(chunk_l1);
2887        let chunk = hash_join.next_unwrap_ready_chunk()?;
2888        assert_eq!(
2889            chunk,
2890            StreamChunk::from_pretty(
2891                " I I I I
2892                + 1 4 . .
2893                + 2 5 . .
2894                + . 6 . ."
2895            )
2896        );
2897
2898        // push the 2nd left chunk
2899        tx_l.push_chunk(chunk_l2);
2900        let chunk = hash_join.next_unwrap_ready_chunk()?;
2901        assert_eq!(
2902            chunk,
2903            StreamChunk::from_pretty(
2904                " I I I I
2905                + . 8 . .
2906                - . 8 . ."
2907            )
2908        );
2909
2910        // push the 1st right chunk
2911        tx_r.push_chunk(chunk_r1);
2912        let chunk = hash_join.next_unwrap_ready_chunk()?;
2913        assert_eq!(
2914            chunk,
2915            StreamChunk::from_pretty(
2916                "  I I I I
2917                U- 2 5 . .
2918                U+ 2 5 2 7"
2919            )
2920        );
2921
2922        // push the 2nd right chunk
2923        tx_r.push_chunk(chunk_r2);
2924        let chunk = hash_join.next_unwrap_ready_chunk()?;
2925        assert_eq!(
2926            chunk,
2927            StreamChunk::from_pretty(
2928                "  I I I I
2929                U- . 6 . .
2930                U+ . 6 . 10"
2931            )
2932        );
2933
2934        Ok(())
2935    }
2936
2937    #[tokio::test]
2938    async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
2939        let chunk_l1 = StreamChunk::from_pretty(
2940            "  I I
2941             + 1 4
2942             + 2 5
2943             + 3 6",
2944        );
2945        let chunk_l2 = StreamChunk::from_pretty(
2946            "  I I
2947             + 3 8
2948             - 3 8",
2949        );
2950        let chunk_r1 = StreamChunk::from_pretty(
2951            "  I I
2952             + 2 7
2953             + 4 8
2954             + 6 9",
2955        );
2956        let chunk_r2 = StreamChunk::from_pretty(
2957            "  I  I
2958             + 5 10
2959             - 5 10",
2960        );
2961        let (mut tx_l, mut tx_r, mut hash_join) =
2962            create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;
2963
2964        // push the init barrier for left and right
2965        tx_l.push_barrier(test_epoch(1), false);
2966        tx_r.push_barrier(test_epoch(1), false);
2967        hash_join.next_unwrap_ready_barrier()?;
2968
2969        // push the 1st left chunk
2970        tx_l.push_chunk(chunk_l1);
2971        hash_join.next_unwrap_pending();
2972
2973        // push the 2nd left chunk
2974        tx_l.push_chunk(chunk_l2);
2975        hash_join.next_unwrap_pending();
2976
2977        // push the 1st right chunk
2978        tx_r.push_chunk(chunk_r1);
2979        let chunk = hash_join.next_unwrap_ready_chunk()?;
2980        assert_eq!(
2981            chunk,
2982            StreamChunk::from_pretty(
2983                " I I I I
2984                + 2 5 2 7
2985                + . . 4 8
2986                + . . 6 9"
2987            )
2988        );
2989
2990        // push the 2nd right chunk
2991        tx_r.push_chunk(chunk_r2);
2992        let chunk = hash_join.next_unwrap_ready_chunk()?;
2993        assert_eq!(
2994            chunk,
2995            StreamChunk::from_pretty(
2996                " I I I I
2997                + . . 5 10
2998                - . . 5 10"
2999            )
3000        );
3001
3002        Ok(())
3003    }
3004
3005    #[tokio::test]
3006    async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
3007        let chunk_l1 = StreamChunk::from_pretty(
3008            "  I I I
3009             + 1 4 1
3010             + 2 5 2
3011             + 3 6 3",
3012        );
3013        let chunk_l2 = StreamChunk::from_pretty(
3014            "  I I I
3015             + 4 9 4
3016             + 5 10 5",
3017        );
3018        let chunk_r1 = StreamChunk::from_pretty(
3019            "  I I I
3020             + 2 5 1
3021             + 4 9 2
3022             + 6 9 3",
3023        );
3024        let chunk_r2 = StreamChunk::from_pretty(
3025            "  I I I
3026             + 1 4 4
3027             + 3 6 5",
3028        );
3029
3030        let (mut tx_l, mut tx_r, mut hash_join) =
3031            create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;
3032
3033        // push the init barrier for left and right
3034        tx_l.push_barrier(test_epoch(1), false);
3035        tx_r.push_barrier(test_epoch(1), false);
3036        hash_join.next_unwrap_ready_barrier()?;
3037
3038        // push the 1st left chunk
3039        tx_l.push_chunk(chunk_l1);
3040        let chunk = hash_join.next_unwrap_ready_chunk()?;
3041        assert_eq!(
3042            chunk,
3043            StreamChunk::from_pretty(
3044                " I I I I I I
3045                + 1 4 1 . . .
3046                + 2 5 2 . . .
3047                + 3 6 3 . . ."
3048            )
3049        );
3050
3051        // push the 2nd left chunk
3052        tx_l.push_chunk(chunk_l2);
3053        let chunk = hash_join.next_unwrap_ready_chunk()?;
3054        assert_eq!(
3055            chunk,
3056            StreamChunk::from_pretty(
3057                " I I I I I I
3058                + 4 9 4 . . .
3059                + 5 10 5 . . ."
3060            )
3061        );
3062
3063        // push the 1st right chunk
3064        tx_r.push_chunk(chunk_r1);
3065        let chunk = hash_join.next_unwrap_ready_chunk()?;
3066        assert_eq!(
3067            chunk,
3068            StreamChunk::from_pretty(
3069                "  I I I I I I
3070                U- 2 5 2 . . .
3071                U+ 2 5 2 2 5 1
3072                U- 4 9 4 . . .
3073                U+ 4 9 4 4 9 2"
3074            )
3075        );
3076
3077        // push the 2nd right chunk
3078        tx_r.push_chunk(chunk_r2);
3079        let chunk = hash_join.next_unwrap_ready_chunk()?;
3080        assert_eq!(
3081            chunk,
3082            StreamChunk::from_pretty(
3083                "  I I I I I I
3084                U- 1 4 1 . . .
3085                U+ 1 4 1 1 4 4
3086                U- 3 6 3 . . .
3087                U+ 3 6 3 3 6 5"
3088            )
3089        );
3090
3091        Ok(())
3092    }
3093
3094    #[tokio::test]
3095    async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
3096        let chunk_l1 = StreamChunk::from_pretty(
3097            "  I I I
3098             + 1 4 1
3099             + 2 5 2
3100             + 3 6 3",
3101        );
3102        let chunk_l2 = StreamChunk::from_pretty(
3103            "  I I I
3104             + 4 9 4
3105             + 5 10 5",
3106        );
3107        let chunk_r1 = StreamChunk::from_pretty(
3108            "  I I I
3109             + 2 5 1
3110             + 4 9 2
3111             + 6 9 3",
3112        );
3113        let chunk_r2 = StreamChunk::from_pretty(
3114            "  I I I
3115             + 1 4 4
3116             + 3 6 5
3117             + 7 7 6",
3118        );
3119
3120        let (mut tx_l, mut tx_r, mut hash_join) =
3121            create_append_only_executor::<{ JoinType::RightOuter }>(false).await;
3122
3123        // push the init barrier for left and right
3124        tx_l.push_barrier(test_epoch(1), false);
3125        tx_r.push_barrier(test_epoch(1), false);
3126        hash_join.next_unwrap_ready_barrier()?;
3127
3128        // push the 1st left chunk
3129        tx_l.push_chunk(chunk_l1);
3130        hash_join.next_unwrap_pending();
3131
3132        // push the 2nd left chunk
3133        tx_l.push_chunk(chunk_l2);
3134        hash_join.next_unwrap_pending();
3135
3136        // push the 1st right chunk
3137        tx_r.push_chunk(chunk_r1);
3138        let chunk = hash_join.next_unwrap_ready_chunk()?;
3139        assert_eq!(
3140            chunk,
3141            StreamChunk::from_pretty(
3142                "  I I I I I I
3143                + 2 5 2 2 5 1
3144                + 4 9 4 4 9 2
3145                + . . . 6 9 3"
3146            )
3147        );
3148
3149        // push the 2nd right chunk
3150        tx_r.push_chunk(chunk_r2);
3151        let chunk = hash_join.next_unwrap_ready_chunk()?;
3152        assert_eq!(
3153            chunk,
3154            StreamChunk::from_pretty(
3155                "  I I I I I I
3156                + 1 4 1 1 4 4
3157                + 3 6 3 3 6 5
3158                + . . . 7 7 6"
3159            )
3160        );
3161
3162        Ok(())
3163    }
3164
3165    #[tokio::test]
3166    async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
3167        let chunk_l1 = StreamChunk::from_pretty(
3168            "  I I
3169             + 1 4
3170             + 2 5
3171             + 3 6",
3172        );
3173        let chunk_l2 = StreamChunk::from_pretty(
3174            "  I I
3175             + 3 8
3176             - 3 8",
3177        );
3178        let chunk_r1 = StreamChunk::from_pretty(
3179            "  I I
3180             + 2 7
3181             + 4 8
3182             + 6 9",
3183        );
3184        let chunk_r2 = StreamChunk::from_pretty(
3185            "  I  I
3186             + 5 10
3187             - 5 10",
3188        );
3189        let (mut tx_l, mut tx_r, mut hash_join) =
3190            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3191
3192        // push the init barrier for left and right
3193        tx_l.push_barrier(test_epoch(1), false);
3194        tx_r.push_barrier(test_epoch(1), false);
3195        hash_join.next_unwrap_ready_barrier()?;
3196
3197        // push the 1st left chunk
3198        tx_l.push_chunk(chunk_l1);
3199        let chunk = hash_join.next_unwrap_ready_chunk()?;
3200        assert_eq!(
3201            chunk,
3202            StreamChunk::from_pretty(
3203                " I I I I
3204                + 1 4 . .
3205                + 2 5 . .
3206                + 3 6 . ."
3207            )
3208        );
3209
3210        // push the 2nd left chunk
3211        tx_l.push_chunk(chunk_l2);
3212        let chunk = hash_join.next_unwrap_ready_chunk()?;
3213        assert_eq!(
3214            chunk,
3215            StreamChunk::from_pretty(
3216                " I I I I
3217                + 3 8 . .
3218                - 3 8 . ."
3219            )
3220        );
3221
3222        // push the 1st right chunk
3223        tx_r.push_chunk(chunk_r1);
3224        let chunk = hash_join.next_unwrap_ready_chunk()?;
3225        assert_eq!(
3226            chunk,
3227            StreamChunk::from_pretty(
3228                "  I I I I
3229                U-  2 5 . .
3230                U+  2 5 2 7
3231                +  . . 4 8
3232                +  . . 6 9"
3233            )
3234        );
3235
3236        // push the 2nd right chunk
3237        tx_r.push_chunk(chunk_r2);
3238        let chunk = hash_join.next_unwrap_ready_chunk()?;
3239        assert_eq!(
3240            chunk,
3241            StreamChunk::from_pretty(
3242                " I I I I
3243                + . . 5 10
3244                - . . 5 10"
3245            )
3246        );
3247
3248        Ok(())
3249    }
3250
3251    #[tokio::test]
3252    async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
3253        let (mut tx_l, mut tx_r, mut hash_join) =
3254            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3255
3256        // push the init barrier for left and right
3257        tx_l.push_barrier(test_epoch(1), false);
3258        tx_r.push_barrier(test_epoch(1), false);
3259        hash_join.next_unwrap_ready_barrier()?;
3260
3261        tx_l.push_chunk(StreamChunk::from_pretty(
3262            "  I I
3263             + 1 1
3264            ",
3265        ));
3266        let chunk = hash_join.next_unwrap_ready_chunk()?;
3267        assert_eq!(
3268            chunk,
3269            StreamChunk::from_pretty(
3270                " I I I I
3271                + 1 1 . ."
3272            )
3273        );
3274
3275        tx_r.push_chunk(StreamChunk::from_pretty(
3276            "  I I
3277             + 1 1
3278            ",
3279        ));
3280        let chunk = hash_join.next_unwrap_ready_chunk()?;
3281
3282        assert_eq!(
3283            chunk,
3284            StreamChunk::from_pretty(
3285                " I I I I
3286                U- 1 1 . .
3287                U+ 1 1 1 1"
3288            )
3289        );
3290
3291        tx_l.push_chunk(StreamChunk::from_pretty(
3292            "   I I
3293              - 1 1
3294              + 1 2
3295            ",
3296        ));
3297        let chunk = hash_join.next_unwrap_ready_chunk()?;
3298        let chunk = chunk.compact();
3299        assert_eq!(
3300            chunk,
3301            StreamChunk::from_pretty(
3302                " I I I I
3303                - 1 1 1 1
3304                + 1 2 1 1
3305                "
3306            )
3307        );
3308
3309        Ok(())
3310    }
3311
3312    #[tokio::test]
3313    async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
3314    {
3315        let chunk_l1 = StreamChunk::from_pretty(
3316            "  I I
3317             + 1 4
3318             + 2 5
3319             + 3 6
3320             + 3 7",
3321        );
3322        let chunk_l2 = StreamChunk::from_pretty(
3323            "  I I
3324             + 3 8
3325             - 3 8
3326             - 1 4", // delete row to cause an empty JoinHashEntry
3327        );
3328        let chunk_r1 = StreamChunk::from_pretty(
3329            "  I I
3330             + 2 6
3331             + 4 8
3332             + 3 4",
3333        );
3334        let chunk_r2 = StreamChunk::from_pretty(
3335            "  I  I
3336             + 5 10
3337             - 5 10
3338             + 1 2",
3339        );
3340        let (mut tx_l, mut tx_r, mut hash_join) =
3341            create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;
3342
3343        // push the init barrier for left and right
3344        tx_l.push_barrier(test_epoch(1), false);
3345        tx_r.push_barrier(test_epoch(1), false);
3346        hash_join.next_unwrap_ready_barrier()?;
3347
3348        // push the 1st left chunk
3349        tx_l.push_chunk(chunk_l1);
3350        let chunk = hash_join.next_unwrap_ready_chunk()?;
3351        assert_eq!(
3352            chunk,
3353            StreamChunk::from_pretty(
3354                " I I I I
3355                + 1 4 . .
3356                + 2 5 . .
3357                + 3 6 . .
3358                + 3 7 . ."
3359            )
3360        );
3361
3362        // push the 2nd left chunk
3363        tx_l.push_chunk(chunk_l2);
3364        let chunk = hash_join.next_unwrap_ready_chunk()?;
3365        assert_eq!(
3366            chunk,
3367            StreamChunk::from_pretty(
3368                " I I I I
3369                + 3 8 . .
3370                - 3 8 . .
3371                - 1 4 . ."
3372            )
3373        );
3374
3375        // push the 1st right chunk
3376        tx_r.push_chunk(chunk_r1);
3377        let chunk = hash_join.next_unwrap_ready_chunk()?;
3378        assert_eq!(
3379            chunk,
3380            StreamChunk::from_pretty(
3381                "  I I I I
3382                U-  2 5 . .
3383                U+  2 5 2 6
3384                +  . . 4 8
3385                +  . . 3 4" /* regression test (#2420): 3 4 should be forwarded only once
3386                             * despite matching on eq join on 2
3387                             * entries */
3388            )
3389        );
3390
3391        // push the 2nd right chunk
3392        tx_r.push_chunk(chunk_r2);
3393        let chunk = hash_join.next_unwrap_ready_chunk()?;
3394        assert_eq!(
3395            chunk,
3396            StreamChunk::from_pretty(
3397                " I I I I
3398                + . . 5 10
3399                - . . 5 10
3400                + . . 1 2" /* regression test (#2420): 1 2 forwarded even if matches on an empty
3401                            * join entry */
3402            )
3403        );
3404
3405        Ok(())
3406    }
3407
3408    #[tokio::test]
3409    async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
3410        let chunk_l1 = StreamChunk::from_pretty(
3411            "  I I
3412             + 1 4
3413             + 2 10
3414             + 3 6",
3415        );
3416        let chunk_l2 = StreamChunk::from_pretty(
3417            "  I I
3418             + 3 8
3419             - 3 8",
3420        );
3421        let chunk_r1 = StreamChunk::from_pretty(
3422            "  I I
3423             + 2 7
3424             + 4 8
3425             + 6 9",
3426        );
3427        let chunk_r2 = StreamChunk::from_pretty(
3428            "  I  I
3429             + 3 10
3430             + 6 11",
3431        );
3432        let (mut tx_l, mut tx_r, mut hash_join) =
3433            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3434
3435        // push the init barrier for left and right
3436        tx_l.push_barrier(test_epoch(1), false);
3437        tx_r.push_barrier(test_epoch(1), false);
3438        hash_join.next_unwrap_ready_barrier()?;
3439
3440        // push the 1st left chunk
3441        tx_l.push_chunk(chunk_l1);
3442        hash_join.next_unwrap_pending();
3443
3444        // push the 2nd left chunk
3445        tx_l.push_chunk(chunk_l2);
3446        hash_join.next_unwrap_pending();
3447
3448        // push the 1st right chunk
3449        tx_r.push_chunk(chunk_r1);
3450        hash_join.next_unwrap_pending();
3451
3452        // push the 2nd right chunk
3453        tx_r.push_chunk(chunk_r2);
3454        let chunk = hash_join.next_unwrap_ready_chunk()?;
3455        assert_eq!(
3456            chunk,
3457            StreamChunk::from_pretty(
3458                " I I I I
3459                + 3 6 3 10"
3460            )
3461        );
3462
3463        Ok(())
3464    }
3465
3466    #[tokio::test]
3467    async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3468        let (mut tx_l, mut tx_r, mut hash_join) =
3469            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3470
3471        // push the init barrier for left and right
3472        tx_l.push_barrier(test_epoch(1), false);
3473        tx_r.push_barrier(test_epoch(1), false);
3474        hash_join.next_unwrap_ready_barrier()?;
3475
3476        tx_l.push_int64_watermark(0, 100);
3477
3478        tx_l.push_int64_watermark(0, 200);
3479
3480        tx_l.push_barrier(test_epoch(2), false);
3481        tx_r.push_barrier(test_epoch(2), false);
3482        hash_join.next_unwrap_ready_barrier()?;
3483
3484        tx_r.push_int64_watermark(0, 50);
3485
3486        let w1 = hash_join.next().await.unwrap().unwrap();
3487        let w1 = w1.as_watermark().unwrap();
3488
3489        let w2 = hash_join.next().await.unwrap().unwrap();
3490        let w2 = w2.as_watermark().unwrap();
3491
3492        tx_r.push_int64_watermark(0, 100);
3493
3494        let w3 = hash_join.next().await.unwrap().unwrap();
3495        let w3 = w3.as_watermark().unwrap();
3496
3497        let w4 = hash_join.next().await.unwrap().unwrap();
3498        let w4 = w4.as_watermark().unwrap();
3499
3500        assert_eq!(
3501            w1,
3502            &Watermark {
3503                col_idx: 2,
3504                data_type: DataType::Int64,
3505                val: ScalarImpl::Int64(50)
3506            }
3507        );
3508
3509        assert_eq!(
3510            w2,
3511            &Watermark {
3512                col_idx: 0,
3513                data_type: DataType::Int64,
3514                val: ScalarImpl::Int64(50)
3515            }
3516        );
3517
3518        assert_eq!(
3519            w3,
3520            &Watermark {
3521                col_idx: 2,
3522                data_type: DataType::Int64,
3523                val: ScalarImpl::Int64(100)
3524            }
3525        );
3526
3527        assert_eq!(
3528            w4,
3529            &Watermark {
3530                col_idx: 0,
3531                data_type: DataType::Int64,
3532                val: ScalarImpl::Int64(100)
3533            }
3534        );
3535
3536        Ok(())
3537    }
3538}