risingwave_stream/executor/
hash_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use std::assert_matches::assert_matches;
15use std::collections::{BTreeMap, HashSet};
16use std::time::Duration;
17
18use anyhow::Context;
19use itertools::Itertools;
20use multimap::MultiMap;
21use risingwave_common::array::Op;
22use risingwave_common::hash::{HashKey, NullBitmap};
23use risingwave_common::row::RowExt;
24use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
25use risingwave_common::util::epoch::EpochPair;
26use risingwave_common::util::iter_util::ZipEqDebug;
27use risingwave_expr::ExprError;
28use risingwave_expr::expr::NonStrictExpression;
29use tokio::time::Instant;
30
31use self::builder::JoinChunkBuilder;
32use super::barrier_align::*;
33use super::join::hash_join::*;
34use super::join::row::JoinRow;
35use super::join::*;
36use super::watermark::*;
37use crate::executor::join::builder::JoinStreamChunkBuilder;
38use crate::executor::join::hash_join::CacheResult;
39use crate::executor::prelude::*;
40
/// Evict the cache every n rows: `evict_cache` evicts the caches of both join sides each
/// time its row counter reaches this value, then resets the counter to 0.
const EVICT_EVERY_N_ROWS: u32 = 16;
43
/// Returns `true` if every element of `vec1` also occurs in `vec2`.
///
/// Order and duplicates are ignored; an empty `vec1` is a subset of anything.
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.iter().all(|idx| superset.contains(idx))
}
47
/// Per-side parameters of a hash join.
// Public configuration type: derive `Debug` so it can be logged/inspected, and `Clone`
// since it is a plain value type of two index vectors.
#[derive(Debug, Clone)]
pub struct JoinParams {
    /// Indices of the join key columns in this side's input.
    pub join_key_indices: Vec<usize>,
    /// Indices of the input pk columns after deduplication (as computed by the caller).
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Creates join parameters from the join key indices and the deduped pk indices.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        Self {
            join_key_indices,
            deduped_pk_indices,
        }
    }
}
63
/// State and metadata of one input side of the hash join.
struct JoinSide<K: HashKey, S: StateStore> {
    /// The hash table holding the rows received from this side's input, backed by this
    /// side's state table and (optionally) degree state table.
    ht: JoinHashMap<K, S>,
    /// Indices of the join key columns in this side's input.
    join_key_indices: Vec<usize>,
    /// The data types of all input columns of this side, excluding the degree column.
    all_data_types: Vec<DataType>,
    /// The start position of this side's columns in the concatenated (left ++ right) row:
    /// 0 for the left side, the number of left input columns for the right side.
    start_pos: usize,
    /// The mapping from input column indices of this side to output column indices.
    i2o_mapping: Vec<(usize, usize)>,
    /// The same mapping as `i2o_mapping`, indexed by input column; one input column may map
    /// to several output columns.
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// The first field of the ith element indicates that when a watermark at the ith column of
    /// this side comes, what band join conditions should be updated in order to possibly
    /// generate a new watermark at that column or the corresponding column in the counterpart
    /// join side.
    ///
    /// The second field indicates whether the column is required to be less than the
    /// corresponding column in the counterpart join side in the band join condition.
    input2inequality_index: Vec<Vec<(usize, bool)>>,
    /// Columns that must be non-null for a row to match, because they take part in
    /// inequality conditions.
    non_null_fields: Vec<usize>,
    /// (i, j) in this `Vec` means that state data in this join side can be cleaned if the value of
    /// its ith column is less than the synthetic watermark of the jth band join condition.
    state_clean_columns: Vec<(usize, usize)>,
    /// Whether a degree table is needed for this side.
    need_degree_table: bool,
}
92
93impl<K: HashKey, S: StateStore> std::fmt::Debug for JoinSide<K, S> {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        f.debug_struct("JoinSide")
96            .field("join_key_indices", &self.join_key_indices)
97            .field("col_types", &self.all_data_types)
98            .field("start_pos", &self.start_pos)
99            .field("i2o_mapping", &self.i2o_mapping)
100            .field("need_degree_table", &self.need_degree_table)
101            .finish()
102    }
103}
104
impl<K: HashKey, S: StateStore> JoinSide<K, S> {
    /// Intended to report whether this side's join state is dirty (see the assertion in
    /// `clear_cache`).
    ///
    /// WARNING: not implemented yet; calling it panics via `unimplemented!()`. Please do not
    /// call this until we implement it.
    fn is_dirty(&self) -> bool {
        unimplemented!()
    }

    /// Intended to clear the in-memory cache of this side.
    ///
    /// Currently unused and effectively a stub: the dirty-check assertion calls `is_dirty`,
    /// which panics, and the actual clearing below is commented out.
    #[expect(dead_code)]
    fn clear_cache(&mut self) {
        assert!(
            !self.is_dirty(),
            "cannot clear cache while states of hash join are dirty"
        );

        // TODO: not working with rearranged chain
        // self.ht.clear();
    }

    /// Initializes this side's hash table at `epoch` by delegating to `JoinHashMap::init`.
    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.ht.init(epoch).await
    }
}
126
/// `HashJoinExecutor` takes two input streams and runs an equi hash join on them.
///
/// Before projection by `output_indices`, the output columns are the concatenation of the
/// left and right input columns; semi/anti joins only output the columns of one side.
pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive> {
    /// Actor context (actor id, fragment id, streaming config).
    ctx: ActorContextRef,
    /// Executor info; its schema and pk indices are shown by the `Debug` impl.
    info: ExecutorInfo,

    /// Left input executor; taken (left as `None`) when `into_stream` starts.
    input_l: Option<Executor>,
    /// Right input executor; taken (left as `None`) when `into_stream` starts.
    input_r: Option<Executor>,
    /// The data types of the output columns, after projection by `output_indices`.
    actual_output_data_types: Vec<DataType>,
    /// State and metadata of the left join side.
    side_l: JoinSide<K, S>,
    /// State and metadata of the right join side.
    side_r: JoinSide<K, S>,
    /// Optional non-equi join conditions
    cond: Option<NonStrictExpression>,
    /// For each inequality condition: the output column indices on which its derived
    /// watermark is emitted, and its optional offset (delta) expression.
    inequality_pairs: Vec<(Vec<usize>, Option<NonStrictExpression>)>,
    /// The output watermark of each inequality condition; its value is the minimum of the
    /// watermarks derived from both sides. It is used to emit watermarks downstream and, for
    /// inequalities created with `clean_state == true`, to clean state.
    inequality_watermarks: Vec<Option<Watermark>>,

    /// Whether the logic can be optimized for append-only stream (append-only input and the
    /// pk of both sides contained in their join keys).
    append_only_optimize: bool,

    /// Streaming metrics, also shared with both sides' hash tables.
    metrics: Arc<StreamingMetrics>,
    /// The maximum size of the chunk produced by executor at a time
    chunk_size: usize,
    /// Number of rows received since the last cache eviction; reset to 0 when it reaches
    /// `EVICT_EVERY_N_ROWS` (see `evict_cache`).
    cnt_rows_received: u32,

    /// Buffered watermarks, keyed by the position of the watermark column within the join
    /// key, or by `join_key_indices.len() + i` for the `i`-th inequality condition.
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// Threshold for alerting on high join amplification.
    high_join_amplification_threshold: usize,

    /// Max number of rows that will be cached in the entry state.
    entry_state_max_rows: usize,
}
170
171impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> std::fmt::Debug
172    for HashJoinExecutor<K, S, T>
173{
174    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175        f.debug_struct("HashJoinExecutor")
176            .field("join_type", &T)
177            .field("input_left", &self.input_l.as_ref().unwrap().identity())
178            .field("input_right", &self.input_r.as_ref().unwrap().identity())
179            .field("side_l", &self.side_l)
180            .field("side_r", &self.side_r)
181            .field("pk_indices", &self.info.pk_indices)
182            .field("schema", &self.info.schema)
183            .field("actual_output_data_types", &self.actual_output_data_types)
184            .finish()
185    }
186}
187
188impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> Execute for HashJoinExecutor<K, S, T> {
189    fn execute(self: Box<Self>) -> BoxedMessageStream {
190        self.into_stream().boxed()
191    }
192}
193
/// Arguments for one invocation of the per-chunk equi-join (`eq_join_left` /
/// `eq_join_right`), borrowing the executor's state for the duration of the call.
struct EqJoinArgs<'a, K: HashKey, S: StateStore> {
    /// Actor context of the executor.
    ctx: &'a ActorContextRef,
    /// The left join side.
    side_l: &'a mut JoinSide<K, S>,
    /// The right join side.
    side_r: &'a mut JoinSide<K, S>,
    /// Data types of the output columns after projection by `output_indices`.
    actual_output_data_types: &'a [DataType],
    /// Optional non-equi join condition.
    cond: &'a mut Option<NonStrictExpression>,
    /// Current watermark of each inequality condition (`None` until one is derived).
    inequality_watermarks: &'a [Option<Watermark>],
    /// The input chunk to join.
    chunk: StreamChunk,
    /// Whether the append-only optimization is enabled.
    append_only_optimize: bool,
    /// Maximum number of rows in each produced output chunk.
    chunk_size: usize,
    /// Row counter driving periodic cache eviction (presumably passed to `evict_cache`).
    cnt_rows_received: &'a mut u32,
    /// Threshold for alerting on high join amplification.
    high_join_amplification_threshold: usize,
    /// Max number of rows cached in one entry state.
    entry_state_max_rows: usize,
}
208
209impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K, S, T> {
    /// Creates a hash join executor whose per-entry cache limit is taken from the
    /// `hash_join_entry_state_max_rows` developer config.
    ///
    /// Equivalent to [`Self::new_with_cache_size`] with `entry_state_max_rows = None`.
    // The flat parameter list mirrors `new_with_cache_size`, hence the lint allowance.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
    ) -> Self {
        Self::new_with_cache_size(
            ctx,
            info,
            input_l,
            input_r,
            params_l,
            params_r,
            null_safe,
            output_indices,
            cond,
            inequality_pairs,
            state_table_l,
            degree_state_table_l,
            state_table_r,
            degree_state_table_r,
            watermark_epoch,
            is_append_only,
            metrics,
            chunk_size,
            high_join_amplification_threshold,
            None,
        )
    }
