risingwave_stream/executor/
hash_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use std::assert_matches::assert_matches;
15use std::collections::{BTreeMap, HashSet};
16use std::time::Duration;
17
18use anyhow::Context;
19use itertools::Itertools;
20use multimap::MultiMap;
21use risingwave_common::array::Op;
22use risingwave_common::hash::{HashKey, NullBitmap};
23use risingwave_common::row::RowExt;
24use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
25use risingwave_common::util::epoch::EpochPair;
26use risingwave_common::util::iter_util::ZipEqDebug;
27use risingwave_expr::ExprError;
28use risingwave_expr::expr::NonStrictExpression;
29use tokio::time::Instant;
30
31use self::builder::JoinChunkBuilder;
32use super::barrier_align::*;
33use super::join::hash_join::*;
34use super::join::row::JoinRow;
35use super::join::*;
36use super::watermark::*;
37use crate::executor::join::builder::JoinStreamChunkBuilder;
38use crate::executor::join::hash_join::CacheResult;
39use crate::executor::prelude::*;
40
/// Evict the cache every n rows.
///
/// A shared counter (`cnt_rows_received`) is bumped once per input row in
/// `evict_cache` and both sides' caches are evicted when it reaches this value.
const EVICT_EVERY_N_ROWS: u32 = 16;
43
/// Returns `true` when every element of `vec1` also occurs in `vec2`,
/// i.e. `vec1` is a subset of `vec2` when both are viewed as sets
/// (duplicates are irrelevant).
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.into_iter().all(|x| superset.contains(&x))
}
47
/// Join-key and (deduplicated) primary-key configuration for one input side
/// of the hash join.
pub struct JoinParams {
    /// Indices of the join keys
    pub join_key_indices: Vec<usize>,
    /// Indices of the input pk after dedup
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Bundles the join-key indices and the deduplicated pk indices for one side.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        JoinParams {
            join_key_indices,
            deduped_pk_indices,
        }
    }
}
63
/// Per-side state and configuration of the hash join.
struct JoinSide<K: HashKey, S: StateStore> {
    /// Store all data from a one side stream
    ht: JoinHashMap<K, S>,
    /// Indices of the join key columns
    join_key_indices: Vec<usize>,
    /// The data type of all columns without degree.
    all_data_types: Vec<DataType>,
    /// The start position for the side in output new columns
    start_pos: usize,
    /// The mapping from input indices of a side to output columns.
    i2o_mapping: Vec<(usize, usize)>,
    /// Same content as `i2o_mapping`, but indexed by input column for lookups.
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// The first field of the ith element indicates that when a watermark at the ith column of
    /// this side comes, what band join conditions should be updated in order to possibly
    /// generate a new watermark at that column or the corresponding column in the counterpart
    /// join side.
    ///
    /// The second field indicates whether the column is required less than the
    /// corresponding column in the counterpart join side in the band join condition.
    input2inequality_index: Vec<Vec<(usize, bool)>>,
    /// Some fields which are required non null to match due to inequalities.
    non_null_fields: Vec<usize>,
    /// (i, j) in this `Vec` means that state data in this join side can be cleaned if the value of
    /// its ith column is less than the synthetic watermark of the jth band join condition.
    state_clean_columns: Vec<(usize, usize)>,
    /// Whether degree table is needed for this side.
    need_degree_table: bool,
}
92
/// Manual `Debug`: prints only configuration-like fields; the hash table and
/// the inequality/state-clean bookkeeping are intentionally omitted.
impl<K: HashKey, S: StateStore> std::fmt::Debug for JoinSide<K, S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("JoinSide")
            .field("join_key_indices", &self.join_key_indices)
            .field("col_types", &self.all_data_types)
            .field("start_pos", &self.start_pos)
            .field("i2o_mapping", &self.i2o_mapping)
            .field("need_degree_table", &self.need_degree_table)
            .finish()
    }
}
104
impl<K: HashKey, S: StateStore> JoinSide<K, S> {
    // WARNING: Please do not call this until we implement it.
    // Always panics via `unimplemented!`; exists only so `clear_cache` below
    // type-checks until a real dirtiness check is written.
    fn is_dirty(&self) -> bool {
        unimplemented!()
    }

    #[expect(dead_code)]
    fn clear_cache(&mut self) {
        // NOTE: `is_dirty` is unimplemented, so calling this currently panics.
        assert!(
            !self.is_dirty(),
            "cannot clear cache while states of hash join are dirty"
        );

        // TODO: not working with rearranged chain
        // self.ht.clear();
    }

    /// Initializes this side's hash-table state for the given epoch. Called
    /// once with the epoch of the first barrier (see `into_stream`).
    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.ht.init(epoch).await
    }
}
126
/// `HashJoinExecutor` takes two input streams and runs equal hash join on them.
/// The output columns are the concatenation of left and right columns.
pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive> {
    ctx: ActorContextRef,
    info: ExecutorInfo,

    /// Left input executor
    input_l: Option<Executor>,
    /// Right input executor
    input_r: Option<Executor>,
    /// The data types of the formed new columns
    actual_output_data_types: Vec<DataType>,
    /// The parameters of the left join executor
    side_l: JoinSide<K, S>,
    /// The parameters of the right join executor
    side_r: JoinSide<K, S>,
    /// Optional non-equi join conditions
    cond: Option<NonStrictExpression>,
    /// Column indices of watermark output and offset expression of each inequality, respectively.
    inequality_pairs: Vec<(Vec<usize>, Option<NonStrictExpression>)>,
    /// The output watermark of each inequality condition and its value is the minimum of the
    /// calculation result of both side. It will be used to generate watermark into downstream
    /// and do state cleaning if `clean_state` field of that inequality is `true`.
    inequality_watermarks: Vec<Option<Watermark>>,

    /// Whether the logic can be optimized for append-only stream
    append_only_optimize: bool,

    metrics: Arc<StreamingMetrics>,
    /// The maximum size of the chunk produced by executor at a time
    chunk_size: usize,
    /// Count the rows received, clear to 0 when counted to `EVICT_EVERY_N_ROWS`
    cnt_rows_received: u32,

    /// watermark column index -> `BufferedWatermarks`
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// When to alert high join amplification
    high_join_amplification_threshold: usize,

    /// Max number of rows that will be cached in the entry state.
    entry_state_max_rows: usize,
}
170
/// Manual `Debug` for diagnostics.
///
/// The `unwrap`s on `input_l`/`input_r` are safe here: the inputs are only
/// taken by `into_stream`, which consumes `self`, so a live executor always
/// has both inputs present.
impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> std::fmt::Debug
    for HashJoinExecutor<K, S, T>
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HashJoinExecutor")
            .field("join_type", &T)
            .field("input_left", &self.input_l.as_ref().unwrap().identity())
            .field("input_right", &self.input_r.as_ref().unwrap().identity())
            .field("side_l", &self.side_l)
            .field("side_r", &self.side_r)
            .field("pk_indices", &self.info.pk_indices)
            .field("schema", &self.info.schema)
            .field("actual_output_data_types", &self.actual_output_data_types)
            .finish()
    }
}
187
impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> Execute for HashJoinExecutor<K, S, T> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        // Delegate to the `#[try_stream]`-generated state machine.
        self.into_stream().boxed()
    }
}
193
/// Borrowed argument bundle passed to `eq_join_left`/`eq_join_right` when
/// processing one input chunk (see `into_stream`). Grouping the borrows in a
/// struct keeps those call sites readable despite the many parameters.
struct EqJoinArgs<'a, K: HashKey, S: StateStore> {
    ctx: &'a ActorContextRef,
    /// Left-side join state.
    side_l: &'a mut JoinSide<K, S>,
    /// Right-side join state.
    side_r: &'a mut JoinSide<K, S>,
    actual_output_data_types: &'a [DataType],
    /// Optional non-equi join condition.
    cond: &'a mut Option<NonStrictExpression>,
    inequality_watermarks: &'a [Option<Watermark>],
    /// The input chunk to join against the other side.
    chunk: StreamChunk,
    append_only_optimize: bool,
    chunk_size: usize,
    /// Shared row counter driving periodic cache eviction.
    cnt_rows_received: &'a mut u32,
    high_join_amplification_threshold: usize,
    entry_state_max_rows: usize,
}
208
209impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K, S, T> {
    /// Creates a `HashJoinExecutor` using the default entry-state cache size.
    ///
    /// Delegates to [`Self::new_with_cache_size`] with
    /// `entry_state_max_rows = None`, which falls back to the
    /// `hash_join_entry_state_max_rows` developer config.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
    ) -> Self {
        Self::new_with_cache_size(
            ctx,
            info,
            input_l,
            input_r,
            params_l,
            params_r,
            null_safe,
            output_indices,
            cond,
            inequality_pairs,
            state_table_l,
            degree_state_table_l,
            state_table_r,
            degree_state_table_r,
            watermark_epoch,
            is_append_only,
            metrics,
            chunk_size,
            high_join_amplification_threshold,
            None,
        )
    }