255
    /// Creates a hash join executor with an optional explicit per-entry cache limit.
    ///
    /// When `entry_state_max_rows` is `None`, the limit falls back to
    /// `ctx.streaming_config.developer.hash_join_entry_state_max_rows`.
    ///
    /// Each element of `inequality_pairs` is `(key_required_larger, key_required_smaller,
    /// clean_state, delta_expression)`. The two keys index into the concatenated
    /// `left ++ right` input columns, so exactly one of them refers to each side.
    ///
    /// # Panics
    ///
    /// Panics if the join key data types of the two sides differ, or if `output_indices`
    /// refers to a column outside the (pre-projection) output schema.
    // The executor genuinely needs all of these inputs; hence the lint allowance.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_cache_size(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        entry_state_max_rows: Option<usize>,
    ) -> Self {
        // Fall back to the developer config when no explicit limit is given.
        let entry_state_max_rows = match entry_state_max_rows {
            None => {
                ctx.streaming_config
                    .developer
                    .hash_join_entry_state_max_rows
            }
            Some(entry_state_max_rows) => entry_state_max_rows,
        };
        // Number of left input columns; becomes the right side's `start_pos`.
        let side_l_column_n = input_l.schema().len();

        // Semi/anti joins only output one side's columns; other join types output
        // left ++ right.
        let schema_fields = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
            JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
            _ => [
                input_l.schema().fields.clone(),
                input_r.schema().fields.clone(),
            ]
            .concat(),
        };

        let original_output_data_types = schema_fields
            .iter()
            .map(|field| field.data_type())
            .collect_vec();
        // Project the full output schema through `output_indices`.
        let actual_output_data_types = output_indices
            .iter()
            .map(|&idx| original_output_data_types[idx].clone())
            .collect_vec();

        // Data types of the hash join state.
        let state_all_data_types_l = input_l.schema().data_types();
        let state_all_data_types_r = input_r.schema().data_types();

        let state_pk_indices_l = input_l.pk_indices().to_vec();
        let state_pk_indices_r = input_r.pk_indices().to_vec();

        let state_join_key_indices_l = params_l.join_key_indices;
        let state_join_key_indices_r = params_r.join_key_indices;

        // These index ranges lay out a degree-table row as
        // [join key columns..., deduped pk columns...].
        let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
        let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();

        let degree_pk_indices_l = (state_join_key_indices_l.len()
            ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
            .collect_vec();
        let degree_pk_indices_r = (state_join_key_indices_r.len()
            ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
            .collect_vec();

        // If pk is contained in join key.
        let pk_contained_in_jk_l =
            is_subset(state_pk_indices_l.clone(), state_join_key_indices_l.clone());
        let pk_contained_in_jk_r =
            is_subset(state_pk_indices_r.clone(), state_join_key_indices_r.clone());

        // The append-only optimization requires an append-only input and the pk of both
        // sides to be contained in their join keys.
        let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;

        let join_key_data_types_l = state_join_key_indices_l
            .iter()
            .map(|idx| state_all_data_types_l[*idx].clone())
            .collect_vec();

        let join_key_data_types_r = state_join_key_indices_r
            .iter()
            .map(|idx| state_all_data_types_r[*idx].clone())
            .collect_vec();

        // Both sides must use identical join key types.
        assert_eq!(join_key_data_types_l, join_key_data_types_r);

        // One flag per join key column, built from `null_safe`.
        let null_matched = K::Bitmap::from_bool_vec(null_safe);

        // Note: each side's degree-table requirement depends on whether the *other* side's
        // pk is contained in its join key.
        let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
        let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;

        let degree_state_l = need_degree_table_l.then(|| {
            TableInner::new(
                degree_pk_indices_l,
                degree_join_key_indices_l,
                degree_state_table_l,
            )
        });
        let degree_state_r = need_degree_table_r.then(|| {
            TableInner::new(
                degree_pk_indices_r,
                degree_join_key_indices_r,
                degree_state_table_r,
            )
        });

        // Map input columns of each side to output columns; for semi/anti joins the
        // non-output side contributes no columns.
        let (left_to_output, right_to_output) = {
            let (left_len, right_len) = if is_left_semi_or_anti(T) {
                (state_all_data_types_l.len(), 0usize)
            } else if is_right_semi_or_anti(T) {
                (0usize, state_all_data_types_r.len())
            } else {
                (state_all_data_types_l.len(), state_all_data_types_r.len())
            };
            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
        };

        // Index the mappings by input column; one input column may map to several outputs.
        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());

        let left_input_len = input_l.schema().len();
        let right_input_len = input_r.schema().len();
        let mut l2inequality_index = vec![vec![]; left_input_len];
        let mut r2inequality_index = vec![vec![]; right_input_len];
        let mut l_state_clean_columns = vec![];
        let mut r_state_clean_columns = vec![];
        // For every inequality, record:
        // - in `*2inequality_index`, which inequalities each input column takes part in
        //   (`true` = the column is the one required to be smaller);
        // - in `*_state_clean_columns`, the larger-side column whose state may be cleaned
        //   when `clean_state` is set;
        // - as the result, the output columns of the larger key, on which the derived
        //   watermark is emitted.
        let inequality_pairs = inequality_pairs
            .into_iter()
            .enumerate()
            .map(
                |(
                    index,
                    (key_required_larger, key_required_smaller, clean_state, delta_expression),
                )| {
                    // Keys index into left ++ right, so the smaller index is on the left side.
                    let output_indices = if key_required_larger < key_required_smaller {
                        if clean_state {
                            l_state_clean_columns.push((key_required_larger, index));
                        }
                        l2inequality_index[key_required_larger].push((index, false));
                        r2inequality_index[key_required_smaller - left_input_len]
                            .push((index, true));
                        l2o_indexed
                            .get_vec(&key_required_larger)
                            .cloned()
                            .unwrap_or_default()
                    } else {
                        if clean_state {
                            r_state_clean_columns
                                .push((key_required_larger - left_input_len, index));
                        }
                        l2inequality_index[key_required_smaller].push((index, true));
                        r2inequality_index[key_required_larger - left_input_len]
                            .push((index, false));
                        r2o_indexed
                            .get_vec(&(key_required_larger - left_input_len))
                            .cloned()
                            .unwrap_or_default()
                    };
                    (output_indices, delta_expression)
                },
            )
            .collect_vec();

        // Columns involved in any inequality are required to be non-null to match.
        let mut l_non_null_fields = l2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();
        let mut r_non_null_fields = r2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();

        // With the append-only optimization, no state cleaning or non-null filtering is
        // configured.
        if append_only_optimize {
            l_state_clean_columns.clear();
            r_state_clean_columns.clear();
            l_non_null_fields.clear();
            r_non_null_fields.clear();
        }

        // No inequality watermark has been derived yet.
        let inequality_watermarks = vec![None; inequality_pairs.len()];
        let watermark_buffers = BTreeMap::new();

        Self {
            ctx: ctx.clone(),
            info,
            input_l: Some(input_l),
            input_r: Some(input_r),
            actual_output_data_types,
            side_l: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch.clone(),
                    join_key_data_types_l,
                    state_join_key_indices_l.clone(),
                    state_all_data_types_l.clone(),
                    state_table_l,
                    params_l.deduped_pk_indices,
                    degree_state_l,
                    null_matched.clone(),
                    pk_contained_in_jk_l,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "left",
                ),
                join_key_indices: state_join_key_indices_l,
                all_data_types: state_all_data_types_l,
                i2o_mapping: left_to_output,
                i2o_mapping_indexed: l2o_indexed,
                input2inequality_index: l2inequality_index,
                non_null_fields: l_non_null_fields,
                state_clean_columns: l_state_clean_columns,
                start_pos: 0,
                need_degree_table: need_degree_table_l,
            },
            side_r: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch,
                    join_key_data_types_r,
                    state_join_key_indices_r.clone(),
                    state_all_data_types_r.clone(),
                    state_table_r,
                    params_r.deduped_pk_indices,
                    degree_state_r,
                    null_matched,
                    pk_contained_in_jk_r,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "right",
                ),
                join_key_indices: state_join_key_indices_r,
                all_data_types: state_all_data_types_r,
                start_pos: side_l_column_n,
                i2o_mapping: right_to_output,
                i2o_mapping_indexed: r2o_indexed,
                input2inequality_index: r2inequality_index,
                non_null_fields: r_non_null_fields,
                state_clean_columns: r_state_clean_columns,
                need_degree_table: need_degree_table_r,
            },
            cond,
            inequality_pairs,
            inequality_watermarks,
            append_only_optimize,
            metrics,
            chunk_size,
            cnt_rows_received: 0,
            watermark_buffers,
            high_join_amplification_threshold,
            entry_state_max_rows,
        }
    }
517
    /// Drives the join. It barrier-aligns the two inputs and dispatches each aligned
    /// message (watermark, chunk, or barrier), then yields the resulting output messages.
    ///
    /// The first barrier is forwarded before both sides' join state is initialized with its
    /// epoch. Takes `input_l`/`input_r` out of `self`.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let input_l = self.input_l.take().unwrap();
        let input_r = self.input_r.take().unwrap();
        let aligned_stream = barrier_align(
            input_l.execute(),
            input_r.execute(),
            self.ctx.id,
            self.ctx.fragment_id,
            self.metrics.clone(),
            "Join",
        );
        pin_mut!(aligned_stream);

        let actor_id = self.ctx.id;

        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
        let first_epoch = barrier.epoch;
        // The first barrier message should be propagated.
        yield Message::Barrier(barrier);
        self.side_l.init(first_epoch).await?;
        self.side_r.init(first_epoch).await?;

        let actor_id_str = self.ctx.id.to_string();
        let fragment_id_str = self.ctx.fragment_id.to_string();

        // Initialize metric handles labeled by actor id, fragment id and (where relevant)
        // side or "barrier".
        let join_actor_input_waiting_duration_ns = self
            .metrics
            .join_actor_input_waiting_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let left_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
        let right_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let barrier_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[
                actor_id_str.as_str(),
                fragment_id_str.as_str(),
                "barrier",
            ]);

        let left_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);

        let right_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        // Start of the current wait for the next input message.
        let mut start_time = Instant::now();

        while let Some(msg) = aligned_stream
            .next()
            .instrument_await("hash_join_barrier_align")
            .await
        {
            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
            match msg? {
                AlignedMessage::WatermarkLeft(watermark) => {
                    for watermark_to_emit in
                        self.handle_watermark(SideType::Left, watermark).await?
                    {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::WatermarkRight(watermark) => {
                    for watermark_to_emit in
                        self.handle_watermark(SideType::Right, watermark).await?
                    {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::Left(chunk) => {
                    // Accumulate only the time spent joining: the clock is paused around
                    // each `yield`, so time spent downstream is excluded.
                    let mut left_time = Duration::from_nanos(0);
                    let mut left_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_left(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        left_time += left_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        left_start_time = Instant::now();
                    }
                    left_time += left_start_time.elapsed();
                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Right(chunk) => {
                    // Same timing scheme as the left side.
                    let mut right_time = Duration::from_nanos(0);
                    let mut right_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_right(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        right_time += right_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        right_start_time = Instant::now();
                    }
                    right_time += right_start_time.elapsed();
                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Barrier(barrier) => {
                    let barrier_start_time = Instant::now();
                    // Flush both sides for this epoch; the returned post-commit handles are
                    // completed only after the barrier has been yielded.
                    let (left_post_commit, right_post_commit) =
                        self.flush_data(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
                    yield Message::Barrier(barrier);

                    // Update the vnode bitmap for state tables of both sides if asked.
                    right_post_commit
                        .post_yield_barrier(update_vnode_bitmap.clone())
                        .await?;
                    // Only the left side's result decides whether to reset watermark state.
                    // NOTE(review): `true` presumably means cached state may be stale after
                    // a vnode bitmap change — confirm against `post_yield_barrier`.
                    if left_post_commit
                        .post_yield_barrier(update_vnode_bitmap)
                        .await?
                        .unwrap_or(false)
                    {
                        self.watermark_buffers
                            .values_mut()
                            .for_each(|buffers| buffers.clear());
                        self.inequality_watermarks.fill(None);
                    }

                    // Report metrics of cached join rows/entries
                    for (join_cached_entry_count, ht) in [
                        (&left_join_cached_entry_count, &self.side_l.ht),
                        (&right_join_cached_entry_count, &self.side_r.ht),
                    ] {
                        join_cached_entry_count.set(ht.entry_count() as i64);
                    }

                    barrier_join_match_duration_ns
                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
                }
            }
            start_time = Instant::now();
        }
    }
690
691    async fn flush_data(
692        &mut self,
693        epoch: EpochPair,
694    ) -> StreamExecutorResult<(
695        JoinHashMapPostCommit<'_, K, S>,
696        JoinHashMapPostCommit<'_, K, S>,
697    )> {
698        // All changes to the state has been buffered in the mem-table of the state table. Just
699        // `commit` them here.
700        let left = self.side_l.ht.flush(epoch).await?;
701        let right = self.side_r.ht.flush(epoch).await?;
702        Ok((left, right))
703    }
704
705    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
706        // All changes to the state has been buffered in the mem-table of the state table. Just
707        // `commit` them here.
708        self.side_l.ht.try_flush().await?;
709        self.side_r.ht.try_flush().await?;
710        Ok(())
711    }
712
713    // We need to manually evict the cache.
714    fn evict_cache(
715        side_update: &mut JoinSide<K, S>,
716        side_match: &mut JoinSide<K, S>,
717        cnt_rows_received: &mut u32,
718    ) {
719        *cnt_rows_received += 1;
720        if *cnt_rows_received == EVICT_EVERY_N_ROWS {
721            side_update.ht.evict();
722            side_match.ht.evict();
723            *cnt_rows_received = 0;
724        }
725    }
726
727    async fn handle_watermark(
728        &mut self,
729        side: SideTypePrimitive,
730        watermark: Watermark,
731    ) -> StreamExecutorResult<Vec<Watermark>> {
732        let (side_update, side_match) = if side == SideType::Left {
733            (&mut self.side_l, &mut self.side_r)
734        } else {
735            (&mut self.side_r, &mut self.side_l)
736        };
737
738        // State cleaning
739        if side_update.join_key_indices[0] == watermark.col_idx {
740            side_match.ht.update_watermark(watermark.val.clone());
741        }
742
743        // Select watermarks to yield.
744        let wm_in_jk = side_update
745            .join_key_indices
746            .iter()
747            .positions(|idx| *idx == watermark.col_idx);
748        let mut watermarks_to_emit = vec![];
749        for idx in wm_in_jk {
750            let buffers = self
751                .watermark_buffers
752                .entry(idx)
753                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
754            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
755                let empty_indices = vec![];
756                let output_indices = side_update
757                    .i2o_mapping_indexed
758                    .get_vec(&side_update.join_key_indices[idx])
759                    .unwrap_or(&empty_indices)
760                    .iter()
761                    .chain(
762                        side_match
763                            .i2o_mapping_indexed
764                            .get_vec(&side_match.join_key_indices[idx])
765                            .unwrap_or(&empty_indices),
766                    );
767                for output_idx in output_indices {
768                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
769                }
770            };
771        }
772        for (inequality_index, need_offset) in
773            &side_update.input2inequality_index[watermark.col_idx]
774        {
775            let buffers = self
776                .watermark_buffers
777                .entry(side_update.join_key_indices.len() + inequality_index)
778                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
779            let mut input_watermark = watermark.clone();
780            if *need_offset
781                && let Some(delta_expression) = self.inequality_pairs[*inequality_index].1.as_ref()
782            {
783                // allow since we will handle error manually.
784                #[allow(clippy::disallowed_methods)]
785                let eval_result = delta_expression
786                    .inner()
787                    .eval_row(&OwnedRow::new(vec![Some(input_watermark.val)]))
788                    .await;
789                match eval_result {
790                    Ok(value) => input_watermark.val = value.unwrap(),
791                    Err(err) => {
792                        if !matches!(err, ExprError::NumericOutOfRange) {
793                            self.ctx.on_compute_error(err, &self.info.identity);
794                        }
795                        continue;
796                    }
797                }
798            };
799            if let Some(selected_watermark) = buffers.handle_watermark(side, input_watermark) {
800                for output_idx in &self.inequality_pairs[*inequality_index].0 {
801                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
802                }
803                self.inequality_watermarks[*inequality_index] = Some(selected_watermark);
804            }
805        }
806        Ok(watermarks_to_emit)
807    }
808
809    fn row_concat(
810        row_update: impl Row,
811        update_start_pos: usize,
812        row_matched: impl Row,
813        matched_start_pos: usize,
814    ) -> OwnedRow {
815        let mut new_row = vec![None; row_update.len() + row_matched.len()];
816
817        for (i, datum_ref) in row_update.iter().enumerate() {
818            new_row[i + update_start_pos] = datum_ref.to_owned_datum();
819        }
820        for (i, datum_ref) in row_matched.iter().enumerate() {
821            new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
822        }
823        OwnedRow::new(new_row)
824    }
825
    /// Used to forward `eq_join_oneside` to show join side in stack.
    ///
    /// Runs `eq_join_oneside` with `SIDE = SideType::Left`. The chunk in `args` came from the
    /// left input and is probed against the right side's state. The dedicated function name
    /// makes the side visible in stack traces.
    fn eq_join_left(
        args: EqJoinArgs<'_, K, S>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Left }>(args)
    }
832
    /// Used to forward `eq_join_oneside` to show join side in stack.
    ///
    /// Runs `eq_join_oneside` with `SIDE = SideType::Right`. The chunk in `args` came from the
    /// right input and is probed against the left side's state. The dedicated function name
    /// makes the side visible in stack traces.
    fn eq_join_right(
        args: EqJoinArgs<'_, K, S>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Right }>(args)
    }
839
    /// Probes the opposite side with every row of `chunk`, which arrived on side `SIDE`, and
    /// streams the resulting output chunks.
    ///
    /// For each row:
    /// - ticks the periodic cache-eviction counter (`evict_cache`);
    /// - decides whether the row can match at all:
    ///   - every column in `side_update.non_null_fields` must be non-NULL, and
    ///   - the key's null bitmap must be a subset of `side_match.ht.null_matched()`;
    ///   - if either check fails, the lookup result is `CacheResult::NeverMatch`;
    /// - delegates matching and state updates to `handle_match_rows` and yields its chunks;
    /// - records the cardinality of the chunks yielded for this row in the
    ///   `join_matched_join_keys` metric;
    /// - logs a warning when that cardinality exceeds `high_join_amplification_threshold`.
    ///
    /// Rows still buffered in the chunk builder are flushed at the end.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            actual_output_data_types,
            cond,
            inequality_watermarks,
            chunk,
            append_only_optimize,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
            entry_state_max_rows,
            ..
        } = args;

        // `side_update` is the side the chunk arrived on; `side_match` is probed for matches.
        let (side_update, side_match) = if SIDE == SideType::Left {
            (side_l, side_r)
        } else {
            (side_r, side_l)
        };

        // Pair each state-clean column of the match side with the current watermark of its
        // inequality. Inequalities that have no watermark yet are dropped.
        let useful_state_clean_columns = side_match
            .state_clean_columns
            .iter()
            .filter_map(|(column_idx, inequality_index)| {
                inequality_watermarks[*inequality_index]
                    .as_ref()
                    .map(|watermark| (*column_idx, watermark))
            })
            .collect_vec();

        let mut hashjoin_chunk_builder =
            JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        let join_matched_join_keys = ctx
            .streaming_metrics
            .join_matched_join_keys
            .with_guarded_label_values(&[
                &ctx.id.to_string(),
                &ctx.fragment_id.to_string(),
                &side_update.ht.table_id().to_string(),
            ]);

        // `zip_eq_debug` expects one key per entry of `rows_with_holes`.
        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            // `None` entries (presumably rows hidden by the chunk's visibility) are skipped.
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(side_update, side_match, cnt_rows_received);

            let cache_lookup_result = {
                // NOTE(review): the unchecked access relies on every index in `non_null_fields`
                // being within the row's width — confirm where `non_null_fields` is built.
                let probe_non_null_requirement_satisfied = side_update
                    .non_null_fields
                    .iter()
                    .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
                let build_non_null_requirement_satisfied =
                    key.null_bitmap().is_subset(side_match.ht.null_matched());
                if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
                    side_match.ht.take_state_opt(key)
                } else {
                    CacheResult::NeverMatch
                }
            };
            // Sum of cardinalities of all chunks yielded while processing this row.
            let mut total_matches = 0;

            // Expands to a `handle_match_rows` stream for the given `JoinOp` variant.
            macro_rules! match_rows {
                ($op:ident) => {
                    Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
                        cache_lookup_result,
                        row,
                        key,
                        &mut hashjoin_chunk_builder,
                        side_match,
                        side_update,
                        &useful_state_clean_columns,
                        cond,
                        append_only_optimize,
                        entry_state_max_rows,
                    )
                };
            }

            // Update ops are handled like their plain insert/delete counterparts.
            match op {
                Op::Insert | Op::UpdateInsert =>
                {
                    #[for_await]
                    for chunk in match_rows!(Insert) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
                Op::Delete | Op::UpdateDelete =>
                {
                    #[for_await]
                    for chunk in match_rows!(Delete) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
            };

            join_matched_join_keys.observe(total_matches as _);
            if total_matches > high_join_amplification_threshold {
                // The key is only deserialized on this slow path, for logging.
                let join_key_data_types = side_update.ht.join_key_data_types();
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = total_matches,
                    update_table_id = side_update.ht.table_id(),
                    match_table_id = side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = ctx.id,
                    fragment_id = ctx.fragment_id,
                    "large rows matched for join key"
                );
            }
        }
        // NOTE(kwannoel): We don't track metrics for this last chunk.
        if let Some(chunk) = hashjoin_chunk_builder.take() {
            yield chunk;
        }
    }
971
    /// For the probe-side row, we need to check if it has values in cache, if not, we need to
    /// fetch the matched rows from the state table.
    ///
    /// Every matched build-side row being processed needs to go through the following phases:
    /// 1. Handle join condition evaluation.
    /// 2. Always do cache refill, if the state count is good.
    /// 3. Handle state cleaning.
    /// 4. Handle degree table update.
    ///
    /// After all matched rows are processed, the steps run in this order:
    /// 1. The probe row is forwarded according to its degree:
    ///    - `forward_if_not_matched` when no row satisfied the condition;
    ///    - otherwise `forward_exactly_once_if_matched`.
    /// 2. The match side's cache entry is written back when allowed.
    /// 3. Rows flagged for watermark cleaning are deleted from `side_match`.
    /// 4. Either the append-only cleanup runs, or the probe row (with its degree) is
    ///    inserted into / deleted from `side_update`.
    ///
    /// On `CacheResult::NeverMatch`, only the unmatched-forwarding step runs, and the function
    /// returns before touching any state.
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn handle_match_rows<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
    >(
        cached_lookup_result: CacheResult,
        row: RowRef<'a>,
        key: &'a K,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        side_match: &'a mut JoinSide<K, S>,
        side_update: &'a mut JoinSide<K, S>,
        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
        cond: &'a mut Option<NonStrictExpression>,
        append_only_optimize: bool,
        entry_state_max_rows: usize,
    ) {
        let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
        // Fresh cache entry; only filled on a cache miss.
        let mut entry_state = JoinEntryState::default();
        let mut entry_state_count = 0;

        // Number of matched rows that satisfied the join condition (see `handle_match_row`).
        let mut degree = 0;
        let mut append_only_matched_row: Option<JoinRow<OwnedRow>> = None;
        let mut matched_rows_to_clean = vec![];

        // Forwards to `handle_match_row` with the per-probe-row state captured above.
        macro_rules! match_row {
            (
                $match_order_key_indices:expr,
                $degree_table:expr,
                $matched_row:expr,
                $matched_row_ref:expr,
                $from_cache:literal
            ) => {
                Self::handle_match_row::<SIDE, { JOIN_OP }, { $from_cache }>(
                    row,
                    $matched_row,
                    $matched_row_ref,
                    hashjoin_chunk_builder,
                    $match_order_key_indices,
                    $degree_table,
                    side_update.start_pos,
                    side_match.start_pos,
                    cond,
                    &mut degree,
                    useful_state_clean_columns,
                    append_only_optimize,
                    &mut append_only_matched_row,
                    &mut matched_rows_to_clean,
                )
            };
        }

        let entry_state = match cached_lookup_result {
            CacheResult::NeverMatch => {
                // The row can never match: forward it (if the join type requires it) and
                // return without any state update.
                let op = match JOIN_OP {
                    JoinOp::Insert => Op::Insert,
                    JoinOp::Delete => Op::Delete,
                };
                if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                    yield chunk;
                }
                return Ok(());
            }
            CacheResult::Hit(mut cached_rows) => {
                let (match_order_key_indices, match_degree_state) =
                    side_match.ht.get_degree_state_mut_ref();
                // Handle cached rows which match the probe-side row.
                // `matched_row_ref` points into `cached_rows`, so degree changes are applied
                // to the cached entry in place.
                for (matched_row_ref, matched_row) in
                    cached_rows.values_mut(&side_match.all_data_types)
                {
                    let matched_row = matched_row?;
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        match_degree_state,
                        matched_row,
                        Some(matched_row_ref),
                        true
                    )
                    .await
                    {
                        yield chunk;
                    }
                }

                cached_rows
            }
            CacheResult::Miss => {
                // Handle rows which are not in cache.
                let (matched_rows, match_order_key_indices, degree_table) = side_match
                    .ht
                    .fetch_matched_rows_and_get_degree_table_ref(key)
                    .await?;

                #[for_await]
                for matched_row in matched_rows {
                    let (encoded_pk, matched_row) = matched_row?;

                    let mut matched_row_ref = None;

                    // cache refill
                    // Insertion continues while the count is `<= entry_state_max_rows`, so the
                    // count can reach `entry_state_max_rows + 1`. That value disables the
                    // write-back below.
                    if entry_state_count <= entry_state_max_rows {
                        let row_ref = entry_state
                            .insert(encoded_pk, matched_row.encode(), None) // TODO(kwannoel): handle ineq key for asof join.
                            .with_context(|| format!("row: {}", row.display(),))?;
                        matched_row_ref = Some(row_ref);
                        entry_state_count += 1;
                    }
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        degree_table,
                        matched_row,
                        matched_row_ref,
                        false
                    )
                    .await
                    {
                        yield chunk;
                    }
                }
                Box::new(entry_state)
            }
        };

        // forward rows depending on join types
        let op = match JOIN_OP {
            JoinOp::Insert => Op::Insert,
            JoinOp::Delete => Op::Delete,
        };
        if degree == 0 {
            if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                yield chunk;
            }
        } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
        {
            yield chunk;
        }

        // cache refill
        // A hit is always written back. NOTE(review): presumably `take_state_opt` removed the
        // entry from the cache, which is why it must be restored — confirm.
        if cache_hit || entry_state_count <= entry_state_max_rows {
            side_match.ht.update_state(key, entry_state);
        }

        // watermark state cleaning
        for matched_row in matched_rows_to_clean {
            side_match.ht.delete_handle_degree(key, matched_row)?;
        }

        // apply append_only optimization to clean matched_rows which have been persisted
        // The probe row itself is then not stored in `side_update`.
        if append_only_optimize && let Some(row) = append_only_matched_row {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            side_match.ht.delete_handle_degree(key, row)?;
            return Ok(());
        }

        // no append_only optimization, update state table(s).
        match JOIN_OP {
            JoinOp::Insert => {
                side_update
                    .ht
                    .insert_handle_degree(key, JoinRow::new(row, degree))?;
            }
            JoinOp::Delete => {
                side_update
                    .ht
                    .delete_handle_degree(key, JoinRow::new(row, degree))?;
            }
        }
    }
1149
    /// Processes one matched row from the match side against the probe-side `update_row`.
    ///
    /// - If the join condition holds:
    ///   - `update_row_degree` is incremented.
    ///   - The joined row is emitted, unless `forward_exactly_once(T, SIDE)`. Inserts are
    ///     emitted before the degree update and deletes after it.
    ///   - When the match side keeps a degree table, the matched row's degree is updated there
    ///     and, if a cache reference is available, in the cached entry too.
    /// - Otherwise, the matched row is flagged for state cleaning when any of its
    ///   `useful_state_clean_columns` is non-NULL and strictly below the corresponding
    ///   watermark.
    /// - With `append_only_optimize`, the matched row is handed back through
    ///   `append_only_matched_row`, at most once per probe row and regardless of the condition.
    /// - Without it, a row flagged for cleaning is pushed onto `matched_rows_to_clean`.
    ///
    /// Returns the chunk, if any, produced by `with_match` for this pair.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    async fn handle_match_row<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
        const MATCHED_ROWS_FROM_CACHE: bool,
    >(
        update_row: RowRef<'a>,
        mut matched_row: JoinRow<OwnedRow>,
        mut matched_row_cache_ref: Option<&mut StateValueType>,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        match_order_key_indices: &[usize],
        match_degree_table: &mut Option<TableInner<S>>,
        side_update_start_pos: usize,
        side_match_start_pos: usize,
        cond: &Option<NonStrictExpression>,
        update_row_degree: &mut u64,
        useful_state_clean_columns: &[(usize, &'a Watermark)],
        append_only_optimize: bool,
        append_only_matched_row: &mut Option<JoinRow<OwnedRow>>,
        matched_rows_to_clean: &mut Vec<JoinRow<OwnedRow>>,
    ) -> Option<StreamChunk> {
        let mut need_state_clean = false;
        let mut chunk_opt = None;

        // TODO(kwannoel): Instead of evaluating this every loop,
        // we can call this only if there's a non-equi expression.
        // check join cond
        let join_condition_satisfied = Self::check_join_condition(
            update_row,
            side_update_start_pos,
            &matched_row.row,
            side_match_start_pos,
            cond,
        )
        .await;

        if join_condition_satisfied {
            // update degree
            *update_row_degree += 1;
            // send matched row downstream

            // Inserts must happen before degrees are updated,
            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
            if matches!(JOIN_OP, JoinOp::Insert) && !forward_exactly_once(T, SIDE) {
                if let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
                {
                    chunk_opt = Some(chunk);
                }
            }
            // TODO(kwannoel): We can actually statically decide this, using join side + join type.
            if let Some(degree_table) = match_degree_table {
                update_degree::<S, { JOIN_OP }>(
                    match_order_key_indices,
                    degree_table,
                    &mut matched_row,
                );
                // Rows from the cache are always passed with `Some(ref)` by
                // `handle_match_rows`, so the `unwrap`s below hold.
                if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
                    // update matched row in cache
                    match JOIN_OP {
                        JoinOp::Insert => {
                            matched_row_cache_ref.as_mut().unwrap().degree += 1;
                        }
                        JoinOp::Delete => {
                            matched_row_cache_ref.as_mut().unwrap().degree -= 1;
                        }
                    }
                }
            }

            // Deletes must happen after degree table is updated.
            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
            if matches!(JOIN_OP, JoinOp::Delete) && !forward_exactly_once(T, SIDE) {
                if let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
                {
                    chunk_opt = Some(chunk);
                }
            }
        } else {
            // check if need state cleaning
            // A matched row is stale if any clean column is strictly below its watermark.
            for (column_idx, watermark) in useful_state_clean_columns {
                if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
                    scalar
                        .default_cmp(&watermark.val.as_scalar_ref_impl())
                        .is_lt()
                }) {
                    need_state_clean = true;
                    break;
                }
            }
        }
        // If the stream is append-only and the join key covers pk in both side,
        // then we can remove matched rows since pk is unique and will not be
        // inserted again
        if append_only_optimize {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            // Since join key contains pk and pk is unique, there should be only
            // one row if matched.
            assert!(append_only_matched_row.is_none());
            *append_only_matched_row = Some(matched_row);
        } else if need_state_clean {
            // `append_only_optimize` and `need_state_clean` won't both be true.
            // 'else' here is only to suppress compiler error, otherwise
            // `matched_row` will be moved twice.
            matched_rows_to_clean.push(matched_row);
        }

        chunk_opt
    }