255
    /// Creates a `HashJoinExecutor` with an explicit cap on the number of rows
    /// cached per entry state.
    ///
    /// When `entry_state_max_rows` is `None`, the limit is taken from the
    /// `hash_join_entry_state_max_rows` developer config. Each element of
    /// `inequality_pairs` is `(key_required_larger, key_required_smaller,
    /// clean_state, delta_expression)` where column indices `>= left schema
    /// len` refer to the right input.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_cache_size(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        entry_state_max_rows: Option<usize>,
    ) -> Self {
        // Fall back to the developer config when no explicit cache size is given.
        let entry_state_max_rows = match entry_state_max_rows {
            None => {
                ctx.streaming_config
                    .developer
                    .hash_join_entry_state_max_rows
            }
            Some(entry_state_max_rows) => entry_state_max_rows,
        };
        let side_l_column_n = input_l.schema().len();

        // Semi/anti joins output only one side's columns; every other join
        // type outputs the concatenation of both sides.
        let schema_fields = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
            JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
            _ => [
                input_l.schema().fields.clone(),
                input_r.schema().fields.clone(),
            ]
            .concat(),
        };

        let original_output_data_types = schema_fields
            .iter()
            .map(|field| field.data_type())
            .collect_vec();
        let actual_output_data_types = output_indices
            .iter()
            .map(|&idx| original_output_data_types[idx].clone())
            .collect_vec();

        // Data types of the hash join state.
        let state_all_data_types_l = input_l.schema().data_types();
        let state_all_data_types_r = input_r.schema().data_types();

        let state_pk_indices_l = input_l.pk_indices().to_vec();
        let state_pk_indices_r = input_r.pk_indices().to_vec();

        let state_join_key_indices_l = params_l.join_key_indices;
        let state_join_key_indices_r = params_r.join_key_indices;

        // Degree table layout: join-key columns first, then the deduped pk
        // columns; the index ranges below reflect that layout.
        let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
        let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();

        let degree_pk_indices_l = (state_join_key_indices_l.len()
            ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
            .collect_vec();
        let degree_pk_indices_r = (state_join_key_indices_r.len()
            ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
            .collect_vec();

        // If pk is contained in join key.
        let pk_contained_in_jk_l =
            is_subset(state_pk_indices_l.clone(), state_join_key_indices_l.clone());
        let pk_contained_in_jk_r =
            is_subset(state_pk_indices_r.clone(), state_join_key_indices_r.clone());

        // check whether join key contains pk in both side
        let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;

        let join_key_data_types_l = state_join_key_indices_l
            .iter()
            .map(|idx| state_all_data_types_l[*idx].clone())
            .collect_vec();

        let join_key_data_types_r = state_join_key_indices_r
            .iter()
            .map(|idx| state_all_data_types_r[*idx].clone())
            .collect_vec();

        // Equi-join requires both sides' join keys to have identical types.
        assert_eq!(join_key_data_types_l, join_key_data_types_r);

        let null_matched = K::Bitmap::from_bool_vec(null_safe);

        // A side's degree table is only needed when the counterpart's pk is
        // not already contained in its join key.
        let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
        let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;

        let degree_state_l = need_degree_table_l.then(|| {
            TableInner::new(
                degree_pk_indices_l,
                degree_join_key_indices_l,
                degree_state_table_l,
            )
        });
        let degree_state_r = need_degree_table_r.then(|| {
            TableInner::new(
                degree_pk_indices_r,
                degree_join_key_indices_r,
                degree_state_table_r,
            )
        });

        let (left_to_output, right_to_output) = {
            let (left_len, right_len) = if is_left_semi_or_anti(T) {
                (state_all_data_types_l.len(), 0usize)
            } else if is_right_semi_or_anti(T) {
                (0usize, state_all_data_types_r.len())
            } else {
                (state_all_data_types_l.len(), state_all_data_types_r.len())
            };
            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
        };

        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());

        let left_input_len = input_l.schema().len();
        let right_input_len = input_r.schema().len();
        let mut l2inequality_index = vec![vec![]; left_input_len];
        let mut r2inequality_index = vec![vec![]; right_input_len];
        let mut l_state_clean_columns = vec![];
        let mut r_state_clean_columns = vec![];
        // For each inequality, fill in the per-side column->inequality maps and
        // state-clean columns, and record which output columns its watermark
        // maps to. Right-input columns are offset by `left_input_len`.
        let inequality_pairs = inequality_pairs
            .into_iter()
            .enumerate()
            .map(
                |(
                    index,
                    (key_required_larger, key_required_smaller, clean_state, delta_expression),
                )| {
                    let output_indices = if key_required_larger < key_required_smaller {
                        // "Larger" column is on the left input.
                        if clean_state {
                            l_state_clean_columns.push((key_required_larger, index));
                        }
                        l2inequality_index[key_required_larger].push((index, false));
                        r2inequality_index[key_required_smaller - left_input_len]
                            .push((index, true));
                        l2o_indexed
                            .get_vec(&key_required_larger)
                            .cloned()
                            .unwrap_or_default()
                    } else {
                        // "Larger" column is on the right input.
                        if clean_state {
                            r_state_clean_columns
                                .push((key_required_larger - left_input_len, index));
                        }
                        l2inequality_index[key_required_smaller].push((index, true));
                        r2inequality_index[key_required_larger - left_input_len]
                            .push((index, false));
                        r2o_indexed
                            .get_vec(&(key_required_larger - left_input_len))
                            .cloned()
                            .unwrap_or_default()
                    };
                    (output_indices, delta_expression)
                },
            )
            .collect_vec();

        // Columns participating in any inequality are required non-null to match.
        let mut l_non_null_fields = l2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();
        let mut r_non_null_fields = r2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();

        if append_only_optimize {
            // Under the append-only optimization the extra state cleaning and
            // non-null filtering are disabled.
            l_state_clean_columns.clear();
            r_state_clean_columns.clear();
            l_non_null_fields.clear();
            r_non_null_fields.clear();
        }

        let inequality_watermarks = vec![None; inequality_pairs.len()];
        let watermark_buffers = BTreeMap::new();

        Self {
            ctx: ctx.clone(),
            info,
            input_l: Some(input_l),
            input_r: Some(input_r),
            actual_output_data_types,
            side_l: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch.clone(),
                    join_key_data_types_l,
                    state_join_key_indices_l.clone(),
                    state_all_data_types_l.clone(),
                    state_table_l,
                    params_l.deduped_pk_indices,
                    degree_state_l,
                    null_matched.clone(),
                    pk_contained_in_jk_l,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "left",
                ),
                join_key_indices: state_join_key_indices_l,
                all_data_types: state_all_data_types_l,
                i2o_mapping: left_to_output,
                i2o_mapping_indexed: l2o_indexed,
                input2inequality_index: l2inequality_index,
                non_null_fields: l_non_null_fields,
                state_clean_columns: l_state_clean_columns,
                start_pos: 0,
                need_degree_table: need_degree_table_l,
            },
            side_r: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch,
                    join_key_data_types_r,
                    state_join_key_indices_r.clone(),
                    state_all_data_types_r.clone(),
                    state_table_r,
                    params_r.deduped_pk_indices,
                    degree_state_r,
                    null_matched,
                    pk_contained_in_jk_r,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "right",
                ),
                join_key_indices: state_join_key_indices_r,
                all_data_types: state_all_data_types_r,
                start_pos: side_l_column_n,
                i2o_mapping: right_to_output,
                i2o_mapping_indexed: r2o_indexed,
                input2inequality_index: r2inequality_index,
                non_null_fields: r_non_null_fields,
                state_clean_columns: r_state_clean_columns,
                need_degree_table: need_degree_table_r,
            },
            cond,
            inequality_pairs,
            inequality_watermarks,
            append_only_optimize,
            metrics,
            chunk_size,
            cnt_rows_received: 0,
            watermark_buffers,
            high_join_amplification_threshold,
            entry_state_max_rows,
        }
    }