1262
1263    // TODO(yuhao-su): We should find a better way to eval the expression
1264    // without concat two rows.
1265    // if there are non-equi expressions
1266    // NOTE(kwannoel): We can probably let `eval` use `impl Row` instead of `OwnedRow`.
1267    #[inline]
1268    async fn check_join_condition(
1269        row: impl Row,
1270        side_update_start_pos: usize,
1271        matched_row: impl Row,
1272        side_match_start_pos: usize,
1273        join_condition: &Option<NonStrictExpression>,
1274    ) -> bool {
1275        if let Some(join_condition) = join_condition {
1276            let new_row = Self::row_concat(
1277                row,
1278                side_update_start_pos,
1279                matched_row,
1280                side_match_start_pos,
1281            );
1282            join_condition
1283                .eval_row_infallible(&new_row)
1284                .await
1285                .map(|s| *s.as_bool())
1286                .unwrap_or(false)
1287        } else {
1288            true
1289        }
1290    }
1291}
1292
1293#[cfg(test)]
1294mod tests {
1295    use std::sync::atomic::AtomicU64;
1296
1297    use pretty_assertions::assert_eq;
1298    use risingwave_common::array::*;
1299    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1300    use risingwave_common::hash::{Key64, Key128};
1301    use risingwave_common::util::epoch::test_epoch;
1302    use risingwave_common::util::sort_util::OrderType;
1303    use risingwave_storage::memory::MemoryStateStore;
1304
1305    use super::*;
1306    use crate::common::table::test_utils::gen_pbtable;
1307    use crate::executor::test_utils::expr::build_from_pretty;
1308    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1309
1310    async fn create_in_memory_state_table(
1311        mem_state: MemoryStateStore,
1312        data_types: &[DataType],
1313        order_types: &[OrderType],
1314        pk_indices: &[usize],
1315        table_id: u32,
1316    ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1317        let column_descs = data_types
1318            .iter()
1319            .enumerate()
1320            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1321            .collect_vec();
1322        let state_table = StateTable::from_table_catalog(
1323            &gen_pbtable(
1324                TableId::new(table_id),
1325                column_descs,
1326                order_types.to_vec(),
1327                pk_indices.to_vec(),
1328                0,
1329            ),
1330            mem_state.clone(),
1331            None,
1332        )
1333        .await;
1334
1335        // Create degree table
1336        let mut degree_table_column_descs = vec![];
1337        pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1338            degree_table_column_descs.push(ColumnDesc::unnamed(
1339                ColumnId::new(pk_id as i32),
1340                data_types[*idx].clone(),
1341            ))
1342        });
1343        degree_table_column_descs.push(ColumnDesc::unnamed(
1344            ColumnId::new(pk_indices.len() as i32),
1345            DataType::Int64,
1346        ));
1347        let degree_state_table = StateTable::from_table_catalog(
1348            &gen_pbtable(
1349                TableId::new(table_id + 1),
1350                degree_table_column_descs,
1351                order_types.to_vec(),
1352                pk_indices.to_vec(),
1353                0,
1354            ),
1355            mem_state,
1356            None,
1357        )
1358        .await;
1359        (state_table, degree_state_table)
1360    }
1361
1362    fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1363        build_from_pretty(
1364            condition_text
1365                .as_deref()
1366                .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1367        )
1368    }
1369
    /// Builds a `HashJoinExecutor` of join type `T` over two mock sources for tests.
    ///
    /// Both inputs have schema `(Int64 join key, Int64)`, join on column 0, and use column 1 as
    /// the deduped pk.
    /// - `with_condition`: attach the non-equi condition built by `create_cond(condition_text)`.
    /// - `null_safe`: null-safe flag for the single join key.
    /// - `inequality_pairs`: forwarded unchanged to the executor.
    ///
    /// Semi/anti joins output only one side's schema; all other join types output the
    /// concatenation of both inputs. Every output column is selected.
    ///
    /// Returns the left and right source senders plus the executor's output stream.
    async fn create_executor<const T: JoinTypePrimitive>(
        with_condition: bool,
        null_safe: bool,
        condition_text: Option<String>,
        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64), // join key
                Field::unnamed(DataType::Int64),
            ],
        };
        let (tx_l, source_l) = MockSource::channel();
        let source_l = source_l.into_executor(schema.clone(), vec![1]);
        let (tx_r, source_r) = MockSource::channel();
        let source_r = source_r.into_executor(schema, vec![1]);
        let params_l = JoinParams::new(vec![0], vec![1]);
        let params_r = JoinParams::new(vec![0], vec![1]);
        let cond = with_condition.then(|| create_cond(condition_text));

        let mem_state = MemoryStateStore::new();

        // Left tables use ids 0 and 1; right tables use ids 2 and 3.
        let (state_l, degree_state_l) = create_in_memory_state_table(
            mem_state.clone(),
            &[DataType::Int64, DataType::Int64],
            &[OrderType::ascending(), OrderType::ascending()],
            &[0, 1],
            0,
        )
        .await;

        let (state_r, degree_state_r) = create_in_memory_state_table(
            mem_state,
            &[DataType::Int64, DataType::Int64],
            &[OrderType::ascending(), OrderType::ascending()],
            &[0, 1],
            2,
        )
        .await;

        let schema = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
            _ => [source_l.schema().fields(), source_r.schema().fields()]
                .concat()
                .into_iter()
                .collect(),
        };
        let schema_len = schema.len();
        let info = ExecutorInfo::new(schema, vec![1], "HashJoinExecutor".to_owned(), 0);

        let executor = HashJoinExecutor::<Key64, MemoryStateStore, T>::new(
            ActorContext::for_test(123),
            info,
            source_l,
            source_r,
            params_l,
            params_r,
            vec![null_safe],
            (0..schema_len).collect_vec(),
            cond,
            inequality_pairs,
            state_l,
            degree_state_l,
            state_r,
            degree_state_r,
            Arc::new(AtomicU64::new(0)),
            false,
            Arc::new(StreamingMetrics::unused()),
            1024,
            2048,
        );
        (tx_l, tx_r, executor.boxed().execute())
    }
1444
1445    async fn create_classical_executor<const T: JoinTypePrimitive>(
1446        with_condition: bool,
1447        null_safe: bool,
1448        condition_text: Option<String>,
1449    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1450        create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
1451    }
1452
1453    async fn create_append_only_executor<const T: JoinTypePrimitive>(
1454        with_condition: bool,
1455    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1456        let schema = Schema {
1457            fields: vec![
1458                Field::unnamed(DataType::Int64),
1459                Field::unnamed(DataType::Int64),
1460                Field::unnamed(DataType::Int64),
1461            ],
1462        };
1463        let (tx_l, source_l) = MockSource::channel();
1464        let source_l = source_l.into_executor(schema.clone(), vec![0]);
1465        let (tx_r, source_r) = MockSource::channel();
1466        let source_r = source_r.into_executor(schema, vec![0]);
1467        let params_l = JoinParams::new(vec![0, 1], vec![]);
1468        let params_r = JoinParams::new(vec![0, 1], vec![]);
1469        let cond = with_condition.then(|| create_cond(None));
1470
1471        let mem_state = MemoryStateStore::new();
1472
1473        let (state_l, degree_state_l) = create_in_memory_state_table(
1474            mem_state.clone(),
1475            &[DataType::Int64, DataType::Int64, DataType::Int64],
1476            &[
1477                OrderType::ascending(),
1478                OrderType::ascending(),
1479                OrderType::ascending(),
1480            ],
1481            &[0, 1, 0],
1482            0,
1483        )
1484        .await;
1485
1486        let (state_r, degree_state_r) = create_in_memory_state_table(
1487            mem_state,
1488            &[DataType::Int64, DataType::Int64, DataType::Int64],
1489            &[
1490                OrderType::ascending(),
1491                OrderType::ascending(),
1492                OrderType::ascending(),
1493            ],
1494            &[0, 1, 1],
1495            1,
1496        )
1497        .await;
1498
1499        let schema = match T {
1500            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1501            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1502            _ => [source_l.schema().fields(), source_r.schema().fields()]
1503                .concat()
1504                .into_iter()
1505                .collect(),
1506        };
1507        let schema_len = schema.len();
1508        let info = ExecutorInfo::new(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1509
1510        let executor = HashJoinExecutor::<Key128, MemoryStateStore, T>::new(
1511            ActorContext::for_test(123),
1512            info,
1513            source_l,
1514            source_r,
1515            params_l,
1516            params_r,
1517            vec![false],
1518            (0..schema_len).collect_vec(),
1519            cond,
1520            vec![],
1521            state_l,
1522            degree_state_l,
1523            state_r,
1524            degree_state_r,
1525            Arc::new(AtomicU64::new(0)),
1526            true,
1527            Arc::new(StreamingMetrics::unused()),
1528            1024,
1529            2048,
1530        );
1531        (tx_l, tx_r, executor.boxed().execute())
1532    }
1533
1534    #[tokio::test]
1535    async fn test_interval_join() -> StreamExecutorResult<()> {
1536        let chunk_l1 = StreamChunk::from_pretty(
1537            "  I I
1538             + 1 4
1539             + 2 3
1540             + 2 5
1541             + 3 6",
1542        );
1543        let chunk_l2 = StreamChunk::from_pretty(
1544            "  I I
1545             + 3 8
1546             - 3 8",
1547        );
1548        let chunk_r1 = StreamChunk::from_pretty(
1549            "  I I
1550             + 2 6
1551             + 4 8
1552             + 6 9",
1553        );
1554        let chunk_r2 = StreamChunk::from_pretty(
1555            "  I  I
1556             + 2 3
1557             + 6 11",
1558        );
1559        let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
1560            true,
1561            false,
1562            Some(String::from("(and:boolean (greater_than:boolean $1:int8 (subtract:int8 $3:int8 2:int8)) (greater_than:boolean $3:int8 (subtract:int8 $1:int8 2:int8)))")),
1563            vec![(1, 3, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)"))), (3, 1, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)")))],
1564        )
1565        .await;
1566
1567        // push the init barrier for left and right
1568        tx_l.push_barrier(test_epoch(1), false);
1569        tx_r.push_barrier(test_epoch(1), false);
1570        hash_join.next_unwrap_ready_barrier()?;
1571
1572        // push the 1st left chunk
1573        tx_l.push_chunk(chunk_l1);
1574        hash_join.next_unwrap_pending();
1575
1576        // push the init barrier for left and right
1577        tx_l.push_barrier(test_epoch(2), false);
1578        tx_r.push_barrier(test_epoch(2), false);
1579        hash_join.next_unwrap_ready_barrier()?;
1580
1581        // push the 2nd left chunk
1582        tx_l.push_chunk(chunk_l2);
1583        hash_join.next_unwrap_pending();
1584
1585        tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
1586        hash_join.next_unwrap_pending();
1587
1588        tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
1589        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1590        assert_eq!(
1591            output_watermark,
1592            Watermark::new(1, DataType::Int64, ScalarImpl::Int64(4))
1593        );
1594        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1595        assert_eq!(
1596            output_watermark,
1597            Watermark::new(3, DataType::Int64, ScalarImpl::Int64(6))
1598        );
1599
1600        // push the 1st right chunk
1601        tx_r.push_chunk(chunk_r1);
1602        let chunk = hash_join.next_unwrap_ready_chunk()?;
1603        // data "2 3" should have been cleaned
1604        assert_eq!(
1605            chunk,
1606            StreamChunk::from_pretty(
1607                " I I I I
1608                + 2 5 2 6"
1609            )
1610        );
1611
1612        // push the 2nd right chunk
1613        tx_r.push_chunk(chunk_r2);
1614        // pending means that state clean is successful, or the executor will yield a chunk "+ 2 3
1615        // 2 3" here.
1616        hash_join.next_unwrap_pending();
1617
1618        Ok(())
1619    }
1620
1621    #[tokio::test]
1622    async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
1623        let chunk_l1 = StreamChunk::from_pretty(
1624            "  I I
1625             + 1 4
1626             + 2 5
1627             + 3 6",
1628        );
1629        let chunk_l2 = StreamChunk::from_pretty(
1630            "  I I
1631             + 3 8
1632             - 3 8",
1633        );
1634        let chunk_r1 = StreamChunk::from_pretty(
1635            "  I I
1636             + 2 7
1637             + 4 8
1638             + 6 9",
1639        );
1640        let chunk_r2 = StreamChunk::from_pretty(
1641            "  I  I
1642             + 3 10
1643             + 6 11",
1644        );
1645        let (mut tx_l, mut tx_r, mut hash_join) =
1646            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
1647
1648        // push the init barrier for left and right
1649        tx_l.push_barrier(test_epoch(1), false);
1650        tx_r.push_barrier(test_epoch(1), false);
1651        hash_join.next_unwrap_ready_barrier()?;
1652
1653        // push the 1st left chunk
1654        tx_l.push_chunk(chunk_l1);
1655        hash_join.next_unwrap_pending();
1656
1657        // push the init barrier for left and right
1658        tx_l.push_barrier(test_epoch(2), false);
1659        tx_r.push_barrier(test_epoch(2), false);
1660        hash_join.next_unwrap_ready_barrier()?;
1661
1662        // push the 2nd left chunk
1663        tx_l.push_chunk(chunk_l2);
1664        hash_join.next_unwrap_pending();
1665
1666        // push the 1st right chunk
1667        tx_r.push_chunk(chunk_r1);
1668        let chunk = hash_join.next_unwrap_ready_chunk()?;
1669        assert_eq!(
1670            chunk,
1671            StreamChunk::from_pretty(
1672                " I I I I
1673                + 2 5 2 7"
1674            )
1675        );
1676
1677        // push the 2nd right chunk
1678        tx_r.push_chunk(chunk_r2);
1679        let chunk = hash_join.next_unwrap_ready_chunk()?;
1680        assert_eq!(
1681            chunk,
1682            StreamChunk::from_pretty(
1683                " I I I I
1684                + 3 6 3 10"
1685            )
1686        );
1687
1688        Ok(())
1689    }
1690
1691    #[tokio::test]
1692    async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
1693        let chunk_l1 = StreamChunk::from_pretty(
1694            "  I I
1695             + 1 4
1696             + 2 5
1697             + . 6",
1698        );
1699        let chunk_l2 = StreamChunk::from_pretty(
1700            "  I I
1701             + . 8
1702             - . 8",
1703        );
1704        let chunk_r1 = StreamChunk::from_pretty(
1705            "  I I
1706             + 2 7
1707             + 4 8
1708             + 6 9",
1709        );
1710        let chunk_r2 = StreamChunk::from_pretty(
1711            "  I  I
1712             + . 10
1713             + 6 11",
1714        );
1715        let (mut tx_l, mut tx_r, mut hash_join) =
1716            create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;
1717
1718        // push the init barrier for left and right
1719        tx_l.push_barrier(test_epoch(1), false);
1720        tx_r.push_barrier(test_epoch(1), false);
1721        hash_join.next_unwrap_ready_barrier()?;
1722
1723        // push the 1st left chunk
1724        tx_l.push_chunk(chunk_l1);
1725        hash_join.next_unwrap_pending();
1726
1727        // push the init barrier for left and right
1728        tx_l.push_barrier(test_epoch(2), false);
1729        tx_r.push_barrier(test_epoch(2), false);
1730        hash_join.next_unwrap_ready_barrier()?;
1731
1732        // push the 2nd left chunk
1733        tx_l.push_chunk(chunk_l2);
1734        hash_join.next_unwrap_pending();
1735
1736        // push the 1st right chunk
1737        tx_r.push_chunk(chunk_r1);
1738        let chunk = hash_join.next_unwrap_ready_chunk()?;
1739        assert_eq!(
1740            chunk,
1741            StreamChunk::from_pretty(
1742                " I I I I
1743                + 2 5 2 7"
1744            )
1745        );
1746
1747        // push the 2nd right chunk
1748        tx_r.push_chunk(chunk_r2);
1749        let chunk = hash_join.next_unwrap_ready_chunk()?;
1750        assert_eq!(
1751            chunk,
1752            StreamChunk::from_pretty(
1753                " I I I I
1754                + . 6 . 10"
1755            )
1756        );
1757
1758        Ok(())
1759    }
1760
1761    #[tokio::test]
1762    async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
1763        let chunk_l1 = StreamChunk::from_pretty(
1764            "  I I
1765             + 1 4
1766             + 2 5
1767             + 3 6",
1768        );
1769        let chunk_l2 = StreamChunk::from_pretty(
1770            "  I I
1771             + 3 8
1772             - 3 8",
1773        );
1774        let chunk_r1 = StreamChunk::from_pretty(
1775            "  I I
1776             + 2 7
1777             + 4 8
1778             + 6 9",
1779        );
1780        let chunk_r2 = StreamChunk::from_pretty(
1781            "  I  I
1782             + 3 10
1783             + 6 11",
1784        );
1785        let chunk_l3 = StreamChunk::from_pretty(
1786            "  I I
1787             + 6 10",
1788        );
1789        let chunk_r3 = StreamChunk::from_pretty(
1790            "  I  I
1791             - 6 11",
1792        );
1793        let chunk_r4 = StreamChunk::from_pretty(
1794            "  I  I
1795             - 6 9",
1796        );
1797        let (mut tx_l, mut tx_r, mut hash_join) =
1798            create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;
1799
1800        // push the init barrier for left and right
1801        tx_l.push_barrier(test_epoch(1), false);
1802        tx_r.push_barrier(test_epoch(1), false);
1803        hash_join.next_unwrap_ready_barrier()?;
1804
1805        // push the 1st left chunk
1806        tx_l.push_chunk(chunk_l1);
1807        hash_join.next_unwrap_pending();
1808
1809        // push the init barrier for left and right
1810        tx_l.push_barrier(test_epoch(2), false);
1811        tx_r.push_barrier(test_epoch(2), false);
1812        hash_join.next_unwrap_ready_barrier()?;
1813
1814        // push the 2nd left chunk
1815        tx_l.push_chunk(chunk_l2);
1816        hash_join.next_unwrap_pending();
1817
1818        // push the 1st right chunk
1819        tx_r.push_chunk(chunk_r1);
1820        let chunk = hash_join.next_unwrap_ready_chunk()?;
1821        assert_eq!(
1822            chunk,
1823            StreamChunk::from_pretty(
1824                " I I
1825                + 2 5"
1826            )
1827        );
1828
1829        // push the 2nd right chunk
1830        tx_r.push_chunk(chunk_r2);
1831        let chunk = hash_join.next_unwrap_ready_chunk()?;
1832        assert_eq!(
1833            chunk,
1834            StreamChunk::from_pretty(
1835                " I I
1836                + 3 6"
1837            )
1838        );
1839
1840        // push the 3rd left chunk (tests forward_exactly_once)
1841        tx_l.push_chunk(chunk_l3);
1842        let chunk = hash_join.next_unwrap_ready_chunk()?;
1843        assert_eq!(
1844            chunk,
1845            StreamChunk::from_pretty(
1846                " I I
1847                + 6 10"
1848            )
1849        );
1850
1851        // push the 3rd right chunk
1852        // (tests that no change if there are still matches)
1853        tx_r.push_chunk(chunk_r3);
1854        hash_join.next_unwrap_pending();
1855
1856        // push the 3rd left chunk
1857        // (tests that deletion occurs when there are no more matches)
1858        tx_r.push_chunk(chunk_r4);
1859        let chunk = hash_join.next_unwrap_ready_chunk()?;
1860        assert_eq!(
1861            chunk,
1862            StreamChunk::from_pretty(
1863                " I I
1864                - 6 10"
1865            )
1866        );
1867
1868        Ok(())
1869    }
1870
1871    #[tokio::test]
1872    async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
1873        let chunk_l1 = StreamChunk::from_pretty(
1874            "  I I
1875             + 1 4
1876             + 2 5
1877             + . 6",
1878        );
1879        let chunk_l2 = StreamChunk::from_pretty(
1880            "  I I
1881             + . 8
1882             - . 8",
1883        );
1884        let chunk_r1 = StreamChunk::from_pretty(
1885            "  I I
1886             + 2 7
1887             + 4 8
1888             + 6 9",
1889        );
1890        let chunk_r2 = StreamChunk::from_pretty(
1891            "  I  I
1892             + . 10
1893             + 6 11",
1894        );
1895        let chunk_l3 = StreamChunk::from_pretty(
1896            "  I I
1897             + 6 10",
1898        );
1899        let chunk_r3 = StreamChunk::from_pretty(
1900            "  I  I
1901             - 6 11",
1902        );
1903        let chunk_r4 = StreamChunk::from_pretty(
1904            "  I  I
1905             - 6 9",
1906        );
1907        let (mut tx_l, mut tx_r, mut hash_join) =
1908            create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;
1909
1910        // push the init barrier for left and right
1911        tx_l.push_barrier(test_epoch(1), false);
1912        tx_r.push_barrier(test_epoch(1), false);
1913        hash_join.next_unwrap_ready_barrier()?;
1914
1915        // push the 1st left chunk
1916        tx_l.push_chunk(chunk_l1);
1917        hash_join.next_unwrap_pending();
1918
1919        // push the init barrier for left and right
1920        tx_l.push_barrier(test_epoch(2), false);
1921        tx_r.push_barrier(test_epoch(2), false);
1922        hash_join.next_unwrap_ready_barrier()?;
1923
1924        // push the 2nd left chunk
1925        tx_l.push_chunk(chunk_l2);
1926        hash_join.next_unwrap_pending();
1927
1928        // push the 1st right chunk
1929        tx_r.push_chunk(chunk_r1);
1930        let chunk = hash_join.next_unwrap_ready_chunk()?;
1931        assert_eq!(
1932            chunk,
1933            StreamChunk::from_pretty(
1934                " I I
1935                + 2 5"
1936            )
1937        );
1938
1939        // push the 2nd right chunk
1940        tx_r.push_chunk(chunk_r2);
1941        let chunk = hash_join.next_unwrap_ready_chunk()?;
1942        assert_eq!(
1943            chunk,
1944            StreamChunk::from_pretty(
1945                " I I
1946                + . 6"
1947            )
1948        );
1949
1950        // push the 3rd left chunk (tests forward_exactly_once)
1951        tx_l.push_chunk(chunk_l3);
1952        let chunk = hash_join.next_unwrap_ready_chunk()?;
1953        assert_eq!(
1954            chunk,
1955            StreamChunk::from_pretty(
1956                " I I
1957                + 6 10"
1958            )
1959        );
1960
1961        // push the 3rd right chunk
1962        // (tests that no change if there are still matches)
1963        tx_r.push_chunk(chunk_r3);
1964        hash_join.next_unwrap_pending();
1965
1966        // push the 3rd left chunk
1967        // (tests that deletion occurs when there are no more matches)
1968        tx_r.push_chunk(chunk_r4);
1969        let chunk = hash_join.next_unwrap_ready_chunk()?;
1970        assert_eq!(
1971            chunk,
1972            StreamChunk::from_pretty(
1973                " I I
1974                - 6 10"
1975            )
1976        );
1977
1978        Ok(())
1979    }
1980
1981    #[tokio::test]
1982    async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
1983        let chunk_l1 = StreamChunk::from_pretty(
1984            "  I I I
1985             + 1 4 1
1986             + 2 5 2
1987             + 3 6 3",
1988        );
1989        let chunk_l2 = StreamChunk::from_pretty(
1990            "  I I I
1991             + 4 9 4
1992             + 5 10 5",
1993        );
1994        let chunk_r1 = StreamChunk::from_pretty(
1995            "  I I I
1996             + 2 5 1
1997             + 4 9 2
1998             + 6 9 3",
1999        );
2000        let chunk_r2 = StreamChunk::from_pretty(
2001            "  I I I
2002             + 1 4 4
2003             + 3 6 5",
2004        );
2005
2006        let (mut tx_l, mut tx_r, mut hash_join) =
2007            create_append_only_executor::<{ JoinType::Inner }>(false).await;
2008
2009        // push the init barrier for left and right
2010        tx_l.push_barrier(test_epoch(1), false);
2011        tx_r.push_barrier(test_epoch(1), false);
2012        hash_join.next_unwrap_ready_barrier()?;
2013
2014        // push the 1st left chunk
2015        tx_l.push_chunk(chunk_l1);
2016        hash_join.next_unwrap_pending();
2017
2018        // push the init barrier for left and right
2019        tx_l.push_barrier(test_epoch(2), false);
2020        tx_r.push_barrier(test_epoch(2), false);
2021        hash_join.next_unwrap_ready_barrier()?;
2022
2023        // push the 2nd left chunk
2024        tx_l.push_chunk(chunk_l2);
2025        hash_join.next_unwrap_pending();
2026
2027        // push the 1st right chunk
2028        tx_r.push_chunk(chunk_r1);
2029        let chunk = hash_join.next_unwrap_ready_chunk()?;
2030        assert_eq!(
2031            chunk,
2032            StreamChunk::from_pretty(
2033                " I I I I I I
2034                + 2 5 2 2 5 1
2035                + 4 9 4 4 9 2"
2036            )
2037        );
2038
2039        // push the 2nd right chunk
2040        tx_r.push_chunk(chunk_r2);
2041        let chunk = hash_join.next_unwrap_ready_chunk()?;
2042        assert_eq!(
2043            chunk,
2044            StreamChunk::from_pretty(
2045                " I I I I I I
2046                + 1 4 1 1 4 4
2047                + 3 6 3 3 6 5"
2048            )
2049        );
2050
2051        Ok(())
2052    }
2053
2054    #[tokio::test]
2055    async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
2056        let chunk_l1 = StreamChunk::from_pretty(
2057            "  I I I
2058             + 1 4 1
2059             + 2 5 2
2060             + 3 6 3",
2061        );
2062        let chunk_l2 = StreamChunk::from_pretty(
2063            "  I I I
2064             + 4 9 4
2065             + 5 10 5",
2066        );
2067        let chunk_r1 = StreamChunk::from_pretty(
2068            "  I I I
2069             + 2 5 1
2070             + 4 9 2
2071             + 6 9 3",
2072        );
2073        let chunk_r2 = StreamChunk::from_pretty(
2074            "  I I I
2075             + 1 4 4
2076             + 3 6 5",
2077        );
2078
2079        let (mut tx_l, mut tx_r, mut hash_join) =
2080            create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;
2081
2082        // push the init barrier for left and right
2083        tx_l.push_barrier(test_epoch(1), false);
2084        tx_r.push_barrier(test_epoch(1), false);
2085        hash_join.next_unwrap_ready_barrier()?;
2086
2087        // push the 1st left chunk
2088        tx_l.push_chunk(chunk_l1);
2089        hash_join.next_unwrap_pending();
2090
2091        // push the init barrier for left and right
2092        tx_l.push_barrier(test_epoch(2), false);
2093        tx_r.push_barrier(test_epoch(2), false);
2094        hash_join.next_unwrap_ready_barrier()?;
2095
2096        // push the 2nd left chunk
2097        tx_l.push_chunk(chunk_l2);
2098        hash_join.next_unwrap_pending();
2099
2100        // push the 1st right chunk
2101        tx_r.push_chunk(chunk_r1);
2102        let chunk = hash_join.next_unwrap_ready_chunk()?;
2103        assert_eq!(
2104            chunk,
2105            StreamChunk::from_pretty(
2106                " I I I
2107                + 2 5 2
2108                + 4 9 4"
2109            )
2110        );
2111
2112        // push the 2nd right chunk
2113        tx_r.push_chunk(chunk_r2);
2114        let chunk = hash_join.next_unwrap_ready_chunk()?;
2115        assert_eq!(
2116            chunk,
2117            StreamChunk::from_pretty(
2118                " I I I
2119                + 1 4 1
2120                + 3 6 3"
2121            )
2122        );
2123
2124        Ok(())
2125    }
2126
2127    #[tokio::test]
2128    async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
2129        let chunk_l1 = StreamChunk::from_pretty(
2130            "  I I I
2131             + 1 4 1
2132             + 2 5 2
2133             + 3 6 3",
2134        );
2135        let chunk_l2 = StreamChunk::from_pretty(
2136            "  I I I
2137             + 4 9 4
2138             + 5 10 5",
2139        );
2140        let chunk_r1 = StreamChunk::from_pretty(
2141            "  I I I
2142             + 2 5 1
2143             + 4 9 2
2144             + 6 9 3",
2145        );
2146        let chunk_r2 = StreamChunk::from_pretty(
2147            "  I I I
2148             + 1 4 4
2149             + 3 6 5",
2150        );
2151
2152        let (mut tx_l, mut tx_r, mut hash_join) =
2153            create_append_only_executor::<{ JoinType::RightSemi }>(false).await;
2154
2155        // push the init barrier for left and right
2156        tx_l.push_barrier(test_epoch(1), false);
2157        tx_r.push_barrier(test_epoch(1), false);
2158        hash_join.next_unwrap_ready_barrier()?;
2159
2160        // push the 1st left chunk
2161        tx_l.push_chunk(chunk_l1);
2162        hash_join.next_unwrap_pending();
2163
2164        // push the init barrier for left and right
2165        tx_l.push_barrier(test_epoch(2), false);
2166        tx_r.push_barrier(test_epoch(2), false);
2167        hash_join.next_unwrap_ready_barrier()?;
2168
2169        // push the 2nd left chunk
2170        tx_l.push_chunk(chunk_l2);
2171        hash_join.next_unwrap_pending();
2172
2173        // push the 1st right chunk
2174        tx_r.push_chunk(chunk_r1);
2175        let chunk = hash_join.next_unwrap_ready_chunk()?;
2176        assert_eq!(
2177            chunk,
2178            StreamChunk::from_pretty(
2179                " I I I
2180                + 2 5 1
2181                + 4 9 2"
2182            )
2183        );
2184
2185        // push the 2nd right chunk
2186        tx_r.push_chunk(chunk_r2);
2187        let chunk = hash_join.next_unwrap_ready_chunk()?;
2188        assert_eq!(
2189            chunk,
2190            StreamChunk::from_pretty(
2191                " I I I
2192                + 1 4 4
2193                + 3 6 5"
2194            )
2195        );
2196
2197        Ok(())
2198    }
2199
2200    #[tokio::test]
2201    async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
2202        let chunk_r1 = StreamChunk::from_pretty(
2203            "  I I
2204             + 1 4
2205             + 2 5
2206             + 3 6",
2207        );
2208        let chunk_r2 = StreamChunk::from_pretty(
2209            "  I I
2210             + 3 8
2211             - 3 8",
2212        );
2213        let chunk_l1 = StreamChunk::from_pretty(
2214            "  I I
2215             + 2 7
2216             + 4 8
2217             + 6 9",
2218        );
2219        let chunk_l2 = StreamChunk::from_pretty(
2220            "  I  I
2221             + 3 10
2222             + 6 11",
2223        );
2224        let chunk_r3 = StreamChunk::from_pretty(
2225            "  I I
2226             + 6 10",
2227        );
2228        let chunk_l3 = StreamChunk::from_pretty(
2229            "  I  I
2230             - 6 11",
2231        );
2232        let chunk_l4 = StreamChunk::from_pretty(
2233            "  I  I
2234             - 6 9",
2235        );
2236        let (mut tx_l, mut tx_r, mut hash_join) =
2237            create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;
2238
2239        // push the init barrier for left and right
2240        tx_l.push_barrier(test_epoch(1), false);
2241        tx_r.push_barrier(test_epoch(1), false);
2242        hash_join.next_unwrap_ready_barrier()?;
2243
2244        // push the 1st right chunk
2245        tx_r.push_chunk(chunk_r1);
2246        hash_join.next_unwrap_pending();
2247
2248        // push the init barrier for left and right
2249        tx_l.push_barrier(test_epoch(2), false);
2250        tx_r.push_barrier(test_epoch(2), false);
2251        hash_join.next_unwrap_ready_barrier()?;
2252
2253        // push the 2nd right chunk
2254        tx_r.push_chunk(chunk_r2);
2255        hash_join.next_unwrap_pending();
2256
2257        // push the 1st left chunk
2258        tx_l.push_chunk(chunk_l1);
2259        let chunk = hash_join.next_unwrap_ready_chunk()?;
2260        assert_eq!(
2261            chunk,
2262            StreamChunk::from_pretty(
2263                " I I
2264                + 2 5"
2265            )
2266        );
2267
2268        // push the 2nd left chunk
2269        tx_l.push_chunk(chunk_l2);
2270        let chunk = hash_join.next_unwrap_ready_chunk()?;
2271        assert_eq!(
2272            chunk,
2273            StreamChunk::from_pretty(
2274                " I I
2275                + 3 6"
2276            )
2277        );
2278
2279        // push the 3rd right chunk (tests forward_exactly_once)
2280        tx_r.push_chunk(chunk_r3);
2281        let chunk = hash_join.next_unwrap_ready_chunk()?;
2282        assert_eq!(
2283            chunk,
2284            StreamChunk::from_pretty(
2285                " I I
2286                + 6 10"
2287            )
2288        );
2289
2290        // push the 3rd left chunk
2291        // (tests that no change if there are still matches)
2292        tx_l.push_chunk(chunk_l3);
2293        hash_join.next_unwrap_pending();
2294
2295        // push the 3rd right chunk
2296        // (tests that deletion occurs when there are no more matches)
2297        tx_l.push_chunk(chunk_l4);
2298        let chunk = hash_join.next_unwrap_ready_chunk()?;
2299        assert_eq!(
2300            chunk,
2301            StreamChunk::from_pretty(
2302                " I I
2303                - 6 10"
2304            )
2305        );
2306
2307        Ok(())
2308    }
2309
2310    #[tokio::test]
2311    async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
2312        let chunk_l1 = StreamChunk::from_pretty(
2313            "  I I
2314             + 1 4
2315             + 2 5
2316             + 3 6",
2317        );
2318        let chunk_l2 = StreamChunk::from_pretty(
2319            "  I I
2320             + 3 8
2321             - 3 8",
2322        );
2323        let chunk_r1 = StreamChunk::from_pretty(
2324            "  I I
2325             + 2 7
2326             + 4 8
2327             + 6 9",
2328        );
2329        let chunk_r2 = StreamChunk::from_pretty(
2330            "  I  I
2331             + 3 10
2332             + 6 11
2333             + 1 2
2334             + 1 3",
2335        );
2336        let chunk_l3 = StreamChunk::from_pretty(
2337            "  I I
2338             + 9 10",
2339        );
2340        let chunk_r3 = StreamChunk::from_pretty(
2341            "  I I
2342             - 1 2",
2343        );
2344        let chunk_r4 = StreamChunk::from_pretty(
2345            "  I I
2346             - 1 3",
2347        );
2348        let (mut tx_l, mut tx_r, mut hash_join) =
2349            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2350
2351        // push the init barrier for left and right
2352        tx_l.push_barrier(test_epoch(1), false);
2353        tx_r.push_barrier(test_epoch(1), false);
2354        hash_join.next_unwrap_ready_barrier()?;
2355
2356        // push the 1st left chunk
2357        tx_l.push_chunk(chunk_l1);
2358        let chunk = hash_join.next_unwrap_ready_chunk()?;
2359        assert_eq!(
2360            chunk,
2361            StreamChunk::from_pretty(
2362                " I I
2363                + 1 4
2364                + 2 5
2365                + 3 6",
2366            )
2367        );
2368
2369        // push the init barrier for left and right
2370        tx_l.push_barrier(test_epoch(2), false);
2371        tx_r.push_barrier(test_epoch(2), false);
2372        hash_join.next_unwrap_ready_barrier()?;
2373
2374        // push the 2nd left chunk
2375        tx_l.push_chunk(chunk_l2);
2376        let chunk = hash_join.next_unwrap_ready_chunk()?;
2377        assert_eq!(
2378            chunk,
2379            StreamChunk::from_pretty(
2380                "  I I
2381                 + 3 8
2382                 - 3 8",
2383            )
2384        );
2385
2386        // push the 1st right chunk
2387        tx_r.push_chunk(chunk_r1);
2388        let chunk = hash_join.next_unwrap_ready_chunk()?;
2389        assert_eq!(
2390            chunk,
2391            StreamChunk::from_pretty(
2392                " I I
2393                - 2 5"
2394            )
2395        );
2396
2397        // push the 2nd right chunk
2398        tx_r.push_chunk(chunk_r2);
2399        let chunk = hash_join.next_unwrap_ready_chunk()?;
2400        assert_eq!(
2401            chunk,
2402            StreamChunk::from_pretty(
2403                " I I
2404                - 3 6
2405                - 1 4"
2406            )
2407        );
2408
2409        // push the 3rd left chunk (tests forward_exactly_once)
2410        tx_l.push_chunk(chunk_l3);
2411        let chunk = hash_join.next_unwrap_ready_chunk()?;
2412        assert_eq!(
2413            chunk,
2414            StreamChunk::from_pretty(
2415                " I I
2416                + 9 10"
2417            )
2418        );
2419
2420        // push the 3rd right chunk
2421        // (tests that no change if there are still matches)
2422        tx_r.push_chunk(chunk_r3);
2423        hash_join.next_unwrap_pending();
2424
2425        // push the 4th right chunk
2426        // (tests that insertion occurs when there are no more matches)
2427        tx_r.push_chunk(chunk_r4);
2428        let chunk = hash_join.next_unwrap_ready_chunk()?;
2429        assert_eq!(
2430            chunk,
2431            StreamChunk::from_pretty(
2432                " I I
2433                + 1 4"
2434            )
2435        );
2436
2437        Ok(())
2438    }
2439
2440    #[tokio::test]
2441    async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
2442        let chunk_r1 = StreamChunk::from_pretty(
2443            "  I I
2444             + 1 4
2445             + 2 5
2446             + 3 6",
2447        );
2448        let chunk_r2 = StreamChunk::from_pretty(
2449            "  I I
2450             + 3 8
2451             - 3 8",
2452        );
2453        let chunk_l1 = StreamChunk::from_pretty(
2454            "  I I
2455             + 2 7
2456             + 4 8
2457             + 6 9",
2458        );
2459        let chunk_l2 = StreamChunk::from_pretty(
2460            "  I  I
2461             + 3 10
2462             + 6 11
2463             + 1 2
2464             + 1 3",
2465        );
2466        let chunk_r3 = StreamChunk::from_pretty(
2467            "  I I
2468             + 9 10",
2469        );
2470        let chunk_l3 = StreamChunk::from_pretty(
2471            "  I I
2472             - 1 2",
2473        );
2474        let chunk_l4 = StreamChunk::from_pretty(
2475            "  I I
2476             - 1 3",
2477        );
2478        let (mut tx_r, mut tx_l, mut hash_join) =
2479            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2480
2481        // push the init barrier for left and right
2482        tx_r.push_barrier(test_epoch(1), false);
2483        tx_l.push_barrier(test_epoch(1), false);
2484        hash_join.next_unwrap_ready_barrier()?;
2485
2486        // push the 1st right chunk
2487        tx_r.push_chunk(chunk_r1);
2488        let chunk = hash_join.next_unwrap_ready_chunk()?;
2489        assert_eq!(
2490            chunk,
2491            StreamChunk::from_pretty(
2492                " I I
2493                + 1 4
2494                + 2 5
2495                + 3 6",
2496            )
2497        );
2498
2499        // push the init barrier for left and right
2500        tx_r.push_barrier(test_epoch(2), false);
2501        tx_l.push_barrier(test_epoch(2), false);
2502        hash_join.next_unwrap_ready_barrier()?;
2503
2504        // push the 2nd right chunk
2505        tx_r.push_chunk(chunk_r2);
2506        let chunk = hash_join.next_unwrap_ready_chunk()?;
2507        assert_eq!(
2508            chunk,
2509            StreamChunk::from_pretty(
2510                "  I I
2511                 + 3 8
2512                 - 3 8",
2513            )
2514        );
2515
2516        // push the 1st left chunk
2517        tx_l.push_chunk(chunk_l1);
2518        let chunk = hash_join.next_unwrap_ready_chunk()?;
2519        assert_eq!(
2520            chunk,
2521            StreamChunk::from_pretty(
2522                " I I
2523                - 2 5"
2524            )
2525        );
2526
2527        // push the 2nd left chunk
2528        tx_l.push_chunk(chunk_l2);
2529        let chunk = hash_join.next_unwrap_ready_chunk()?;
2530        assert_eq!(
2531            chunk,
2532            StreamChunk::from_pretty(
2533                " I I
2534                - 3 6
2535                - 1 4"
2536            )
2537        );
2538
2539        // push the 3rd right chunk (tests forward_exactly_once)
2540        tx_r.push_chunk(chunk_r3);
2541        let chunk = hash_join.next_unwrap_ready_chunk()?;
2542        assert_eq!(
2543            chunk,
2544            StreamChunk::from_pretty(
2545                " I I
2546                + 9 10"
2547            )
2548        );
2549
2550        // push the 3rd left chunk
2551        // (tests that no change if there are still matches)
2552        tx_l.push_chunk(chunk_l3);
2553        hash_join.next_unwrap_pending();
2554
2555        // push the 4th left chunk
2556        // (tests that insertion occurs when there are no more matches)
2557        tx_l.push_chunk(chunk_l4);
2558        let chunk = hash_join.next_unwrap_ready_chunk()?;
2559        assert_eq!(
2560            chunk,
2561            StreamChunk::from_pretty(
2562                " I I
2563                + 1 4"
2564            )
2565        );
2566
2567        Ok(())
2568    }
2569
2570    #[tokio::test]
2571    async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
2572        let chunk_l1 = StreamChunk::from_pretty(
2573            "  I I
2574             + 1 4
2575             + 2 5
2576             + 3 6",
2577        );
2578        let chunk_l2 = StreamChunk::from_pretty(
2579            "  I I
2580             + 6 8
2581             + 3 8",
2582        );
2583        let chunk_r1 = StreamChunk::from_pretty(
2584            "  I I
2585             + 2 7
2586             + 4 8
2587             + 6 9",
2588        );
2589        let chunk_r2 = StreamChunk::from_pretty(
2590            "  I  I
2591             + 3 10
2592             + 6 11",
2593        );
2594        let (mut tx_l, mut tx_r, mut hash_join) =
2595            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2596
2597        // push the init barrier for left and right
2598        tx_l.push_barrier(test_epoch(1), false);
2599        tx_r.push_barrier(test_epoch(1), false);
2600        hash_join.next_unwrap_ready_barrier()?;
2601
2602        // push the 1st left chunk
2603        tx_l.push_chunk(chunk_l1);
2604        hash_join.next_unwrap_pending();
2605
2606        // push a barrier to left side
2607        tx_l.push_barrier(test_epoch(2), false);
2608
2609        // push the 2nd left chunk
2610        tx_l.push_chunk(chunk_l2);
2611
2612        // join the first right chunk
2613        tx_r.push_chunk(chunk_r1);
2614
2615        // Consume stream chunk
2616        let chunk = hash_join.next_unwrap_ready_chunk()?;
2617        assert_eq!(
2618            chunk,
2619            StreamChunk::from_pretty(
2620                " I I I I
2621                + 2 5 2 7"
2622            )
2623        );
2624
2625        // push a barrier to right side
2626        tx_r.push_barrier(test_epoch(2), false);
2627
2628        // get the aligned barrier here
2629        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2630        assert!(matches!(
2631            hash_join.next_unwrap_ready_barrier()?,
2632            Barrier {
2633                epoch,
2634                mutation: None,
2635                ..
2636            } if epoch == expected_epoch
2637        ));
2638
2639        // join the 2nd left chunk
2640        let chunk = hash_join.next_unwrap_ready_chunk()?;
2641        assert_eq!(
2642            chunk,
2643            StreamChunk::from_pretty(
2644                " I I I I
2645                + 6 8 6 9"
2646            )
2647        );
2648
2649        // push the 2nd right chunk
2650        tx_r.push_chunk(chunk_r2);
2651        let chunk = hash_join.next_unwrap_ready_chunk()?;
2652        assert_eq!(
2653            chunk,
2654            StreamChunk::from_pretty(
2655                " I I I I
2656                + 3 6 3 10
2657                + 3 8 3 10
2658                + 6 8 6 11"
2659            )
2660        );
2661
2662        Ok(())
2663    }
2664
2665    #[tokio::test]
2666    async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
2667        let chunk_l1 = StreamChunk::from_pretty(
2668            "  I I
2669             + 1 4
2670             + 2 .
2671             + 3 .",
2672        );
2673        let chunk_l2 = StreamChunk::from_pretty(
2674            "  I I
2675             + 6 .
2676             + 3 8",
2677        );
2678        let chunk_r1 = StreamChunk::from_pretty(
2679            "  I I
2680             + 2 7
2681             + 4 8
2682             + 6 9",
2683        );
2684        let chunk_r2 = StreamChunk::from_pretty(
2685            "  I  I
2686             + 3 10
2687             + 6 11",
2688        );
2689        let (mut tx_l, mut tx_r, mut hash_join) =
2690            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2691
2692        // push the init barrier for left and right
2693        tx_l.push_barrier(test_epoch(1), false);
2694        tx_r.push_barrier(test_epoch(1), false);
2695        hash_join.next_unwrap_ready_barrier()?;
2696
2697        // push the 1st left chunk
2698        tx_l.push_chunk(chunk_l1);
2699        hash_join.next_unwrap_pending();
2700
2701        // push a barrier to left side
2702        tx_l.push_barrier(test_epoch(2), false);
2703
2704        // push the 2nd left chunk
2705        tx_l.push_chunk(chunk_l2);
2706
2707        // join the first right chunk
2708        tx_r.push_chunk(chunk_r1);
2709
2710        // Consume stream chunk
2711        let chunk = hash_join.next_unwrap_ready_chunk()?;
2712        assert_eq!(
2713            chunk,
2714            StreamChunk::from_pretty(
2715                " I I I I
2716                + 2 . 2 7"
2717            )
2718        );
2719
2720        // push a barrier to right side
2721        tx_r.push_barrier(test_epoch(2), false);
2722
2723        // get the aligned barrier here
2724        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2725        assert!(matches!(
2726            hash_join.next_unwrap_ready_barrier()?,
2727            Barrier {
2728                epoch,
2729                mutation: None,
2730                ..
2731            } if epoch == expected_epoch
2732        ));
2733
2734        // join the 2nd left chunk
2735        let chunk = hash_join.next_unwrap_ready_chunk()?;
2736        assert_eq!(
2737            chunk,
2738            StreamChunk::from_pretty(
2739                " I I I I
2740                + 6 . 6 9"
2741            )
2742        );
2743
2744        // push the 2nd right chunk
2745        tx_r.push_chunk(chunk_r2);
2746        let chunk = hash_join.next_unwrap_ready_chunk()?;
2747        assert_eq!(
2748            chunk,
2749            StreamChunk::from_pretty(
2750                " I I I I
2751                + 3 8 3 10
2752                + 3 . 3 10
2753                + 6 . 6 11"
2754            )
2755        );
2756
2757        Ok(())
2758    }
2759
2760    #[tokio::test]
2761    async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
2762        let chunk_l1 = StreamChunk::from_pretty(
2763            "  I I
2764             + 1 4
2765             + 2 5
2766             + 3 6",
2767        );
2768        let chunk_l2 = StreamChunk::from_pretty(
2769            "  I I
2770             + 3 8
2771             - 3 8",
2772        );
2773        let chunk_r1 = StreamChunk::from_pretty(
2774            "  I I
2775             + 2 7
2776             + 4 8
2777             + 6 9",
2778        );
2779        let chunk_r2 = StreamChunk::from_pretty(
2780            "  I  I
2781             + 3 10
2782             + 6 11",
2783        );
2784        let (mut tx_l, mut tx_r, mut hash_join) =
2785            create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;
2786
2787        // push the init barrier for left and right
2788        tx_l.push_barrier(test_epoch(1), false);
2789        tx_r.push_barrier(test_epoch(1), false);
2790        hash_join.next_unwrap_ready_barrier()?;
2791
2792        // push the 1st left chunk
2793        tx_l.push_chunk(chunk_l1);
2794        let chunk = hash_join.next_unwrap_ready_chunk()?;
2795        assert_eq!(
2796            chunk,
2797            StreamChunk::from_pretty(
2798                " I I I I
2799                + 1 4 . .
2800                + 2 5 . .
2801                + 3 6 . ."
2802            )
2803        );
2804
2805        // push the 2nd left chunk
2806        tx_l.push_chunk(chunk_l2);
2807        let chunk = hash_join.next_unwrap_ready_chunk()?;
2808        assert_eq!(
2809            chunk,
2810            StreamChunk::from_pretty(
2811                " I I I I
2812                + 3 8 . .
2813                - 3 8 . ."
2814            )
2815        );
2816
2817        // push the 1st right chunk
2818        tx_r.push_chunk(chunk_r1);
2819        let chunk = hash_join.next_unwrap_ready_chunk()?;
2820        assert_eq!(
2821            chunk,
2822            StreamChunk::from_pretty(
2823                "  I I I I
2824                U- 2 5 . .
2825                U+ 2 5 2 7"
2826            )
2827        );
2828
2829        // push the 2nd right chunk
2830        tx_r.push_chunk(chunk_r2);
2831        let chunk = hash_join.next_unwrap_ready_chunk()?;
2832        assert_eq!(
2833            chunk,
2834            StreamChunk::from_pretty(
2835                "  I I I I
2836                U- 3 6 . .
2837                U+ 3 6 3 10"
2838            )
2839        );
2840
2841        Ok(())
2842    }
2843
2844    #[tokio::test]
2845    async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
2846        let chunk_l1 = StreamChunk::from_pretty(
2847            "  I I
2848             + 1 4
2849             + 2 5
2850             + . 6",
2851        );
2852        let chunk_l2 = StreamChunk::from_pretty(
2853            "  I I
2854             + . 8
2855             - . 8",
2856        );
2857        let chunk_r1 = StreamChunk::from_pretty(
2858            "  I I
2859             + 2 7
2860             + 4 8
2861             + 6 9",
2862        );
2863        let chunk_r2 = StreamChunk::from_pretty(
2864            "  I  I
2865             + . 10
2866             + 6 11",
2867        );
2868        let (mut tx_l, mut tx_r, mut hash_join) =
2869            create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;
2870
2871        // push the init barrier for left and right
2872        tx_l.push_barrier(test_epoch(1), false);
2873        tx_r.push_barrier(test_epoch(1), false);
2874        hash_join.next_unwrap_ready_barrier()?;
2875
2876        // push the 1st left chunk
2877        tx_l.push_chunk(chunk_l1);
2878        let chunk = hash_join.next_unwrap_ready_chunk()?;
2879        assert_eq!(
2880            chunk,
2881            StreamChunk::from_pretty(
2882                " I I I I
2883                + 1 4 . .
2884                + 2 5 . .
2885                + . 6 . ."
2886            )
2887        );
2888
2889        // push the 2nd left chunk
2890        tx_l.push_chunk(chunk_l2);
2891        let chunk = hash_join.next_unwrap_ready_chunk()?;
2892        assert_eq!(
2893            chunk,
2894            StreamChunk::from_pretty(
2895                " I I I I
2896                + . 8 . .
2897                - . 8 . ."
2898            )
2899        );
2900
2901        // push the 1st right chunk
2902        tx_r.push_chunk(chunk_r1);
2903        let chunk = hash_join.next_unwrap_ready_chunk()?;
2904        assert_eq!(
2905            chunk,
2906            StreamChunk::from_pretty(
2907                "  I I I I
2908                U- 2 5 . .
2909                U+ 2 5 2 7"
2910            )
2911        );
2912
2913        // push the 2nd right chunk
2914        tx_r.push_chunk(chunk_r2);
2915        let chunk = hash_join.next_unwrap_ready_chunk()?;
2916        assert_eq!(
2917            chunk,
2918            StreamChunk::from_pretty(
2919                "  I I I I
2920                U- . 6 . .
2921                U+ . 6 . 10"
2922            )
2923        );
2924
2925        Ok(())
2926    }
2927
2928    #[tokio::test]
2929    async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
2930        let chunk_l1 = StreamChunk::from_pretty(
2931            "  I I
2932             + 1 4
2933             + 2 5
2934             + 3 6",
2935        );
2936        let chunk_l2 = StreamChunk::from_pretty(
2937            "  I I
2938             + 3 8
2939             - 3 8",
2940        );
2941        let chunk_r1 = StreamChunk::from_pretty(
2942            "  I I
2943             + 2 7
2944             + 4 8
2945             + 6 9",
2946        );
2947        let chunk_r2 = StreamChunk::from_pretty(
2948            "  I  I
2949             + 5 10
2950             - 5 10",
2951        );
2952        let (mut tx_l, mut tx_r, mut hash_join) =
2953            create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;
2954
2955        // push the init barrier for left and right
2956        tx_l.push_barrier(test_epoch(1), false);
2957        tx_r.push_barrier(test_epoch(1), false);
2958        hash_join.next_unwrap_ready_barrier()?;
2959
2960        // push the 1st left chunk
2961        tx_l.push_chunk(chunk_l1);
2962        hash_join.next_unwrap_pending();
2963
2964        // push the 2nd left chunk
2965        tx_l.push_chunk(chunk_l2);
2966        hash_join.next_unwrap_pending();
2967
2968        // push the 1st right chunk
2969        tx_r.push_chunk(chunk_r1);
2970        let chunk = hash_join.next_unwrap_ready_chunk()?;
2971        assert_eq!(
2972            chunk,
2973            StreamChunk::from_pretty(
2974                " I I I I
2975                + 2 5 2 7
2976                + . . 4 8
2977                + . . 6 9"
2978            )
2979        );
2980
2981        // push the 2nd right chunk
2982        tx_r.push_chunk(chunk_r2);
2983        let chunk = hash_join.next_unwrap_ready_chunk()?;
2984        assert_eq!(
2985            chunk,
2986            StreamChunk::from_pretty(
2987                " I I I I
2988                + . . 5 10
2989                - . . 5 10"
2990            )
2991        );
2992
2993        Ok(())
2994    }
2995
2996    #[tokio::test]
2997    async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
2998        let chunk_l1 = StreamChunk::from_pretty(
2999            "  I I I
3000             + 1 4 1
3001             + 2 5 2
3002             + 3 6 3",
3003        );
3004        let chunk_l2 = StreamChunk::from_pretty(
3005            "  I I I
3006             + 4 9 4
3007             + 5 10 5",
3008        );
3009        let chunk_r1 = StreamChunk::from_pretty(
3010            "  I I I
3011             + 2 5 1
3012             + 4 9 2
3013             + 6 9 3",
3014        );
3015        let chunk_r2 = StreamChunk::from_pretty(
3016            "  I I I
3017             + 1 4 4
3018             + 3 6 5",
3019        );
3020
3021        let (mut tx_l, mut tx_r, mut hash_join) =
3022            create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;
3023
3024        // push the init barrier for left and right
3025        tx_l.push_barrier(test_epoch(1), false);
3026        tx_r.push_barrier(test_epoch(1), false);
3027        hash_join.next_unwrap_ready_barrier()?;
3028
3029        // push the 1st left chunk
3030        tx_l.push_chunk(chunk_l1);
3031        let chunk = hash_join.next_unwrap_ready_chunk()?;
3032        assert_eq!(
3033            chunk,
3034            StreamChunk::from_pretty(
3035                " I I I I I I
3036                + 1 4 1 . . .
3037                + 2 5 2 . . .
3038                + 3 6 3 . . ."
3039            )
3040        );
3041
3042        // push the 2nd left chunk
3043        tx_l.push_chunk(chunk_l2);
3044        let chunk = hash_join.next_unwrap_ready_chunk()?;
3045        assert_eq!(
3046            chunk,
3047            StreamChunk::from_pretty(
3048                " I I I I I I
3049                + 4 9 4 . . .
3050                + 5 10 5 . . ."
3051            )
3052        );
3053
3054        // push the 1st right chunk
3055        tx_r.push_chunk(chunk_r1);
3056        let chunk = hash_join.next_unwrap_ready_chunk()?;
3057        assert_eq!(
3058            chunk,
3059            StreamChunk::from_pretty(
3060                "  I I I I I I
3061                U- 2 5 2 . . .
3062                U+ 2 5 2 2 5 1
3063                U- 4 9 4 . . .
3064                U+ 4 9 4 4 9 2"
3065            )
3066        );
3067
3068        // push the 2nd right chunk
3069        tx_r.push_chunk(chunk_r2);
3070        let chunk = hash_join.next_unwrap_ready_chunk()?;
3071        assert_eq!(
3072            chunk,
3073            StreamChunk::from_pretty(
3074                "  I I I I I I
3075                U- 1 4 1 . . .
3076                U+ 1 4 1 1 4 4
3077                U- 3 6 3 . . .
3078                U+ 3 6 3 3 6 5"
3079            )
3080        );
3081
3082        Ok(())
3083    }
3084
3085    #[tokio::test]
3086    async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
3087        let chunk_l1 = StreamChunk::from_pretty(
3088            "  I I I
3089             + 1 4 1
3090             + 2 5 2
3091             + 3 6 3",
3092        );
3093        let chunk_l2 = StreamChunk::from_pretty(
3094            "  I I I
3095             + 4 9 4
3096             + 5 10 5",
3097        );
3098        let chunk_r1 = StreamChunk::from_pretty(
3099            "  I I I
3100             + 2 5 1
3101             + 4 9 2
3102             + 6 9 3",
3103        );
3104        let chunk_r2 = StreamChunk::from_pretty(
3105            "  I I I
3106             + 1 4 4
3107             + 3 6 5
3108             + 7 7 6",
3109        );
3110
3111        let (mut tx_l, mut tx_r, mut hash_join) =
3112            create_append_only_executor::<{ JoinType::RightOuter }>(false).await;
3113
3114        // push the init barrier for left and right
3115        tx_l.push_barrier(test_epoch(1), false);
3116        tx_r.push_barrier(test_epoch(1), false);
3117        hash_join.next_unwrap_ready_barrier()?;
3118
3119        // push the 1st left chunk
3120        tx_l.push_chunk(chunk_l1);
3121        hash_join.next_unwrap_pending();
3122
3123        // push the 2nd left chunk
3124        tx_l.push_chunk(chunk_l2);
3125        hash_join.next_unwrap_pending();
3126
3127        // push the 1st right chunk
3128        tx_r.push_chunk(chunk_r1);
3129        let chunk = hash_join.next_unwrap_ready_chunk()?;
3130        assert_eq!(
3131            chunk,
3132            StreamChunk::from_pretty(
3133                "  I I I I I I
3134                + 2 5 2 2 5 1
3135                + 4 9 4 4 9 2
3136                + . . . 6 9 3"
3137            )
3138        );
3139
3140        // push the 2nd right chunk
3141        tx_r.push_chunk(chunk_r2);
3142        let chunk = hash_join.next_unwrap_ready_chunk()?;
3143        assert_eq!(
3144            chunk,
3145            StreamChunk::from_pretty(
3146                "  I I I I I I
3147                + 1 4 1 1 4 4
3148                + 3 6 3 3 6 5
3149                + . . . 7 7 6"
3150            )
3151        );
3152
3153        Ok(())
3154    }
3155
3156    #[tokio::test]
3157    async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
3158        let chunk_l1 = StreamChunk::from_pretty(
3159            "  I I
3160             + 1 4
3161             + 2 5
3162             + 3 6",
3163        );
3164        let chunk_l2 = StreamChunk::from_pretty(
3165            "  I I
3166             + 3 8
3167             - 3 8",
3168        );
3169        let chunk_r1 = StreamChunk::from_pretty(
3170            "  I I
3171             + 2 7
3172             + 4 8
3173             + 6 9",
3174        );
3175        let chunk_r2 = StreamChunk::from_pretty(
3176            "  I  I
3177             + 5 10
3178             - 5 10",
3179        );
3180        let (mut tx_l, mut tx_r, mut hash_join) =
3181            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3182
3183        // push the init barrier for left and right
3184        tx_l.push_barrier(test_epoch(1), false);
3185        tx_r.push_barrier(test_epoch(1), false);
3186        hash_join.next_unwrap_ready_barrier()?;
3187
3188        // push the 1st left chunk
3189        tx_l.push_chunk(chunk_l1);
3190        let chunk = hash_join.next_unwrap_ready_chunk()?;
3191        assert_eq!(
3192            chunk,
3193            StreamChunk::from_pretty(
3194                " I I I I
3195                + 1 4 . .
3196                + 2 5 . .
3197                + 3 6 . ."
3198            )
3199        );
3200
3201        // push the 2nd left chunk
3202        tx_l.push_chunk(chunk_l2);
3203        let chunk = hash_join.next_unwrap_ready_chunk()?;
3204        assert_eq!(
3205            chunk,
3206            StreamChunk::from_pretty(
3207                " I I I I
3208                + 3 8 . .
3209                - 3 8 . ."
3210            )
3211        );
3212
3213        // push the 1st right chunk
3214        tx_r.push_chunk(chunk_r1);
3215        let chunk = hash_join.next_unwrap_ready_chunk()?;
3216        assert_eq!(
3217            chunk,
3218            StreamChunk::from_pretty(
3219                "  I I I I
3220                U-  2 5 . .
3221                U+  2 5 2 7
3222                +  . . 4 8
3223                +  . . 6 9"
3224            )
3225        );
3226
3227        // push the 2nd right chunk
3228        tx_r.push_chunk(chunk_r2);
3229        let chunk = hash_join.next_unwrap_ready_chunk()?;
3230        assert_eq!(
3231            chunk,
3232            StreamChunk::from_pretty(
3233                " I I I I
3234                + . . 5 10
3235                - . . 5 10"
3236            )
3237        );
3238
3239        Ok(())
3240    }
3241
3242    #[tokio::test]
3243    async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
3244        let (mut tx_l, mut tx_r, mut hash_join) =
3245            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3246
3247        // push the init barrier for left and right
3248        tx_l.push_barrier(test_epoch(1), false);
3249        tx_r.push_barrier(test_epoch(1), false);
3250        hash_join.next_unwrap_ready_barrier()?;
3251
3252        tx_l.push_chunk(StreamChunk::from_pretty(
3253            "  I I
3254             + 1 1
3255            ",
3256        ));
3257        let chunk = hash_join.next_unwrap_ready_chunk()?;
3258        assert_eq!(
3259            chunk,
3260            StreamChunk::from_pretty(
3261                " I I I I
3262                + 1 1 . ."
3263            )
3264        );
3265
3266        tx_r.push_chunk(StreamChunk::from_pretty(
3267            "  I I
3268             + 1 1
3269            ",
3270        ));
3271        let chunk = hash_join.next_unwrap_ready_chunk()?;
3272
3273        assert_eq!(
3274            chunk,
3275            StreamChunk::from_pretty(
3276                " I I I I
3277                U- 1 1 . .
3278                U+ 1 1 1 1"
3279            )
3280        );
3281
3282        tx_l.push_chunk(StreamChunk::from_pretty(
3283            "   I I
3284              - 1 1
3285              + 1 2
3286            ",
3287        ));
3288        let chunk = hash_join.next_unwrap_ready_chunk()?;
3289        let chunk = chunk.compact();
3290        assert_eq!(
3291            chunk,
3292            StreamChunk::from_pretty(
3293                " I I I I
3294                - 1 1 1 1
3295                + 1 2 1 1
3296                "
3297            )
3298        );
3299
3300        Ok(())
3301    }
3302
3303    #[tokio::test]
3304    async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
3305    {
3306        let chunk_l1 = StreamChunk::from_pretty(
3307            "  I I
3308             + 1 4
3309             + 2 5
3310             + 3 6
3311             + 3 7",
3312        );
3313        let chunk_l2 = StreamChunk::from_pretty(
3314            "  I I
3315             + 3 8
3316             - 3 8
3317             - 1 4", // delete row to cause an empty JoinHashEntry
3318        );
3319        let chunk_r1 = StreamChunk::from_pretty(
3320            "  I I
3321             + 2 6
3322             + 4 8
3323             + 3 4",
3324        );
3325        let chunk_r2 = StreamChunk::from_pretty(
3326            "  I  I
3327             + 5 10
3328             - 5 10
3329             + 1 2",
3330        );
3331        let (mut tx_l, mut tx_r, mut hash_join) =
3332            create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;
3333
3334        // push the init barrier for left and right
3335        tx_l.push_barrier(test_epoch(1), false);
3336        tx_r.push_barrier(test_epoch(1), false);
3337        hash_join.next_unwrap_ready_barrier()?;
3338
3339        // push the 1st left chunk
3340        tx_l.push_chunk(chunk_l1);
3341        let chunk = hash_join.next_unwrap_ready_chunk()?;
3342        assert_eq!(
3343            chunk,
3344            StreamChunk::from_pretty(
3345                " I I I I
3346                + 1 4 . .
3347                + 2 5 . .
3348                + 3 6 . .
3349                + 3 7 . ."
3350            )
3351        );
3352
3353        // push the 2nd left chunk
3354        tx_l.push_chunk(chunk_l2);
3355        let chunk = hash_join.next_unwrap_ready_chunk()?;
3356        assert_eq!(
3357            chunk,
3358            StreamChunk::from_pretty(
3359                " I I I I
3360                + 3 8 . .
3361                - 3 8 . .
3362                - 1 4 . ."
3363            )
3364        );
3365
3366        // push the 1st right chunk
3367        tx_r.push_chunk(chunk_r1);
3368        let chunk = hash_join.next_unwrap_ready_chunk()?;
3369        assert_eq!(
3370            chunk,
3371            StreamChunk::from_pretty(
3372                "  I I I I
3373                U-  2 5 . .
3374                U+  2 5 2 6
3375                +  . . 4 8
3376                +  . . 3 4" /* regression test (#2420): 3 4 should be forwarded only once
3377                             * despite matching on eq join on 2
3378                             * entries */
3379            )
3380        );
3381
3382        // push the 2nd right chunk
3383        tx_r.push_chunk(chunk_r2);
3384        let chunk = hash_join.next_unwrap_ready_chunk()?;
3385        assert_eq!(
3386            chunk,
3387            StreamChunk::from_pretty(
3388                " I I I I
3389                + . . 5 10
3390                - . . 5 10
3391                + . . 1 2" /* regression test (#2420): 1 2 forwarded even if matches on an empty
3392                            * join entry */
3393            )
3394        );
3395
3396        Ok(())
3397    }
3398
3399    #[tokio::test]
3400    async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
3401        let chunk_l1 = StreamChunk::from_pretty(
3402            "  I I
3403             + 1 4
3404             + 2 10
3405             + 3 6",
3406        );
3407        let chunk_l2 = StreamChunk::from_pretty(
3408            "  I I
3409             + 3 8
3410             - 3 8",
3411        );
3412        let chunk_r1 = StreamChunk::from_pretty(
3413            "  I I
3414             + 2 7
3415             + 4 8
3416             + 6 9",
3417        );
3418        let chunk_r2 = StreamChunk::from_pretty(
3419            "  I  I
3420             + 3 10
3421             + 6 11",
3422        );
3423        let (mut tx_l, mut tx_r, mut hash_join) =
3424            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3425
3426        // push the init barrier for left and right
3427        tx_l.push_barrier(test_epoch(1), false);
3428        tx_r.push_barrier(test_epoch(1), false);
3429        hash_join.next_unwrap_ready_barrier()?;
3430
3431        // push the 1st left chunk
3432        tx_l.push_chunk(chunk_l1);
3433        hash_join.next_unwrap_pending();
3434
3435        // push the 2nd left chunk
3436        tx_l.push_chunk(chunk_l2);
3437        hash_join.next_unwrap_pending();
3438
3439        // push the 1st right chunk
3440        tx_r.push_chunk(chunk_r1);
3441        hash_join.next_unwrap_pending();
3442
3443        // push the 2nd right chunk
3444        tx_r.push_chunk(chunk_r2);
3445        let chunk = hash_join.next_unwrap_ready_chunk()?;
3446        assert_eq!(
3447            chunk,
3448            StreamChunk::from_pretty(
3449                " I I I I
3450                + 3 6 3 10"
3451            )
3452        );
3453
3454        Ok(())
3455    }
3456
3457    #[tokio::test]
3458    async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3459        let (mut tx_l, mut tx_r, mut hash_join) =
3460            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3461
3462        // push the init barrier for left and right
3463        tx_l.push_barrier(test_epoch(1), false);
3464        tx_r.push_barrier(test_epoch(1), false);
3465        hash_join.next_unwrap_ready_barrier()?;
3466
3467        tx_l.push_int64_watermark(0, 100);
3468
3469        tx_l.push_int64_watermark(0, 200);
3470
3471        tx_l.push_barrier(test_epoch(2), false);
3472        tx_r.push_barrier(test_epoch(2), false);
3473        hash_join.next_unwrap_ready_barrier()?;
3474
3475        tx_r.push_int64_watermark(0, 50);
3476
3477        let w1 = hash_join.next().await.unwrap().unwrap();
3478        let w1 = w1.as_watermark().unwrap();
3479
3480        let w2 = hash_join.next().await.unwrap().unwrap();
3481        let w2 = w2.as_watermark().unwrap();
3482
3483        tx_r.push_int64_watermark(0, 100);
3484
3485        let w3 = hash_join.next().await.unwrap().unwrap();
3486        let w3 = w3.as_watermark().unwrap();
3487
3488        let w4 = hash_join.next().await.unwrap().unwrap();
3489        let w4 = w4.as_watermark().unwrap();
3490
3491        assert_eq!(
3492            w1,
3493            &Watermark {
3494                col_idx: 2,
3495                data_type: DataType::Int64,
3496                val: ScalarImpl::Int64(50)
3497            }
3498        );
3499
3500        assert_eq!(
3501            w2,
3502            &Watermark {
3503                col_idx: 0,
3504                data_type: DataType::Int64,
3505                val: ScalarImpl::Int64(50)
3506            }
3507        );
3508
3509        assert_eq!(
3510            w3,
3511            &Watermark {
3512                col_idx: 2,
3513                data_type: DataType::Int64,
3514                val: ScalarImpl::Int64(100)
3515            }
3516        );
3517
3518        assert_eq!(
3519            w4,
3520            &Watermark {
3521                col_idx: 0,
3522                data_type: DataType::Int64,
3523                val: ScalarImpl::Int64(100)
3524            }
3525        );
3526
3527        Ok(())
3528    }
3529}