517
    /// The main execution loop: aligns both inputs on barriers and drives the
    /// join, yielding chunks, watermarks, and barriers downstream.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let input_l = self.input_l.take().unwrap();
        let input_r = self.input_r.take().unwrap();
        let aligned_stream = barrier_align(
            input_l.execute(),
            input_r.execute(),
            self.ctx.id,
            self.ctx.fragment_id,
            self.metrics.clone(),
            "Join",
        );
        pin_mut!(aligned_stream);

        let actor_id = self.ctx.id;

        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
        let first_epoch = barrier.epoch;
        // The first barrier message should be propagated.
        yield Message::Barrier(barrier);
        // State is initialized only after the first barrier's epoch is known.
        self.side_l.init(first_epoch).await?;
        self.side_r.init(first_epoch).await?;

        let actor_id_str = self.ctx.id.to_string();
        let fragment_id_str = self.ctx.fragment_id.to_string();

        // initialized some metrics
        let join_actor_input_waiting_duration_ns = self
            .metrics
            .join_actor_input_waiting_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let left_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str, "left"]);
        let right_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str, "right"]);

        let barrier_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str, "barrier"]);

        let left_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str, "left"]);

        let right_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str, "right"]);

        // `start_time` measures time spent waiting for upstream input; it is
        // reset at the end of every loop iteration.
        let mut start_time = Instant::now();

        while let Some(msg) = aligned_stream
            .next()
            .instrument_await("hash_join_barrier_align")
            .await
        {
            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
            match msg? {
                AlignedMessage::WatermarkLeft(watermark) => {
                    for watermark_to_emit in
                        self.handle_watermark(SideType::Left, watermark).await?
                    {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::WatermarkRight(watermark) => {
                    for watermark_to_emit in
                        self.handle_watermark(SideType::Right, watermark).await?
                    {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::Left(chunk) => {
                    // Accumulate only the time spent joining, excluding the
                    // time blocked on `yield` (downstream backpressure): the
                    // timer is stopped before each yield and restarted after.
                    let mut left_time = Duration::from_nanos(0);
                    let mut left_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_left(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        left_time += left_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        left_start_time = Instant::now();
                    }
                    left_time += left_start_time.elapsed();
                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Right(chunk) => {
                    // Mirror of the `Left` branch with the sides swapped.
                    let mut right_time = Duration::from_nanos(0);
                    let mut right_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_right(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        right_time += right_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        right_start_time = Instant::now();
                    }
                    right_time += right_start_time.elapsed();
                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Barrier(barrier) => {
                    let barrier_start_time = Instant::now();
                    // Commit buffered state for this epoch before forwarding
                    // the barrier; post-commit work happens after the yield.
                    let (left_post_commit, right_post_commit) =
                        self.flush_data(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
                    yield Message::Barrier(barrier);

                    // Update the vnode bitmap for state tables of both sides if asked.
                    right_post_commit
                        .post_yield_barrier(update_vnode_bitmap.clone())
                        .await?;
                    if left_post_commit
                        .post_yield_barrier(update_vnode_bitmap)
                        .await?
                        .unwrap_or(false)
                    {
                        // NOTE(review): a `true` result appears to signal that
                        // cached state was invalidated (e.g. on rescaling), so
                        // buffered watermarks are discarded — confirm against
                        // `JoinHashMapPostCommit::post_yield_barrier`.
                        self.watermark_buffers
                            .values_mut()
                            .for_each(|buffers| buffers.clear());
                        self.inequality_watermarks.fill(None);
                    }

                    // Report metrics of cached join rows/entries
                    for (join_cached_entry_count, ht) in [
                        (&left_join_cached_entry_count, &self.side_l.ht),
                        (&right_join_cached_entry_count, &self.side_r.ht),
                    ] {
                        join_cached_entry_count.set(ht.entry_count() as i64);
                    }

                    barrier_join_match_duration_ns
                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
                }
            }
            start_time = Instant::now();
        }
    }
686
    /// Flushes both sides' hash-join state at the given epoch and returns the
    /// per-side post-commit handles `(left, right)`, whose
    /// `post_yield_barrier` must be driven after the barrier is yielded
    /// (see the barrier branch in `into_stream`).
    async fn flush_data(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<(
        JoinHashMapPostCommit<'_, K, S>,
        JoinHashMapPostCommit<'_, K, S>,
    )> {
        // All changes to the state have been buffered in the mem-table of the state table. Just
        // `commit` them here.
        let left = self.side_l.ht.flush(epoch).await?;
        let right = self.side_r.ht.flush(epoch).await?;
        Ok((left, right))
    }
700
    /// Opportunistically flushes both sides between barriers (called after
    /// each processed chunk in `into_stream`).
    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
        // All changes to the state have been buffered in the mem-table of the state table. Just
        // `commit` them here.
        self.side_l.ht.try_flush().await?;
        self.side_r.ht.try_flush().await?;
        Ok(())
    }
708
709    // We need to manually evict the cache.
710    fn evict_cache(
711        side_update: &mut JoinSide<K, S>,
712        side_match: &mut JoinSide<K, S>,
713        cnt_rows_received: &mut u32,
714    ) {
715        *cnt_rows_received += 1;
716        if *cnt_rows_received == EVICT_EVERY_N_ROWS {
717            side_update.ht.evict();
718            side_match.ht.evict();
719            *cnt_rows_received = 0;
720        }
721    }
722
    /// Handle a watermark arriving on one input side.
    ///
    /// Performs two tasks:
    /// 1. If the watermark column is the first join key of the updating side,
    ///    forward it to the opposite side's hash table for state cleaning.
    /// 2. Buffer the watermark against the other side's, and return the
    ///    watermarks selected for emission (remapped to output column
    ///    indices), for both join-key columns and inequality-pair columns.
    async fn handle_watermark(
        &mut self,
        side: SideTypePrimitive,
        watermark: Watermark,
    ) -> StreamExecutorResult<Vec<Watermark>> {
        // Orient the two sides relative to the input the watermark came from.
        let (side_update, side_match) = if side == SideType::Left {
            (&mut self.side_l, &mut self.side_r)
        } else {
            (&mut self.side_r, &mut self.side_l)
        };

        // State cleaning
        if side_update.join_key_indices[0] == watermark.col_idx {
            side_match.ht.update_watermark(watermark.val.clone());
        }

        // Select watermarks to yield.
        // The same input column may appear at several positions in the join key.
        let wm_in_jk = side_update
            .join_key_indices
            .iter()
            .positions(|idx| *idx == watermark.col_idx);
        let mut watermarks_to_emit = vec![];
        for idx in wm_in_jk {
            // One buffer per join-key position; `handle_watermark` yields the
            // combined watermark to emit, if any.
            let buffers = self
                .watermark_buffers
                .entry(idx)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
                let empty_indices = vec![];
                // A join-key watermark applies to the key column of BOTH
                // sides, so chain the output positions from both i2o mappings.
                let output_indices = side_update
                    .i2o_mapping_indexed
                    .get_vec(&side_update.join_key_indices[idx])
                    .unwrap_or(&empty_indices)
                    .iter()
                    .chain(
                        side_match
                            .i2o_mapping_indexed
                            .get_vec(&side_match.join_key_indices[idx])
                            .unwrap_or(&empty_indices),
                    );
                for output_idx in output_indices {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
            };
        }
        // Derive watermarks for inequality conditions involving this column.
        // Their buffers live after the join-key buffers, hence the
        // `join_key_indices.len()` offset into `watermark_buffers`.
        for (inequality_index, need_offset) in
            &side_update.input2inequality_index[watermark.col_idx]
        {
            let buffers = self
                .watermark_buffers
                .entry(side_update.join_key_indices.len() + inequality_index)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            let mut input_watermark = watermark.clone();
            if *need_offset
                && let Some(delta_expression) = self.inequality_pairs[*inequality_index].1.as_ref()
            {
                // allow since we will handle error manually.
                #[allow(clippy::disallowed_methods)]
                let eval_result = delta_expression
                    .inner()
                    .eval_row(&OwnedRow::new(vec![Some(input_watermark.val)]))
                    .await;
                match eval_result {
                    Ok(value) => input_watermark.val = value.unwrap(),
                    Err(err) => {
                        // `NumericOutOfRange` just means no watermark can be
                        // derived here; other errors are reported to the
                        // actor context, and either way this pair is skipped.
                        if !matches!(err, ExprError::NumericOutOfRange) {
                            self.ctx.on_compute_error(err, &self.info.identity);
                        }
                        continue;
                    }
                }
            };
            if let Some(selected_watermark) = buffers.handle_watermark(side, input_watermark) {
                for output_idx in &self.inequality_pairs[*inequality_index].0 {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
                // Remember it for state cleaning during subsequent joins.
                self.inequality_watermarks[*inequality_index] = Some(selected_watermark);
            }
        }
        Ok(watermarks_to_emit)
    }
804
805    fn row_concat(
806        row_update: impl Row,
807        update_start_pos: usize,
808        row_matched: impl Row,
809        matched_start_pos: usize,
810    ) -> OwnedRow {
811        let mut new_row = vec![None; row_update.len() + row_matched.len()];
812
813        for (i, datum_ref) in row_update.iter().enumerate() {
814            new_row[i + update_start_pos] = datum_ref.to_owned_datum();
815        }
816        for (i, datum_ref) in row_matched.iter().enumerate() {
817            new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
818        }
819        OwnedRow::new(new_row)
820    }
821
    /// Used to forward `eq_join_oneside` to show join side in stack.
    ///
    /// The left input is the probe (update) side; the right is the match side.
    fn eq_join_left(
        args: EqJoinArgs<'_, K, S>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Left }>(args)
    }
828
    /// Used to forward `eq_join_oneside` to show join side in stack.
    ///
    /// The right input is the probe (update) side; the left is the match side.
    fn eq_join_right(
        args: EqJoinArgs<'_, K, S>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Right }>(args)
    }
835
    /// Probe a `SIDE` input chunk against the opposite (match) side,
    /// yielding joined output chunks.
    ///
    /// For every visible row in the chunk: periodically evict caches, look up
    /// the matched side (cache hit, miss, or never-match), dispatch to
    /// `handle_match_rows` according to the row's op, and report
    /// join-amplification metrics.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            actual_output_data_types,
            cond,
            inequality_watermarks,
            chunk,
            append_only_optimize,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
            entry_state_max_rows,
            ..
        } = args;

        // `side_update` is the probe (update) side, `side_match` the build side.
        let (side_update, side_match) = if SIDE == SideType::Left {
            (side_l, side_r)
        } else {
            (side_r, side_l)
        };

        // Only state-clean columns whose inequality watermark is already known
        // can participate in state cleaning for this chunk.
        let useful_state_clean_columns = side_match
            .state_clean_columns
            .iter()
            .filter_map(|(column_idx, inequality_index)| {
                inequality_watermarks[*inequality_index]
                    .as_ref()
                    .map(|watermark| (*column_idx, watermark))
            })
            .collect_vec();

        let mut hashjoin_chunk_builder =
            JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        // Histogram of matched-row counts per probe key, labeled by actor,
        // fragment and the probe side's table.
        let join_matched_join_keys = ctx
            .streaming_metrics
            .join_matched_join_keys
            .with_guarded_label_values(&[
                &ctx.id.to_string(),
                &ctx.fragment_id.to_string(),
                &side_update.ht.table_id().to_string(),
            ]);

        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            // `None` entries are holes (invisible rows) in the chunk: skip.
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(side_update, side_match, cnt_rows_received);

            let cache_lookup_result = {
                // If either side's non-null requirement is violated, this row
                // can never match anything, so skip the lookup entirely.
                let probe_non_null_requirement_satisfied = side_update
                    .non_null_fields
                    .iter()
                    .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
                let build_non_null_requirement_satisfied =
                    key.null_bitmap().is_subset(side_match.ht.null_matched());
                if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
                    side_match.ht.take_state_opt(key)
                } else {
                    CacheResult::NeverMatch
                }
            };
            let mut total_matches = 0;

            // Instantiate `handle_match_rows` for the given join op, capturing
            // the per-row context above.
            macro_rules! match_rows {
                ($op:ident) => {
                    Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
                        cache_lookup_result,
                        row,
                        key,
                        &mut hashjoin_chunk_builder,
                        side_match,
                        side_update,
                        &useful_state_clean_columns,
                        cond,
                        append_only_optimize,
                        entry_state_max_rows,
                    )
                };
            }

            match op {
                Op::Insert | Op::UpdateInsert =>
                {
                    #[for_await]
                    for chunk in match_rows!(Insert) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
                Op::Delete | Op::UpdateDelete =>
                {
                    #[for_await]
                    for chunk in match_rows!(Delete) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
            };

            join_matched_join_keys.observe(total_matches as _);
            // Warn on hot keys that amplify a single input row into many
            // output rows; the key is deserialized only on this slow path.
            if total_matches > high_join_amplification_threshold {
                let join_key_data_types = side_update.ht.join_key_data_types();
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = total_matches,
                    update_table_id = side_update.ht.table_id(),
                    match_table_id = side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = ctx.id,
                    fragment_id = ctx.fragment_id,
                    "large rows matched for join key"
                );
            }
        }
        // NOTE(kwannoel): We don't track metrics for this last chunk.
        if let Some(chunk) = hashjoin_chunk_builder.take() {
            yield chunk;
        }
    }
967
    /// For the probe-side row, we need to check if it has values in cache, if not, we need to
    /// fetch the matched rows from the state table.
    ///
    /// Every matched build-side row being processed needs to go through the following phases:
    /// 1. Handle join condition evaluation.
    /// 2. Always do cache refill, if the state count is good.
    /// 3. Handle state cleaning.
    /// 4. Handle degree table update.
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn handle_match_rows<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
    >(
        cached_lookup_result: CacheResult,
        row: RowRef<'a>,
        key: &'a K,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        side_match: &'a mut JoinSide<K, S>,
        side_update: &'a mut JoinSide<K, S>,
        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
        cond: &'a mut Option<NonStrictExpression>,
        append_only_optimize: bool,
        entry_state_max_rows: usize,
    ) {
        let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
        // Fresh entry state used to refill the cache on a miss; on a hit it
        // stays empty and is shadowed by the cached entry below.
        let mut entry_state = JoinEntryState::default();
        let mut entry_state_count = 0;

        // Number of build-side rows matched by this probe row (its degree).
        let mut degree = 0;
        let mut append_only_matched_row: Option<JoinRow<OwnedRow>> = None;
        let mut matched_rows_to_clean = vec![];

        // Per-matched-row processing shared by the cache-hit and cache-miss
        // paths; delegates to `handle_match_row`, capturing the locals above.
        macro_rules! match_row {
            (
                $match_order_key_indices:expr,
                $degree_table:expr,
                $matched_row:expr,
                $matched_row_ref:expr,
                $from_cache:literal
            ) => {
                Self::handle_match_row::<SIDE, { JOIN_OP }, { $from_cache }>(
                    row,
                    $matched_row,
                    $matched_row_ref,
                    hashjoin_chunk_builder,
                    $match_order_key_indices,
                    $degree_table,
                    side_update.start_pos,
                    side_match.start_pos,
                    cond,
                    &mut degree,
                    useful_state_clean_columns,
                    append_only_optimize,
                    &mut append_only_matched_row,
                    &mut matched_rows_to_clean,
                )
            };
        }

        let entry_state = match cached_lookup_result {
            CacheResult::NeverMatch => {
                // The lookup already determined this key can never match:
                // just forward the row if the join type requires it and bail.
                let op = match JOIN_OP {
                    JoinOp::Insert => Op::Insert,
                    JoinOp::Delete => Op::Delete,
                };
                if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                    yield chunk;
                }
                return Ok(());
            }
            CacheResult::Hit(mut cached_rows) => {
                let (match_order_key_indices, match_degree_state) =
                    side_match.ht.get_degree_state_mut_ref();
                // Handle cached rows which match the probe-side row.
                for (matched_row_ref, matched_row) in
                    cached_rows.values_mut(&side_match.all_data_types)
                {
                    let matched_row = matched_row?;
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        match_degree_state,
                        matched_row,
                        Some(matched_row_ref),
                        true
                    )
                    .await
                    {
                        yield chunk;
                    }
                }

                cached_rows
            }
            CacheResult::Miss => {
                // Handle rows which are not in cache.
                let (matched_rows, match_order_key_indices, degree_table) = side_match
                    .ht
                    .fetch_matched_rows_and_get_degree_table_ref(key)
                    .await?;

                #[for_await]
                for matched_row in matched_rows {
                    let (encoded_pk, matched_row) = matched_row?;

                    let mut matched_row_ref = None;

                    // cache refill
                    if entry_state_count <= entry_state_max_rows {
                        let row_ref = entry_state
                            .insert(encoded_pk, matched_row.encode(), None) // TODO(kwannoel): handle ineq key for asof join.
                            .with_context(|| format!("row: {}", row.display(),))?;
                        matched_row_ref = Some(row_ref);
                        entry_state_count += 1;
                    }
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        degree_table,
                        matched_row,
                        matched_row_ref,
                        false
                    )
                    .await
                    {
                        yield chunk;
                    }
                }
                Box::new(entry_state)
            }
        };

        // forward rows depending on join types
        let op = match JOIN_OP {
            JoinOp::Insert => Op::Insert,
            JoinOp::Delete => Op::Delete,
        };
        if degree == 0 {
            if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                yield chunk;
            }
        } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
        {
            yield chunk;
        }

        // cache refill
        // Write the entry back only if it was already cached, or if the miss
        // path buffered ALL matched rows (the count never exceeded the limit).
        if cache_hit || entry_state_count <= entry_state_max_rows {
            side_match.ht.update_state(key, entry_state);
        }

        // watermark state cleaning
        for matched_row in matched_rows_to_clean {
            side_match.ht.delete_handle_degree(key, matched_row)?;
        }

        // apply append_only optimization to clean matched_rows which have been persisted
        if append_only_optimize && let Some(row) = append_only_matched_row {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            side_match.ht.delete_handle_degree(key, row)?;
            return Ok(());
        }

        // no append_only optimization, update state table(s).
        match JOIN_OP {
            JoinOp::Insert => {
                side_update
                    .ht
                    .insert_handle_degree(key, JoinRow::new(row, degree))?;
            }
            JoinOp::Delete => {
                side_update
                    .ht
                    .delete_handle_degree(key, JoinRow::new(row, degree))?;
            }
        }
    }
1145
    /// Process a single matched build-side row against the probe-side row.
    ///
    /// If the (optional) join condition passes: bumps the probe row's degree,
    /// emits the joined row via the chunk builder, and keeps the degree table
    /// and the cached copy of the matched row (if any) in sync. If it fails:
    /// checks whether the matched row falls below a state-clean watermark and,
    /// if so, queues it for cleaning. Returns the chunk produced by the chunk
    /// builder, if one was completed.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    async fn handle_match_row<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
        const MATCHED_ROWS_FROM_CACHE: bool,
    >(
        update_row: RowRef<'a>,
        mut matched_row: JoinRow<OwnedRow>,
        mut matched_row_cache_ref: Option<&mut StateValueType>,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        match_order_key_indices: &[usize],
        match_degree_table: &mut Option<TableInner<S>>,
        side_update_start_pos: usize,
        side_match_start_pos: usize,
        cond: &Option<NonStrictExpression>,
        update_row_degree: &mut u64,
        useful_state_clean_columns: &[(usize, &'a Watermark)],
        append_only_optimize: bool,
        append_only_matched_row: &mut Option<JoinRow<OwnedRow>>,
        matched_rows_to_clean: &mut Vec<JoinRow<OwnedRow>>,
    ) -> Option<StreamChunk> {
        let mut need_state_clean = false;
        let mut chunk_opt = None;

        // TODO(kwannoel): Instead of evaluating this every loop,
        // we can call this only if there's a non-equi expression.
        // check join cond
        let join_condition_satisfied = Self::check_join_condition(
            update_row,
            side_update_start_pos,
            &matched_row.row,
            side_match_start_pos,
            cond,
        )
        .await;

        if join_condition_satisfied {
            // update degree
            *update_row_degree += 1;
            // send matched row downstream

            // Inserts must happen before degrees are updated,
            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
            if matches!(JOIN_OP, JoinOp::Insert) && !forward_exactly_once(T, SIDE) {
                if let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
                {
                    chunk_opt = Some(chunk);
                }
            }
            // TODO(kwannoel): We can actually statically decide this, using join side + join type.
            if let Some(degree_table) = match_degree_table {
                update_degree::<S, { JOIN_OP }>(
                    match_order_key_indices,
                    degree_table,
                    &mut matched_row,
                );
                // Mirror the degree change into the cached copy of the row so
                // the cache stays consistent with the degree table.
                if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
                    // update matched row in cache
                    match JOIN_OP {
                        JoinOp::Insert => {
                            matched_row_cache_ref.as_mut().unwrap().degree += 1;
                        }
                        JoinOp::Delete => {
                            matched_row_cache_ref.as_mut().unwrap().degree -= 1;
                        }
                    }
                }
            }

            // Deletes must happen after degree table is updated.
            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
            if matches!(JOIN_OP, JoinOp::Delete) && !forward_exactly_once(T, SIDE) {
                if let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
                {
                    chunk_opt = Some(chunk);
                }
            }
        } else {
            // check if need state cleaning
            // A matched row strictly below any applicable watermark can be
            // dropped from state, since it can never satisfy the condition again.
            for (column_idx, watermark) in useful_state_clean_columns {
                if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
                    scalar
                        .default_cmp(&watermark.val.as_scalar_ref_impl())
                        .is_lt()
                }) {
                    need_state_clean = true;
                    break;
                }
            }
        }
        // If the stream is append-only and the join key covers pk in both side,
        // then we can remove matched rows since pk is unique and will not be
        // inserted again
        if append_only_optimize {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            // Since join key contains pk and pk is unique, there should be only
            // one row if matched.
            assert!(append_only_matched_row.is_none());
            *append_only_matched_row = Some(matched_row);
        } else if need_state_clean {
            // `append_only_optimize` and `need_state_clean` won't both be true.
            // 'else' here is only to suppress compiler error, otherwise
            // `matched_row` will be moved twice.
            matched_rows_to_clean.push(matched_row);
        }

        chunk_opt
    }
1258
1259    // TODO(yuhao-su): We should find a better way to eval the expression
1260    // without concat two rows.
1261    // if there are non-equi expressions
1262    // NOTE(kwannoel): We can probably let `eval` use `impl Row` instead of `OwnedRow`.
1263    #[inline]
1264    async fn check_join_condition(
1265        row: impl Row,
1266        side_update_start_pos: usize,
1267        matched_row: impl Row,
1268        side_match_start_pos: usize,
1269        join_condition: &Option<NonStrictExpression>,
1270    ) -> bool {
1271        if let Some(join_condition) = join_condition {
1272            let new_row = Self::row_concat(
1273                row,
1274                side_update_start_pos,
1275                matched_row,
1276                side_match_start_pos,
1277            );
1278            join_condition
1279                .eval_row_infallible(&new_row)
1280                .await
1281                .map(|s| *s.as_bool())
1282                .unwrap_or(false)
1283        } else {
1284            true
1285        }
1286    }
1287}
1288
1289#[cfg(test)]
1290mod tests {
1291    use std::sync::atomic::AtomicU64;
1292
1293    use pretty_assertions::assert_eq;
1294    use risingwave_common::array::*;
1295    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1296    use risingwave_common::hash::{Key64, Key128};
1297    use risingwave_common::util::epoch::test_epoch;
1298    use risingwave_common::util::sort_util::OrderType;
1299    use risingwave_storage::memory::MemoryStateStore;
1300
1301    use super::*;
1302    use crate::common::table::test_utils::gen_pbtable;
1303    use crate::executor::test_utils::expr::build_from_pretty;
1304    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1305
1306    async fn create_in_memory_state_table(
1307        mem_state: MemoryStateStore,
1308        data_types: &[DataType],
1309        order_types: &[OrderType],
1310        pk_indices: &[usize],
1311        table_id: u32,
1312    ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1313        let column_descs = data_types
1314            .iter()
1315            .enumerate()
1316            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1317            .collect_vec();
1318        let state_table = StateTable::from_table_catalog(
1319            &gen_pbtable(
1320                TableId::new(table_id),
1321                column_descs,
1322                order_types.to_vec(),
1323                pk_indices.to_vec(),
1324                0,
1325            ),
1326            mem_state.clone(),
1327            None,
1328        )
1329        .await;
1330
1331        // Create degree table
1332        let mut degree_table_column_descs = vec![];
1333        pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1334            degree_table_column_descs.push(ColumnDesc::unnamed(
1335                ColumnId::new(pk_id as i32),
1336                data_types[*idx].clone(),
1337            ))
1338        });
1339        degree_table_column_descs.push(ColumnDesc::unnamed(
1340            ColumnId::new(pk_indices.len() as i32),
1341            DataType::Int64,
1342        ));
1343        let degree_state_table = StateTable::from_table_catalog(
1344            &gen_pbtable(
1345                TableId::new(table_id + 1),
1346                degree_table_column_descs,
1347                order_types.to_vec(),
1348                pk_indices.to_vec(),
1349                0,
1350            ),
1351            mem_state,
1352            None,
1353        )
1354        .await;
1355        (state_table, degree_state_table)
1356    }
1357
1358    fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1359        build_from_pretty(
1360            condition_text
1361                .as_deref()
1362                .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1363        )
1364    }
1365
    /// Build a hash-join executor over two mock `(i64, i64)` sources for tests.
    ///
    /// Column 0 is the join key, column 1 the pk. `with_condition` attaches
    /// the condition from `create_cond`; `null_safe` enables null-safe
    /// equality on the join key; `inequality_pairs` is forwarded to the
    /// executor. Returns the two input message senders and the executor's
    /// output stream.
    async fn create_executor<const T: JoinTypePrimitive>(
        with_condition: bool,
        null_safe: bool,
        condition_text: Option<String>,
        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64), // join key
                Field::unnamed(DataType::Int64),
            ],
        };
        let (tx_l, source_l) = MockSource::channel();
        let source_l = source_l.into_executor(schema.clone(), vec![1]);
        let (tx_r, source_r) = MockSource::channel();
        let source_r = source_r.into_executor(schema, vec![1]);
        let params_l = JoinParams::new(vec![0], vec![1]);
        let params_r = JoinParams::new(vec![0], vec![1]);
        let cond = with_condition.then(|| create_cond(condition_text));

        let mem_state = MemoryStateStore::new();

        // Left (state, degree) tables get ids 0/1; right gets 2/3 — no overlap
        // on the shared store.
        let (state_l, degree_state_l) = create_in_memory_state_table(
            mem_state.clone(),
            &[DataType::Int64, DataType::Int64],
            &[OrderType::ascending(), OrderType::ascending()],
            &[0, 1],
            0,
        )
        .await;

        let (state_r, degree_state_r) = create_in_memory_state_table(
            mem_state,
            &[DataType::Int64, DataType::Int64],
            &[OrderType::ascending(), OrderType::ascending()],
            &[0, 1],
            2,
        )
        .await;

        // Semi/anti joins output only one side's columns; other join types
        // output the concatenation of both input schemas.
        let schema = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
            _ => [source_l.schema().fields(), source_r.schema().fields()]
                .concat()
                .into_iter()
                .collect(),
        };
        let schema_len = schema.len();
        let info = ExecutorInfo::new(schema, vec![1], "HashJoinExecutor".to_owned(), 0);

        let executor = HashJoinExecutor::<Key64, MemoryStateStore, T>::new(
            ActorContext::for_test(123),
            info,
            source_l,
            source_r,
            params_l,
            params_r,
            vec![null_safe],
            (0..schema_len).collect_vec(),
            cond,
            inequality_pairs,
            state_l,
            degree_state_l,
            state_r,
            degree_state_r,
            Arc::new(AtomicU64::new(0)),
            false,
            Arc::new(StreamingMetrics::unused()),
            1024,
            2048,
        );
        (tx_l, tx_r, executor.boxed().execute())
    }
1440
    /// Shorthand for `create_executor` with no inequality pairs.
    async fn create_classical_executor<const T: JoinTypePrimitive>(
        with_condition: bool,
        null_safe: bool,
        condition_text: Option<String>,
    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
        create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
    }
1448
    /// Build a hash-join executor over two mock `(i64, i64, i64)` append-only
    /// sources, with the append-only optimization flag enabled (the `true`
    /// argument to `HashJoinExecutor::new`). The join key covers columns 0
    /// and 1. Returns the two input senders and the executor's output stream.
    async fn create_append_only_executor<const T: JoinTypePrimitive>(
        with_condition: bool,
    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };
        let (tx_l, source_l) = MockSource::channel();
        let source_l = source_l.into_executor(schema.clone(), vec![0]);
        let (tx_r, source_r) = MockSource::channel();
        let source_r = source_r.into_executor(schema, vec![0]);
        let params_l = JoinParams::new(vec![0, 1], vec![]);
        let params_r = JoinParams::new(vec![0, 1], vec![]);
        let cond = with_condition.then(|| create_cond(None));

        let mem_state = MemoryStateStore::new();

        // NOTE(review): left gets table ids 0 (state) / 1 (degree) while right
        // gets 1 (state) / 2 (degree) on the SAME shared store — the left
        // degree table and the right state table share id 1, unlike
        // `create_executor` which uses ids 0/1 and 2/3. Confirm intentional.
        let (state_l, degree_state_l) = create_in_memory_state_table(
            mem_state.clone(),
            &[DataType::Int64, DataType::Int64, DataType::Int64],
            &[
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
            &[0, 1, 0],
            0,
        )
        .await;

        let (state_r, degree_state_r) = create_in_memory_state_table(
            mem_state,
            &[DataType::Int64, DataType::Int64, DataType::Int64],
            &[
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
            &[0, 1, 1],
            1,
        )
        .await;

        // Semi/anti joins output only one side's columns; other join types
        // output the concatenation of both input schemas.
        let schema = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
            _ => [source_l.schema().fields(), source_r.schema().fields()]
                .concat()
                .into_iter()
                .collect(),
        };
        let schema_len = schema.len();
        let info = ExecutorInfo::new(schema, vec![1], "HashJoinExecutor".to_owned(), 0);

        let executor = HashJoinExecutor::<Key128, MemoryStateStore, T>::new(
            ActorContext::for_test(123),
            info,
            source_l,
            source_r,
            params_l,
            params_r,
            vec![false],
            (0..schema_len).collect_vec(),
            cond,
            vec![],
            state_l,
            degree_state_l,
            state_r,
            degree_state_r,
            Arc::new(AtomicU64::new(0)),
            true,
            Arc::new(StreamingMetrics::unused()),
            1024,
            2048,
        );
        (tx_l, tx_r, executor.boxed().execute())
    }
1529
    /// Interval (band) join: an inner join whose extra condition
    /// `$1 > $3 - 2 && $3 > $1 - 2` keeps only row pairs whose value columns
    /// lie within 2 of each other. The `(1, 3, …)` / `(3, 1, …)` inequality
    /// pairs register the band columns with a `col - 2` clean-state
    /// expression, enabling watermark-driven state cleaning.
    #[tokio::test]
    async fn test_interval_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 3
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 6
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 2 3
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
            true,
            false,
            Some(String::from("(and:boolean (greater_than:boolean $1:int8 (subtract:int8 $3:int8 2:int8)) (greater_than:boolean $3:int8 (subtract:int8 $1:int8 2:int8)))")),
            vec![(1, 3, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)"))), (3, 1, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)")))],
        )
        .await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; no right rows yet, so no join output
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk (an insert/delete pair that cancels out)
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // a watermark on one side alone is buffered, not forwarded
        tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
        hash_join.next_unwrap_pending();

        // once both sides carry a watermark, derived watermarks are emitted:
        // 4 on column 1 and 6 on output column 3 — presumably min(10, 6) = 6
        // shifted by the `subtract … 2` clean-state expression for column 1
        tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
        assert_eq!(
            output_watermark,
            Watermark::new(1, DataType::Int64, ScalarImpl::Int64(4))
        );
        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
        assert_eq!(
            output_watermark,
            Watermark::new(3, DataType::Int64, ScalarImpl::Int64(6))
        );

        // push the 1st right chunk
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        // data "2 3" should have been cleaned
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 6"
            )
        );

        // push the 2nd right chunk
        tx_r.push_chunk(chunk_r2);
        // pending means that state clean is successful, or the executor will yield a chunk "+ 2 3
        // 2 3" here.
        hash_join.next_unwrap_pending();

        Ok(())
    }
1616
1617    #[tokio::test]
1618    async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
1619        let chunk_l1 = StreamChunk::from_pretty(
1620            "  I I
1621             + 1 4
1622             + 2 5
1623             + 3 6",
1624        );
1625        let chunk_l2 = StreamChunk::from_pretty(
1626            "  I I
1627             + 3 8
1628             - 3 8",
1629        );
1630        let chunk_r1 = StreamChunk::from_pretty(
1631            "  I I
1632             + 2 7
1633             + 4 8
1634             + 6 9",
1635        );
1636        let chunk_r2 = StreamChunk::from_pretty(
1637            "  I  I
1638             + 3 10
1639             + 6 11",
1640        );
1641        let (mut tx_l, mut tx_r, mut hash_join) =
1642            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
1643
1644        // push the init barrier for left and right
1645        tx_l.push_barrier(test_epoch(1), false);
1646        tx_r.push_barrier(test_epoch(1), false);
1647        hash_join.next_unwrap_ready_barrier()?;
1648
1649        // push the 1st left chunk
1650        tx_l.push_chunk(chunk_l1);
1651        hash_join.next_unwrap_pending();
1652
1653        // push the init barrier for left and right
1654        tx_l.push_barrier(test_epoch(2), false);
1655        tx_r.push_barrier(test_epoch(2), false);
1656        hash_join.next_unwrap_ready_barrier()?;
1657
1658        // push the 2nd left chunk
1659        tx_l.push_chunk(chunk_l2);
1660        hash_join.next_unwrap_pending();
1661
1662        // push the 1st right chunk
1663        tx_r.push_chunk(chunk_r1);
1664        let chunk = hash_join.next_unwrap_ready_chunk()?;
1665        assert_eq!(
1666            chunk,
1667            StreamChunk::from_pretty(
1668                " I I I I
1669                + 2 5 2 7"
1670            )
1671        );
1672
1673        // push the 2nd right chunk
1674        tx_r.push_chunk(chunk_r2);
1675        let chunk = hash_join.next_unwrap_ready_chunk()?;
1676        assert_eq!(
1677            chunk,
1678            StreamChunk::from_pretty(
1679                " I I I I
1680                + 3 6 3 10"
1681            )
1682        );
1683
1684        Ok(())
1685    }
1686
    /// Inner join created with `null_safe = true`: a NULL join key on one
    /// side matches a NULL join key on the other (`IS NOT DISTINCT FROM`
    /// semantics), as shown by the final `. 6 . 10` output row.
    #[tokio::test]
    async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + . 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + . 8
             - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + . 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; no right rows yet, so no output
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk (insert/delete pair of a NULL key cancels out)
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; key 2 matches left row (2, 5)
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 7"
            )
        );

        // push the 2nd right chunk; the NULL key matches the left NULL-key row
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . 6 . 10"
            )
        );

        Ok(())
    }
1756
    /// Left semi join: only left-side columns are emitted. A left row is
    /// output once it gains its first right-side match and retracted when
    /// its last match is deleted; intermediate match-count changes emit
    /// nothing (forward-exactly-once behavior).
    #[tokio::test]
    async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             + 6 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I  I
             - 6 11",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I  I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; no right matches yet, so nothing is emitted
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk (insert/delete pair cancels out)
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; left row (2, 5) gains its first match
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        // push the 2nd right chunk; left row (3, 6) gains its first match
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 3 6"
            )
        );

        // push the 3rd left chunk (tests forward_exactly_once)
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // push the 3rd right chunk
        // (tests that no change if there are still matches)
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // push the 4th right chunk
        // (tests that deletion occurs when there are no more matches)
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
1866
    /// Left semi join with `null_safe = true`: a NULL join key on the left
    /// is matched by a NULL key on the right (see the `+ . 6` output), and
    /// emission/retraction follows first-match/last-match semantics.
    #[tokio::test]
    async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + . 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + . 8
             - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + . 10
             + 6 11",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             + 6 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I  I
             - 6 11",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I  I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; no right matches yet, so nothing is emitted
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk (insert/delete pair of a NULL key cancels out)
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; left row (2, 5) gains its first match
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        // push the 2nd right chunk; the NULL-key left row gains its first match
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + . 6"
            )
        );

        // push the 3rd left chunk (tests forward_exactly_once)
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // push the 3rd right chunk
        // (tests that no change if there are still matches)
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // push the 4th right chunk
        // (tests that deletion occurs when there are no more matches)
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
1976
1977    #[tokio::test]
1978    async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
1979        let chunk_l1 = StreamChunk::from_pretty(
1980            "  I I I
1981             + 1 4 1
1982             + 2 5 2
1983             + 3 6 3",
1984        );
1985        let chunk_l2 = StreamChunk::from_pretty(
1986            "  I I I
1987             + 4 9 4
1988             + 5 10 5",
1989        );
1990        let chunk_r1 = StreamChunk::from_pretty(
1991            "  I I I
1992             + 2 5 1
1993             + 4 9 2
1994             + 6 9 3",
1995        );
1996        let chunk_r2 = StreamChunk::from_pretty(
1997            "  I I I
1998             + 1 4 4
1999             + 3 6 5",
2000        );
2001
2002        let (mut tx_l, mut tx_r, mut hash_join) =
2003            create_append_only_executor::<{ JoinType::Inner }>(false).await;
2004
2005        // push the init barrier for left and right
2006        tx_l.push_barrier(test_epoch(1), false);
2007        tx_r.push_barrier(test_epoch(1), false);
2008        hash_join.next_unwrap_ready_barrier()?;
2009
2010        // push the 1st left chunk
2011        tx_l.push_chunk(chunk_l1);
2012        hash_join.next_unwrap_pending();
2013
2014        // push the init barrier for left and right
2015        tx_l.push_barrier(test_epoch(2), false);
2016        tx_r.push_barrier(test_epoch(2), false);
2017        hash_join.next_unwrap_ready_barrier()?;
2018
2019        // push the 2nd left chunk
2020        tx_l.push_chunk(chunk_l2);
2021        hash_join.next_unwrap_pending();
2022
2023        // push the 1st right chunk
2024        tx_r.push_chunk(chunk_r1);
2025        let chunk = hash_join.next_unwrap_ready_chunk()?;
2026        assert_eq!(
2027            chunk,
2028            StreamChunk::from_pretty(
2029                " I I I I I I
2030                + 2 5 2 2 5 1
2031                + 4 9 4 4 9 2"
2032            )
2033        );
2034
2035        // push the 2nd right chunk
2036        tx_r.push_chunk(chunk_r2);
2037        let chunk = hash_join.next_unwrap_ready_chunk()?;
2038        assert_eq!(
2039            chunk,
2040            StreamChunk::from_pretty(
2041                " I I I I I I
2042                + 1 4 1 1 4 4
2043                + 3 6 3 3 6 5"
2044            )
2045        );
2046
2047        Ok(())
2048    }
2049
2050    #[tokio::test]
2051    async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
2052        let chunk_l1 = StreamChunk::from_pretty(
2053            "  I I I
2054             + 1 4 1
2055             + 2 5 2
2056             + 3 6 3",
2057        );
2058        let chunk_l2 = StreamChunk::from_pretty(
2059            "  I I I
2060             + 4 9 4
2061             + 5 10 5",
2062        );
2063        let chunk_r1 = StreamChunk::from_pretty(
2064            "  I I I
2065             + 2 5 1
2066             + 4 9 2
2067             + 6 9 3",
2068        );
2069        let chunk_r2 = StreamChunk::from_pretty(
2070            "  I I I
2071             + 1 4 4
2072             + 3 6 5",
2073        );
2074
2075        let (mut tx_l, mut tx_r, mut hash_join) =
2076            create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;
2077
2078        // push the init barrier for left and right
2079        tx_l.push_barrier(test_epoch(1), false);
2080        tx_r.push_barrier(test_epoch(1), false);
2081        hash_join.next_unwrap_ready_barrier()?;
2082
2083        // push the 1st left chunk
2084        tx_l.push_chunk(chunk_l1);
2085        hash_join.next_unwrap_pending();
2086
2087        // push the init barrier for left and right
2088        tx_l.push_barrier(test_epoch(2), false);
2089        tx_r.push_barrier(test_epoch(2), false);
2090        hash_join.next_unwrap_ready_barrier()?;
2091
2092        // push the 2nd left chunk
2093        tx_l.push_chunk(chunk_l2);
2094        hash_join.next_unwrap_pending();
2095
2096        // push the 1st right chunk
2097        tx_r.push_chunk(chunk_r1);
2098        let chunk = hash_join.next_unwrap_ready_chunk()?;
2099        assert_eq!(
2100            chunk,
2101            StreamChunk::from_pretty(
2102                " I I I
2103                + 2 5 2
2104                + 4 9 4"
2105            )
2106        );
2107
2108        // push the 2nd right chunk
2109        tx_r.push_chunk(chunk_r2);
2110        let chunk = hash_join.next_unwrap_ready_chunk()?;
2111        assert_eq!(
2112            chunk,
2113            StreamChunk::from_pretty(
2114                " I I I
2115                + 1 4 1
2116                + 3 6 3"
2117            )
2118        );
2119
2120        Ok(())
2121    }
2122
    /// Right semi join over append-only inputs: right rows are emitted (with
    /// right-side columns only) when they find a match among buffered left
    /// rows.
    #[tokio::test]
    async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::RightSemi }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; left rows alone produce no output
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk; still no right rows
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; right rows with keys 2 and 4 match
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 2 5 1
                + 4 9 2"
            )
        );

        // push the 2nd right chunk; right rows with keys 1 and 3 match
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 1 4 4
                + 3 6 5"
            )
        );

        Ok(())
    }
2195
    /// Right semi join: the mirror image of `test_streaming_hash_left_semi_join` —
    /// the `chunk_l*`/`chunk_r*` data sets are swapped, and only right-side
    /// columns are emitted. A right row appears on its first left match and
    /// is retracted when its last match is deleted.
    #[tokio::test]
    async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I
             + 6 10",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I  I
             - 6 11",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            "  I  I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st right chunk; no left matches yet, so nothing is emitted
        tx_r.push_chunk(chunk_r1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd right chunk (insert/delete pair cancels out)
        tx_r.push_chunk(chunk_r2);
        hash_join.next_unwrap_pending();

        // push the 1st left chunk; right row (2, 5) gains its first match
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        // push the 2nd left chunk; right row (3, 6) gains its first match
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 3 6"
            )
        );

        // push the 3rd right chunk (tests forward_exactly_once)
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // push the 3rd left chunk
        // (tests that no change if there are still matches)
        tx_l.push_chunk(chunk_l3);
        hash_join.next_unwrap_pending();

        // push the 4th left chunk
        // (tests that deletion occurs when there are no more matches)
        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
2305
    /// Left anti join: left rows are emitted while they have NO right-side
    /// match. Gaining a first match retracts the row; losing the last match
    /// re-inserts it.
    #[tokio::test]
    async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11
             + 1 2
             + 1 3",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             + 9 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I
             - 1 2",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I I
             - 1 3",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; no right rows exist, so all left rows pass
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4
                + 2 5
                + 3 6",
            )
        );

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk; the unmatched insert/delete pair is forwarded as-is
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I
                 + 3 8
                 - 3 8",
            )
        );

        // push the 1st right chunk; key 2 now has a match, so (2, 5) is retracted
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 2 5"
            )
        );

        // push the 2nd right chunk; keys 3 and 1 gain matches, retracting those rows
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 3 6
                - 1 4"
            )
        );

        // push the 3rd left chunk (tests forward_exactly_once)
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 9 10"
            )
        );

        // push the 3rd right chunk
        // (tests that no change if there are still matches)
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // push the 4th right chunk
        // (tests that insertion occurs when there are no more matches)
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4"
            )
        );

        Ok(())
    }
2435
    /// Right anti join, exercised by symmetry: a `LeftAnti` executor is
    /// created and the returned senders are deliberately bound in swapped
    /// order (`tx_r` gets the executor's left input, `tx_l` its right), so
    /// feeding the "right" data into the executor's left side reproduces
    /// right-anti behavior without a separate executor setup.
    #[tokio::test]
    async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11
             + 1 2
             + 1 3",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I
             + 9 10",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             - 1 2",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            "  I I
             - 1 3",
        );
        // NOTE: binding order is intentionally (tx_r, tx_l) — see doc comment above.
        let (mut tx_r, mut tx_l, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;

        // push the init barrier for left and right
        tx_r.push_barrier(test_epoch(1), false);
        tx_l.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st right chunk; no matches exist, so all rows pass
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4
                + 2 5
                + 3 6",
            )
        );

        // push the 2nd barrier for left and right
        tx_r.push_barrier(test_epoch(2), false);
        tx_l.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd right chunk; the unmatched insert/delete pair is forwarded as-is
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I
                 + 3 8
                 - 3 8",
            )
        );

        // push the 1st left chunk; key 2 now has a match, so (2, 5) is retracted
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 2 5"
            )
        );

        // push the 2nd left chunk; keys 3 and 1 gain matches, retracting those rows
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 3 6
                - 1 4"
            )
        );

        // push the 3rd right chunk (tests forward_exactly_once)
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 9 10"
            )
        );

        // push the 3rd left chunk
        // (tests that no change if there are still matches)
        tx_l.push_chunk(chunk_l3);
        hash_join.next_unwrap_pending();

        // push the 4th left chunk
        // (tests that insertion occurs when there are no more matches)
        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4"
            )
        );

        Ok(())
    }
2565
2566    #[tokio::test]
2567    async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
2568        let chunk_l1 = StreamChunk::from_pretty(
2569            "  I I
2570             + 1 4
2571             + 2 5
2572             + 3 6",
2573        );
2574        let chunk_l2 = StreamChunk::from_pretty(
2575            "  I I
2576             + 6 8
2577             + 3 8",
2578        );
2579        let chunk_r1 = StreamChunk::from_pretty(
2580            "  I I
2581             + 2 7
2582             + 4 8
2583             + 6 9",
2584        );
2585        let chunk_r2 = StreamChunk::from_pretty(
2586            "  I  I
2587             + 3 10
2588             + 6 11",
2589        );
2590        let (mut tx_l, mut tx_r, mut hash_join) =
2591            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2592
2593        // push the init barrier for left and right
2594        tx_l.push_barrier(test_epoch(1), false);
2595        tx_r.push_barrier(test_epoch(1), false);
2596        hash_join.next_unwrap_ready_barrier()?;
2597
2598        // push the 1st left chunk
2599        tx_l.push_chunk(chunk_l1);
2600        hash_join.next_unwrap_pending();
2601
2602        // push a barrier to left side
2603        tx_l.push_barrier(test_epoch(2), false);
2604
2605        // push the 2nd left chunk
2606        tx_l.push_chunk(chunk_l2);
2607
2608        // join the first right chunk
2609        tx_r.push_chunk(chunk_r1);
2610
2611        // Consume stream chunk
2612        let chunk = hash_join.next_unwrap_ready_chunk()?;
2613        assert_eq!(
2614            chunk,
2615            StreamChunk::from_pretty(
2616                " I I I I
2617                + 2 5 2 7"
2618            )
2619        );
2620
2621        // push a barrier to right side
2622        tx_r.push_barrier(test_epoch(2), false);
2623
2624        // get the aligned barrier here
2625        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2626        assert!(matches!(
2627            hash_join.next_unwrap_ready_barrier()?,
2628            Barrier {
2629                epoch,
2630                mutation: None,
2631                ..
2632            } if epoch == expected_epoch
2633        ));
2634
2635        // join the 2nd left chunk
2636        let chunk = hash_join.next_unwrap_ready_chunk()?;
2637        assert_eq!(
2638            chunk,
2639            StreamChunk::from_pretty(
2640                " I I I I
2641                + 6 8 6 9"
2642            )
2643        );
2644
2645        // push the 2nd right chunk
2646        tx_r.push_chunk(chunk_r2);
2647        let chunk = hash_join.next_unwrap_ready_chunk()?;
2648        assert_eq!(
2649            chunk,
2650            StreamChunk::from_pretty(
2651                " I I I I
2652                + 3 6 3 10
2653                + 3 8 3 10
2654                + 6 8 6 11"
2655            )
2656        );
2657
2658        Ok(())
2659    }
2660
2661    #[tokio::test]
2662    async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
2663        let chunk_l1 = StreamChunk::from_pretty(
2664            "  I I
2665             + 1 4
2666             + 2 .
2667             + 3 .",
2668        );
2669        let chunk_l2 = StreamChunk::from_pretty(
2670            "  I I
2671             + 6 .
2672             + 3 8",
2673        );
2674        let chunk_r1 = StreamChunk::from_pretty(
2675            "  I I
2676             + 2 7
2677             + 4 8
2678             + 6 9",
2679        );
2680        let chunk_r2 = StreamChunk::from_pretty(
2681            "  I  I
2682             + 3 10
2683             + 6 11",
2684        );
2685        let (mut tx_l, mut tx_r, mut hash_join) =
2686            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2687
2688        // push the init barrier for left and right
2689        tx_l.push_barrier(test_epoch(1), false);
2690        tx_r.push_barrier(test_epoch(1), false);
2691        hash_join.next_unwrap_ready_barrier()?;
2692
2693        // push the 1st left chunk
2694        tx_l.push_chunk(chunk_l1);
2695        hash_join.next_unwrap_pending();
2696
2697        // push a barrier to left side
2698        tx_l.push_barrier(test_epoch(2), false);
2699
2700        // push the 2nd left chunk
2701        tx_l.push_chunk(chunk_l2);
2702
2703        // join the first right chunk
2704        tx_r.push_chunk(chunk_r1);
2705
2706        // Consume stream chunk
2707        let chunk = hash_join.next_unwrap_ready_chunk()?;
2708        assert_eq!(
2709            chunk,
2710            StreamChunk::from_pretty(
2711                " I I I I
2712                + 2 . 2 7"
2713            )
2714        );
2715
2716        // push a barrier to right side
2717        tx_r.push_barrier(test_epoch(2), false);
2718
2719        // get the aligned barrier here
2720        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2721        assert!(matches!(
2722            hash_join.next_unwrap_ready_barrier()?,
2723            Barrier {
2724                epoch,
2725                mutation: None,
2726                ..
2727            } if epoch == expected_epoch
2728        ));
2729
2730        // join the 2nd left chunk
2731        let chunk = hash_join.next_unwrap_ready_chunk()?;
2732        assert_eq!(
2733            chunk,
2734            StreamChunk::from_pretty(
2735                " I I I I
2736                + 6 . 6 9"
2737            )
2738        );
2739
2740        // push the 2nd right chunk
2741        tx_r.push_chunk(chunk_r2);
2742        let chunk = hash_join.next_unwrap_ready_chunk()?;
2743        assert_eq!(
2744            chunk,
2745            StreamChunk::from_pretty(
2746                " I I I I
2747                + 3 8 3 10
2748                + 3 . 3 10
2749                + 6 . 6 11"
2750            )
2751        );
2752
2753        Ok(())
2754    }
2755
2756    #[tokio::test]
2757    async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
2758        let chunk_l1 = StreamChunk::from_pretty(
2759            "  I I
2760             + 1 4
2761             + 2 5
2762             + 3 6",
2763        );
2764        let chunk_l2 = StreamChunk::from_pretty(
2765            "  I I
2766             + 3 8
2767             - 3 8",
2768        );
2769        let chunk_r1 = StreamChunk::from_pretty(
2770            "  I I
2771             + 2 7
2772             + 4 8
2773             + 6 9",
2774        );
2775        let chunk_r2 = StreamChunk::from_pretty(
2776            "  I  I
2777             + 3 10
2778             + 6 11",
2779        );
2780        let (mut tx_l, mut tx_r, mut hash_join) =
2781            create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;
2782
2783        // push the init barrier for left and right
2784        tx_l.push_barrier(test_epoch(1), false);
2785        tx_r.push_barrier(test_epoch(1), false);
2786        hash_join.next_unwrap_ready_barrier()?;
2787
2788        // push the 1st left chunk
2789        tx_l.push_chunk(chunk_l1);
2790        let chunk = hash_join.next_unwrap_ready_chunk()?;
2791        assert_eq!(
2792            chunk,
2793            StreamChunk::from_pretty(
2794                " I I I I
2795                + 1 4 . .
2796                + 2 5 . .
2797                + 3 6 . ."
2798            )
2799        );
2800
2801        // push the 2nd left chunk
2802        tx_l.push_chunk(chunk_l2);
2803        let chunk = hash_join.next_unwrap_ready_chunk()?;
2804        assert_eq!(
2805            chunk,
2806            StreamChunk::from_pretty(
2807                " I I I I
2808                + 3 8 . .
2809                - 3 8 . ."
2810            )
2811        );
2812
2813        // push the 1st right chunk
2814        tx_r.push_chunk(chunk_r1);
2815        let chunk = hash_join.next_unwrap_ready_chunk()?;
2816        assert_eq!(
2817            chunk,
2818            StreamChunk::from_pretty(
2819                "  I I I I
2820                U- 2 5 . .
2821                U+ 2 5 2 7"
2822            )
2823        );
2824
2825        // push the 2nd right chunk
2826        tx_r.push_chunk(chunk_r2);
2827        let chunk = hash_join.next_unwrap_ready_chunk()?;
2828        assert_eq!(
2829            chunk,
2830            StreamChunk::from_pretty(
2831                "  I I I I
2832                U- 3 6 . .
2833                U+ 3 6 3 10"
2834            )
2835        );
2836
2837        Ok(())
2838    }
2839
2840    #[tokio::test]
2841    async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
2842        let chunk_l1 = StreamChunk::from_pretty(
2843            "  I I
2844             + 1 4
2845             + 2 5
2846             + . 6",
2847        );
2848        let chunk_l2 = StreamChunk::from_pretty(
2849            "  I I
2850             + . 8
2851             - . 8",
2852        );
2853        let chunk_r1 = StreamChunk::from_pretty(
2854            "  I I
2855             + 2 7
2856             + 4 8
2857             + 6 9",
2858        );
2859        let chunk_r2 = StreamChunk::from_pretty(
2860            "  I  I
2861             + . 10
2862             + 6 11",
2863        );
2864        let (mut tx_l, mut tx_r, mut hash_join) =
2865            create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;
2866
2867        // push the init barrier for left and right
2868        tx_l.push_barrier(test_epoch(1), false);
2869        tx_r.push_barrier(test_epoch(1), false);
2870        hash_join.next_unwrap_ready_barrier()?;
2871
2872        // push the 1st left chunk
2873        tx_l.push_chunk(chunk_l1);
2874        let chunk = hash_join.next_unwrap_ready_chunk()?;
2875        assert_eq!(
2876            chunk,
2877            StreamChunk::from_pretty(
2878                " I I I I
2879                + 1 4 . .
2880                + 2 5 . .
2881                + . 6 . ."
2882            )
2883        );
2884
2885        // push the 2nd left chunk
2886        tx_l.push_chunk(chunk_l2);
2887        let chunk = hash_join.next_unwrap_ready_chunk()?;
2888        assert_eq!(
2889            chunk,
2890            StreamChunk::from_pretty(
2891                " I I I I
2892                + . 8 . .
2893                - . 8 . ."
2894            )
2895        );
2896
2897        // push the 1st right chunk
2898        tx_r.push_chunk(chunk_r1);
2899        let chunk = hash_join.next_unwrap_ready_chunk()?;
2900        assert_eq!(
2901            chunk,
2902            StreamChunk::from_pretty(
2903                "  I I I I
2904                U- 2 5 . .
2905                U+ 2 5 2 7"
2906            )
2907        );
2908
2909        // push the 2nd right chunk
2910        tx_r.push_chunk(chunk_r2);
2911        let chunk = hash_join.next_unwrap_ready_chunk()?;
2912        assert_eq!(
2913            chunk,
2914            StreamChunk::from_pretty(
2915                "  I I I I
2916                U- . 6 . .
2917                U+ . 6 . 10"
2918            )
2919        );
2920
2921        Ok(())
2922    }
2923
2924    #[tokio::test]
2925    async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
2926        let chunk_l1 = StreamChunk::from_pretty(
2927            "  I I
2928             + 1 4
2929             + 2 5
2930             + 3 6",
2931        );
2932        let chunk_l2 = StreamChunk::from_pretty(
2933            "  I I
2934             + 3 8
2935             - 3 8",
2936        );
2937        let chunk_r1 = StreamChunk::from_pretty(
2938            "  I I
2939             + 2 7
2940             + 4 8
2941             + 6 9",
2942        );
2943        let chunk_r2 = StreamChunk::from_pretty(
2944            "  I  I
2945             + 5 10
2946             - 5 10",
2947        );
2948        let (mut tx_l, mut tx_r, mut hash_join) =
2949            create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;
2950
2951        // push the init barrier for left and right
2952        tx_l.push_barrier(test_epoch(1), false);
2953        tx_r.push_barrier(test_epoch(1), false);
2954        hash_join.next_unwrap_ready_barrier()?;
2955
2956        // push the 1st left chunk
2957        tx_l.push_chunk(chunk_l1);
2958        hash_join.next_unwrap_pending();
2959
2960        // push the 2nd left chunk
2961        tx_l.push_chunk(chunk_l2);
2962        hash_join.next_unwrap_pending();
2963
2964        // push the 1st right chunk
2965        tx_r.push_chunk(chunk_r1);
2966        let chunk = hash_join.next_unwrap_ready_chunk()?;
2967        assert_eq!(
2968            chunk,
2969            StreamChunk::from_pretty(
2970                " I I I I
2971                + 2 5 2 7
2972                + . . 4 8
2973                + . . 6 9"
2974            )
2975        );
2976
2977        // push the 2nd right chunk
2978        tx_r.push_chunk(chunk_r2);
2979        let chunk = hash_join.next_unwrap_ready_chunk()?;
2980        assert_eq!(
2981            chunk,
2982            StreamChunk::from_pretty(
2983                " I I I I
2984                + . . 5 10
2985                - . . 5 10"
2986            )
2987        );
2988
2989        Ok(())
2990    }
2991
2992    #[tokio::test]
2993    async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
2994        let chunk_l1 = StreamChunk::from_pretty(
2995            "  I I I
2996             + 1 4 1
2997             + 2 5 2
2998             + 3 6 3",
2999        );
3000        let chunk_l2 = StreamChunk::from_pretty(
3001            "  I I I
3002             + 4 9 4
3003             + 5 10 5",
3004        );
3005        let chunk_r1 = StreamChunk::from_pretty(
3006            "  I I I
3007             + 2 5 1
3008             + 4 9 2
3009             + 6 9 3",
3010        );
3011        let chunk_r2 = StreamChunk::from_pretty(
3012            "  I I I
3013             + 1 4 4
3014             + 3 6 5",
3015        );
3016
3017        let (mut tx_l, mut tx_r, mut hash_join) =
3018            create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;
3019
3020        // push the init barrier for left and right
3021        tx_l.push_barrier(test_epoch(1), false);
3022        tx_r.push_barrier(test_epoch(1), false);
3023        hash_join.next_unwrap_ready_barrier()?;
3024
3025        // push the 1st left chunk
3026        tx_l.push_chunk(chunk_l1);
3027        let chunk = hash_join.next_unwrap_ready_chunk()?;
3028        assert_eq!(
3029            chunk,
3030            StreamChunk::from_pretty(
3031                " I I I I I I
3032                + 1 4 1 . . .
3033                + 2 5 2 . . .
3034                + 3 6 3 . . ."
3035            )
3036        );
3037
3038        // push the 2nd left chunk
3039        tx_l.push_chunk(chunk_l2);
3040        let chunk = hash_join.next_unwrap_ready_chunk()?;
3041        assert_eq!(
3042            chunk,
3043            StreamChunk::from_pretty(
3044                " I I I I I I
3045                + 4 9 4 . . .
3046                + 5 10 5 . . ."
3047            )
3048        );
3049
3050        // push the 1st right chunk
3051        tx_r.push_chunk(chunk_r1);
3052        let chunk = hash_join.next_unwrap_ready_chunk()?;
3053        assert_eq!(
3054            chunk,
3055            StreamChunk::from_pretty(
3056                "  I I I I I I
3057                U- 2 5 2 . . .
3058                U+ 2 5 2 2 5 1
3059                U- 4 9 4 . . .
3060                U+ 4 9 4 4 9 2"
3061            )
3062        );
3063
3064        // push the 2nd right chunk
3065        tx_r.push_chunk(chunk_r2);
3066        let chunk = hash_join.next_unwrap_ready_chunk()?;
3067        assert_eq!(
3068            chunk,
3069            StreamChunk::from_pretty(
3070                "  I I I I I I
3071                U- 1 4 1 . . .
3072                U+ 1 4 1 1 4 4
3073                U- 3 6 3 . . .
3074                U+ 3 6 3 3 6 5"
3075            )
3076        );
3077
3078        Ok(())
3079    }
3080
3081    #[tokio::test]
3082    async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
3083        let chunk_l1 = StreamChunk::from_pretty(
3084            "  I I I
3085             + 1 4 1
3086             + 2 5 2
3087             + 3 6 3",
3088        );
3089        let chunk_l2 = StreamChunk::from_pretty(
3090            "  I I I
3091             + 4 9 4
3092             + 5 10 5",
3093        );
3094        let chunk_r1 = StreamChunk::from_pretty(
3095            "  I I I
3096             + 2 5 1
3097             + 4 9 2
3098             + 6 9 3",
3099        );
3100        let chunk_r2 = StreamChunk::from_pretty(
3101            "  I I I
3102             + 1 4 4
3103             + 3 6 5
3104             + 7 7 6",
3105        );
3106
3107        let (mut tx_l, mut tx_r, mut hash_join) =
3108            create_append_only_executor::<{ JoinType::RightOuter }>(false).await;
3109
3110        // push the init barrier for left and right
3111        tx_l.push_barrier(test_epoch(1), false);
3112        tx_r.push_barrier(test_epoch(1), false);
3113        hash_join.next_unwrap_ready_barrier()?;
3114
3115        // push the 1st left chunk
3116        tx_l.push_chunk(chunk_l1);
3117        hash_join.next_unwrap_pending();
3118
3119        // push the 2nd left chunk
3120        tx_l.push_chunk(chunk_l2);
3121        hash_join.next_unwrap_pending();
3122
3123        // push the 1st right chunk
3124        tx_r.push_chunk(chunk_r1);
3125        let chunk = hash_join.next_unwrap_ready_chunk()?;
3126        assert_eq!(
3127            chunk,
3128            StreamChunk::from_pretty(
3129                "  I I I I I I
3130                + 2 5 2 2 5 1
3131                + 4 9 4 4 9 2
3132                + . . . 6 9 3"
3133            )
3134        );
3135
3136        // push the 2nd right chunk
3137        tx_r.push_chunk(chunk_r2);
3138        let chunk = hash_join.next_unwrap_ready_chunk()?;
3139        assert_eq!(
3140            chunk,
3141            StreamChunk::from_pretty(
3142                "  I I I I I I
3143                + 1 4 1 1 4 4
3144                + 3 6 3 3 6 5
3145                + . . . 7 7 6"
3146            )
3147        );
3148
3149        Ok(())
3150    }
3151
3152    #[tokio::test]
3153    async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
3154        let chunk_l1 = StreamChunk::from_pretty(
3155            "  I I
3156             + 1 4
3157             + 2 5
3158             + 3 6",
3159        );
3160        let chunk_l2 = StreamChunk::from_pretty(
3161            "  I I
3162             + 3 8
3163             - 3 8",
3164        );
3165        let chunk_r1 = StreamChunk::from_pretty(
3166            "  I I
3167             + 2 7
3168             + 4 8
3169             + 6 9",
3170        );
3171        let chunk_r2 = StreamChunk::from_pretty(
3172            "  I  I
3173             + 5 10
3174             - 5 10",
3175        );
3176        let (mut tx_l, mut tx_r, mut hash_join) =
3177            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3178
3179        // push the init barrier for left and right
3180        tx_l.push_barrier(test_epoch(1), false);
3181        tx_r.push_barrier(test_epoch(1), false);
3182        hash_join.next_unwrap_ready_barrier()?;
3183
3184        // push the 1st left chunk
3185        tx_l.push_chunk(chunk_l1);
3186        let chunk = hash_join.next_unwrap_ready_chunk()?;
3187        assert_eq!(
3188            chunk,
3189            StreamChunk::from_pretty(
3190                " I I I I
3191                + 1 4 . .
3192                + 2 5 . .
3193                + 3 6 . ."
3194            )
3195        );
3196
3197        // push the 2nd left chunk
3198        tx_l.push_chunk(chunk_l2);
3199        let chunk = hash_join.next_unwrap_ready_chunk()?;
3200        assert_eq!(
3201            chunk,
3202            StreamChunk::from_pretty(
3203                " I I I I
3204                + 3 8 . .
3205                - 3 8 . ."
3206            )
3207        );
3208
3209        // push the 1st right chunk
3210        tx_r.push_chunk(chunk_r1);
3211        let chunk = hash_join.next_unwrap_ready_chunk()?;
3212        assert_eq!(
3213            chunk,
3214            StreamChunk::from_pretty(
3215                "  I I I I
3216                U-  2 5 . .
3217                U+  2 5 2 7
3218                +  . . 4 8
3219                +  . . 6 9"
3220            )
3221        );
3222
3223        // push the 2nd right chunk
3224        tx_r.push_chunk(chunk_r2);
3225        let chunk = hash_join.next_unwrap_ready_chunk()?;
3226        assert_eq!(
3227            chunk,
3228            StreamChunk::from_pretty(
3229                " I I I I
3230                + . . 5 10
3231                - . . 5 10"
3232            )
3233        );
3234
3235        Ok(())
3236    }
3237
3238    #[tokio::test]
3239    async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
3240        let (mut tx_l, mut tx_r, mut hash_join) =
3241            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3242
3243        // push the init barrier for left and right
3244        tx_l.push_barrier(test_epoch(1), false);
3245        tx_r.push_barrier(test_epoch(1), false);
3246        hash_join.next_unwrap_ready_barrier()?;
3247
3248        tx_l.push_chunk(StreamChunk::from_pretty(
3249            "  I I
3250             + 1 1
3251            ",
3252        ));
3253        let chunk = hash_join.next_unwrap_ready_chunk()?;
3254        assert_eq!(
3255            chunk,
3256            StreamChunk::from_pretty(
3257                " I I I I
3258                + 1 1 . ."
3259            )
3260        );
3261
3262        tx_r.push_chunk(StreamChunk::from_pretty(
3263            "  I I
3264             + 1 1
3265            ",
3266        ));
3267        let chunk = hash_join.next_unwrap_ready_chunk()?;
3268
3269        assert_eq!(
3270            chunk,
3271            StreamChunk::from_pretty(
3272                " I I I I
3273                U- 1 1 . .
3274                U+ 1 1 1 1"
3275            )
3276        );
3277
3278        tx_l.push_chunk(StreamChunk::from_pretty(
3279            "   I I
3280              - 1 1
3281              + 1 2
3282            ",
3283        ));
3284        let chunk = hash_join.next_unwrap_ready_chunk()?;
3285        let chunk = chunk.compact();
3286        assert_eq!(
3287            chunk,
3288            StreamChunk::from_pretty(
3289                " I I I I
3290                - 1 1 1 1
3291                + 1 2 1 1
3292                "
3293            )
3294        );
3295
3296        Ok(())
3297    }
3298
3299    #[tokio::test]
3300    async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
3301    {
3302        let chunk_l1 = StreamChunk::from_pretty(
3303            "  I I
3304             + 1 4
3305             + 2 5
3306             + 3 6
3307             + 3 7",
3308        );
3309        let chunk_l2 = StreamChunk::from_pretty(
3310            "  I I
3311             + 3 8
3312             - 3 8
3313             - 1 4", // delete row to cause an empty JoinHashEntry
3314        );
3315        let chunk_r1 = StreamChunk::from_pretty(
3316            "  I I
3317             + 2 6
3318             + 4 8
3319             + 3 4",
3320        );
3321        let chunk_r2 = StreamChunk::from_pretty(
3322            "  I  I
3323             + 5 10
3324             - 5 10
3325             + 1 2",
3326        );
3327        let (mut tx_l, mut tx_r, mut hash_join) =
3328            create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;
3329
3330        // push the init barrier for left and right
3331        tx_l.push_barrier(test_epoch(1), false);
3332        tx_r.push_barrier(test_epoch(1), false);
3333        hash_join.next_unwrap_ready_barrier()?;
3334
3335        // push the 1st left chunk
3336        tx_l.push_chunk(chunk_l1);
3337        let chunk = hash_join.next_unwrap_ready_chunk()?;
3338        assert_eq!(
3339            chunk,
3340            StreamChunk::from_pretty(
3341                " I I I I
3342                + 1 4 . .
3343                + 2 5 . .
3344                + 3 6 . .
3345                + 3 7 . ."
3346            )
3347        );
3348
3349        // push the 2nd left chunk
3350        tx_l.push_chunk(chunk_l2);
3351        let chunk = hash_join.next_unwrap_ready_chunk()?;
3352        assert_eq!(
3353            chunk,
3354            StreamChunk::from_pretty(
3355                " I I I I
3356                + 3 8 . .
3357                - 3 8 . .
3358                - 1 4 . ."
3359            )
3360        );
3361
3362        // push the 1st right chunk
3363        tx_r.push_chunk(chunk_r1);
3364        let chunk = hash_join.next_unwrap_ready_chunk()?;
3365        assert_eq!(
3366            chunk,
3367            StreamChunk::from_pretty(
3368                "  I I I I
3369                U-  2 5 . .
3370                U+  2 5 2 6
3371                +  . . 4 8
3372                +  . . 3 4" /* regression test (#2420): 3 4 should be forwarded only once
3373                             * despite matching on eq join on 2
3374                             * entries */
3375            )
3376        );
3377
3378        // push the 2nd right chunk
3379        tx_r.push_chunk(chunk_r2);
3380        let chunk = hash_join.next_unwrap_ready_chunk()?;
3381        assert_eq!(
3382            chunk,
3383            StreamChunk::from_pretty(
3384                " I I I I
3385                + . . 5 10
3386                - . . 5 10
3387                + . . 1 2" /* regression test (#2420): 1 2 forwarded even if matches on an empty
3388                            * join entry */
3389            )
3390        );
3391
3392        Ok(())
3393    }
3394
3395    #[tokio::test]
3396    async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
3397        let chunk_l1 = StreamChunk::from_pretty(
3398            "  I I
3399             + 1 4
3400             + 2 10
3401             + 3 6",
3402        );
3403        let chunk_l2 = StreamChunk::from_pretty(
3404            "  I I
3405             + 3 8
3406             - 3 8",
3407        );
3408        let chunk_r1 = StreamChunk::from_pretty(
3409            "  I I
3410             + 2 7
3411             + 4 8
3412             + 6 9",
3413        );
3414        let chunk_r2 = StreamChunk::from_pretty(
3415            "  I  I
3416             + 3 10
3417             + 6 11",
3418        );
3419        let (mut tx_l, mut tx_r, mut hash_join) =
3420            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3421
3422        // push the init barrier for left and right
3423        tx_l.push_barrier(test_epoch(1), false);
3424        tx_r.push_barrier(test_epoch(1), false);
3425        hash_join.next_unwrap_ready_barrier()?;
3426
3427        // push the 1st left chunk
3428        tx_l.push_chunk(chunk_l1);
3429        hash_join.next_unwrap_pending();
3430
3431        // push the 2nd left chunk
3432        tx_l.push_chunk(chunk_l2);
3433        hash_join.next_unwrap_pending();
3434
3435        // push the 1st right chunk
3436        tx_r.push_chunk(chunk_r1);
3437        hash_join.next_unwrap_pending();
3438
3439        // push the 2nd right chunk
3440        tx_r.push_chunk(chunk_r2);
3441        let chunk = hash_join.next_unwrap_ready_chunk()?;
3442        assert_eq!(
3443            chunk,
3444            StreamChunk::from_pretty(
3445                " I I I I
3446                + 3 6 3 10"
3447            )
3448        );
3449
3450        Ok(())
3451    }
3452
3453    #[tokio::test]
3454    async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3455        let (mut tx_l, mut tx_r, mut hash_join) =
3456            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3457
3458        // push the init barrier for left and right
3459        tx_l.push_barrier(test_epoch(1), false);
3460        tx_r.push_barrier(test_epoch(1), false);
3461        hash_join.next_unwrap_ready_barrier()?;
3462
3463        tx_l.push_int64_watermark(0, 100);
3464
3465        tx_l.push_int64_watermark(0, 200);
3466
3467        tx_l.push_barrier(test_epoch(2), false);
3468        tx_r.push_barrier(test_epoch(2), false);
3469        hash_join.next_unwrap_ready_barrier()?;
3470
3471        tx_r.push_int64_watermark(0, 50);
3472
3473        let w1 = hash_join.next().await.unwrap().unwrap();
3474        let w1 = w1.as_watermark().unwrap();
3475
3476        let w2 = hash_join.next().await.unwrap().unwrap();
3477        let w2 = w2.as_watermark().unwrap();
3478
3479        tx_r.push_int64_watermark(0, 100);
3480
3481        let w3 = hash_join.next().await.unwrap().unwrap();
3482        let w3 = w3.as_watermark().unwrap();
3483
3484        let w4 = hash_join.next().await.unwrap().unwrap();
3485        let w4 = w4.as_watermark().unwrap();
3486
3487        assert_eq!(
3488            w1,
3489            &Watermark {
3490                col_idx: 2,
3491                data_type: DataType::Int64,
3492                val: ScalarImpl::Int64(50)
3493            }
3494        );
3495
3496        assert_eq!(
3497            w2,
3498            &Watermark {
3499                col_idx: 0,
3500                data_type: DataType::Int64,
3501                val: ScalarImpl::Int64(50)
3502            }
3503        );
3504
3505        assert_eq!(
3506            w3,
3507            &Watermark {
3508                col_idx: 2,
3509                data_type: DataType::Int64,
3510                val: ScalarImpl::Int64(100)
3511            }
3512        );
3513
3514        assert_eq!(
3515            w4,
3516            &Watermark {
3517                col_idx: 0,
3518                data_type: DataType::Int64,
3519                val: ScalarImpl::Int64(100)
3520            }
3521        );
3522
3523        Ok(())
3524    }
3525}