// risingwave_stream/executor/hash_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use std::assert_matches::assert_matches;
15use std::collections::{BTreeMap, HashSet};
16use std::marker::PhantomData;
17use std::time::Duration;
18
19use anyhow::Context;
20use itertools::Itertools;
21use multimap::MultiMap;
22use risingwave_common::array::Op;
23use risingwave_common::hash::{HashKey, NullBitmap};
24use risingwave_common::row::RowExt;
25use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
26use risingwave_common::util::epoch::EpochPair;
27use risingwave_common::util::iter_util::ZipEqDebug;
28use risingwave_expr::ExprError;
29use risingwave_expr::expr::NonStrictExpression;
30use tokio::time::Instant;
31
32use self::builder::JoinChunkBuilder;
33use super::barrier_align::*;
34use super::join::hash_join::*;
35use super::join::row::{JoinEncoding, JoinRow};
36use super::join::*;
37use super::watermark::*;
38use crate::executor::CachedJoinRow;
39use crate::executor::join::builder::JoinStreamChunkBuilder;
40use crate::executor::join::hash_join::CacheResult;
41use crate::executor::prelude::*;
42
/// Number of processed rows between two forced cache evictions of both join sides.
/// The counter is advanced and compared against this value in `HashJoinExecutor::evict_cache`.
const EVICT_EVERY_N_ROWS: u32 = 16;
45
/// Returns `true` when every index in `vec1` also occurs in `vec2`.
///
/// Duplicates are irrelevant and an empty `vec1` is a subset of anything.
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.into_iter().all(|idx| superset.contains(&idx))
}
49
/// Per-side parameters of a hash join.
///
/// Derives `Debug` and `Clone` so the public configuration type can be logged and copied
/// by callers like any other plain-data config.
#[derive(Debug, Clone)]
pub struct JoinParams {
    /// Indices of the join key columns in this side's input.
    pub join_key_indices: Vec<usize>,
    /// Indices of this side's input pk after deduplication.
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Bundles the join key indices and the deduplicated pk indices of one join side.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        Self {
            join_key_indices,
            deduped_pk_indices,
        }
    }
}
65
66struct JoinSide<K: HashKey, S: StateStore, E: JoinEncoding> {
67    /// Store all data from a one side stream
68    ht: JoinHashMap<K, S, E>,
69    /// Indices of the join key columns
70    join_key_indices: Vec<usize>,
71    /// The data type of all columns without degree.
72    all_data_types: Vec<DataType>,
73    /// The start position for the side in output new columns
74    start_pos: usize,
75    /// The mapping from input indices of a side to output columes.
76    i2o_mapping: Vec<(usize, usize)>,
77    i2o_mapping_indexed: MultiMap<usize, usize>,
78    /// The first field of the ith element indicates that when a watermark at the ith column of
79    /// this side comes, what band join conditions should be updated in order to possibly
80    /// generate a new watermark at that column or the corresponding column in the counterpart
81    /// join side.
82    ///
83    /// The second field indicates that whether the column is required less than the
84    /// the corresponding column in the counterpart join side in the band join condition.
85    input2inequality_index: Vec<Vec<(usize, bool)>>,
86    /// Some fields which are required non null to match due to inequalities.
87    non_null_fields: Vec<usize>,
88    /// (i, j) in this `Vec` means that state data in this join side can be cleaned if the value of
89    /// its ith column is less than the synthetic watermark of the jth band join condition.
90    state_clean_columns: Vec<(usize, usize)>,
91    /// Whether degree table is needed for this side.
92    need_degree_table: bool,
93    _marker: std::marker::PhantomData<E>,
94}
95
96impl<K: HashKey, S: StateStore, E: JoinEncoding> std::fmt::Debug for JoinSide<K, S, E> {
97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98        f.debug_struct("JoinSide")
99            .field("join_key_indices", &self.join_key_indices)
100            .field("col_types", &self.all_data_types)
101            .field("start_pos", &self.start_pos)
102            .field("i2o_mapping", &self.i2o_mapping)
103            .field("need_degree_table", &self.need_degree_table)
104            .finish()
105    }
106}
107
108impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinSide<K, S, E> {
109    // WARNING: Please do not call this until we implement it.
110    fn is_dirty(&self) -> bool {
111        unimplemented!()
112    }
113
114    #[expect(dead_code)]
115    fn clear_cache(&mut self) {
116        assert!(
117            !self.is_dirty(),
118            "cannot clear cache while states of hash join are dirty"
119        );
120
121        // TODO: not working with rearranged chain
122        // self.ht.clear();
123    }
124
125    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
126        self.ht.init(epoch).await
127    }
128}
129
130/// `HashJoinExecutor` takes two input streams and runs equal hash join on them.
131/// The output columns are the concatenation of left and right columns.
132pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
133{
134    ctx: ActorContextRef,
135    info: ExecutorInfo,
136
137    /// Left input executor
138    input_l: Option<Executor>,
139    /// Right input executor
140    input_r: Option<Executor>,
141    /// The data types of the formed new columns
142    actual_output_data_types: Vec<DataType>,
143    /// The parameters of the left join executor
144    side_l: JoinSide<K, S, E>,
145    /// The parameters of the right join executor
146    side_r: JoinSide<K, S, E>,
147    /// Optional non-equi join conditions
148    cond: Option<NonStrictExpression>,
149    /// Column indices of watermark output and offset expression of each inequality, respectively.
150    inequality_pairs: Vec<(Vec<usize>, Option<NonStrictExpression>)>,
151    /// The output watermark of each inequality condition and its value is the minimum of the
152    /// calculation result of both side. It will be used to generate watermark into downstream
153    /// and do state cleaning if `clean_state` field of that inequality is `true`.
154    inequality_watermarks: Vec<Option<Watermark>>,
155
156    /// Whether the logic can be optimized for append-only stream
157    append_only_optimize: bool,
158
159    metrics: Arc<StreamingMetrics>,
160    /// The maximum size of the chunk produced by executor at a time
161    chunk_size: usize,
162    /// Count the messages received, clear to 0 when counted to `EVICT_EVERY_N_MESSAGES`
163    cnt_rows_received: u32,
164
165    /// watermark column index -> `BufferedWatermarks`
166    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,
167
168    /// When to alert high join amplification
169    high_join_amplification_threshold: usize,
170
171    /// Max number of rows that will be cached in the entry state.
172    entry_state_max_rows: usize,
173}
174
175impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> std::fmt::Debug
176    for HashJoinExecutor<K, S, T, E>
177{
178    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179        f.debug_struct("HashJoinExecutor")
180            .field("join_type", &T)
181            .field("input_left", &self.input_l.as_ref().unwrap().identity())
182            .field("input_right", &self.input_r.as_ref().unwrap().identity())
183            .field("side_l", &self.side_l)
184            .field("side_r", &self.side_r)
185            .field("pk_indices", &self.info.pk_indices)
186            .field("schema", &self.info.schema)
187            .field("actual_output_data_types", &self.actual_output_data_types)
188            .finish()
189    }
190}
191
192impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> Execute
193    for HashJoinExecutor<K, S, T, E>
194{
195    fn execute(self: Box<Self>) -> BoxedMessageStream {
196        self.into_stream().boxed()
197    }
198}
199
/// Borrowed executor state plus the input chunk, handed to `eq_join_left` / `eq_join_right`
/// when processing one chunk.
struct EqJoinArgs<'a, K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Actor context of the executor.
    ctx: &'a ActorContextRef,
    /// Left join side.
    side_l: &'a mut JoinSide<K, S, E>,
    /// Right join side.
    side_r: &'a mut JoinSide<K, S, E>,
    /// Data types of the output columns after projection by `output_indices`.
    actual_output_data_types: &'a [DataType],
    /// Optional non-equi join condition.
    cond: &'a mut Option<NonStrictExpression>,
    /// Current watermark of each inequality condition.
    inequality_watermarks: &'a [Option<Watermark>],
    /// The input chunk being processed.
    chunk: StreamChunk,
    /// Whether the append-only optimization is enabled.
    append_only_optimize: bool,
    /// The maximum size of an output chunk.
    chunk_size: usize,
    /// The executor's row counter used for periodic cache eviction.
    cnt_rows_received: &'a mut u32,
    /// When to alert high join amplification.
    high_join_amplification_threshold: usize,
    /// Max number of rows that will be cached in the entry state.
    entry_state_max_rows: usize,
}
214
215impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
216    HashJoinExecutor<K, S, T, E>
217{
    /// Creates a hash join executor that uses the configured per-entry cache limit
    /// (`ctx.streaming_config.developer.hash_join_entry_state_max_rows`).
    ///
    /// Shorthand for [`Self::new_with_cache_size`] with `entry_state_max_rows = None`.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
    ) -> Self {
        Self::new_with_cache_size(
            ctx,
            info,
            input_l,
            input_r,
            params_l,
            params_r,
            null_safe,
            output_indices,
            cond,
            inequality_pairs,
            state_table_l,
            degree_state_table_l,
            state_table_r,
            degree_state_table_r,
            watermark_epoch,
            is_append_only,
            metrics,
            chunk_size,
            high_join_amplification_threshold,
            None,
        )
    }
263
    /// Creates a hash join executor.
    ///
    /// Same as [`Self::new`], except that `entry_state_max_rows` may override the maximum
    /// number of rows cached per entry state; `None` falls back to
    /// `ctx.streaming_config.developer.hash_join_entry_state_max_rows`.
    ///
    /// `inequality_pairs` holds `(key_required_larger, key_required_smaller, clean_state,
    /// delta_expression)` tuples whose column indices refer to the concatenated left++right
    /// input layout. The code assumes that one column of each pair comes from each side.
    ///
    /// # Panics
    ///
    /// Panics if the join key data types of the two sides differ, or if an index in
    /// `output_indices`, a join key index, or an inequality column index is out of range.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_cache_size(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        entry_state_max_rows: Option<usize>,
    ) -> Self {
        // An explicit limit wins; otherwise use the developer config value.
        let entry_state_max_rows = match entry_state_max_rows {
            None => {
                ctx.streaming_config
                    .developer
                    .hash_join_entry_state_max_rows
            }
            Some(entry_state_max_rows) => entry_state_max_rows,
        };
        // Used as the right side's `start_pos` in the concatenated column layout.
        let side_l_column_n = input_l.schema().len();

        // Columns produced before `output_indices` is applied: semi/anti joins only emit the
        // preserved side, all other join types emit left columns followed by right columns.
        let schema_fields = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
            JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
            _ => [
                input_l.schema().fields.clone(),
                input_r.schema().fields.clone(),
            ]
            .concat(),
        };

        let original_output_data_types = schema_fields
            .iter()
            .map(|field| field.data_type())
            .collect_vec();
        let actual_output_data_types = output_indices
            .iter()
            .map(|&idx| original_output_data_types[idx].clone())
            .collect_vec();

        // Data types of the hash join state (all input columns of each side).
        let state_all_data_types_l = input_l.schema().data_types();
        let state_all_data_types_r = input_r.schema().data_types();

        let state_pk_indices_l = input_l.pk_indices().to_vec();
        let state_pk_indices_r = input_r.pk_indices().to_vec();

        let state_join_key_indices_l = params_l.join_key_indices;
        let state_join_key_indices_r = params_r.join_key_indices;

        // Degree table key layout implied by these indices: the join key columns come first
        // (`0..jk_len`), followed by the deduped pk columns.
        let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
        let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();

        let degree_pk_indices_l = (state_join_key_indices_l.len()
            ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
            .collect_vec();
        let degree_pk_indices_r = (state_join_key_indices_r.len()
            ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
            .collect_vec();

        // Whether each side's pk is contained in its join key.
        let pk_contained_in_jk_l =
            is_subset(state_pk_indices_l.clone(), state_join_key_indices_l.clone());
        let pk_contained_in_jk_r =
            is_subset(state_pk_indices_r.clone(), state_join_key_indices_r.clone());

        // The append-only optimization requires both inputs to be append-only and the join
        // key to contain the pk on both sides.
        let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;

        let join_key_data_types_l = state_join_key_indices_l
            .iter()
            .map(|idx| state_all_data_types_l[*idx].clone())
            .collect_vec();

        let join_key_data_types_r = state_join_key_indices_r
            .iter()
            .map(|idx| state_all_data_types_r[*idx].clone())
            .collect_vec();

        assert_eq!(join_key_data_types_l, join_key_data_types_r);

        // Per join-key column: whether NULL is allowed to match NULL (null-safe equality).
        let null_matched = K::Bitmap::from_bool_vec(null_safe);

        // A side keeps a degree table only if the join type tracks degrees for it and the
        // *other* side's pk is not contained in that side's join key.
        // NOTE(review): presumably a contained pk means at most one match per key — confirm.
        let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
        let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;

        let degree_state_l = need_degree_table_l.then(|| {
            TableInner::new(
                degree_pk_indices_l,
                degree_join_key_indices_l,
                degree_state_table_l,
            )
        });
        let degree_state_r = need_degree_table_r.then(|| {
            TableInner::new(
                degree_pk_indices_r,
                degree_join_key_indices_r,
                degree_state_table_r,
            )
        });

        // Input-to-output column mappings; a side that is not emitted (semi/anti joins)
        // contributes zero columns.
        let (left_to_output, right_to_output) = {
            let (left_len, right_len) = if is_left_semi_or_anti(T) {
                (state_all_data_types_l.len(), 0usize)
            } else if is_right_semi_or_anti(T) {
                (0usize, state_all_data_types_r.len())
            } else {
                (state_all_data_types_l.len(), state_all_data_types_r.len())
            };
            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
        };

        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());

        let left_input_len = input_l.schema().len();
        let right_input_len = input_r.schema().len();
        let mut l2inequality_index = vec![vec![]; left_input_len];
        let mut r2inequality_index = vec![vec![]; right_input_len];
        let mut l_state_clean_columns = vec![];
        let mut r_state_clean_columns = vec![];
        // The closure below also fills the per-side inequality indexes and clean columns as a
        // side effect. Indices are in the concatenated layout, so the smaller index is on the
        // left side and right-side indices are shifted down by `left_input_len`.
        let inequality_pairs = inequality_pairs
            .into_iter()
            .enumerate()
            .map(
                |(
                    index,
                    (key_required_larger, key_required_smaller, clean_state, delta_expression),
                )| {
                    // The output watermark of an inequality is emitted on the output columns of
                    // its "larger" column, and only the side owning that column cleans state.
                    let output_indices = if key_required_larger < key_required_smaller {
                        if clean_state {
                            l_state_clean_columns.push((key_required_larger, index));
                        }
                        l2inequality_index[key_required_larger].push((index, false));
                        r2inequality_index[key_required_smaller - left_input_len]
                            .push((index, true));
                        l2o_indexed
                            .get_vec(&key_required_larger)
                            .cloned()
                            .unwrap_or_default()
                    } else {
                        if clean_state {
                            r_state_clean_columns
                                .push((key_required_larger - left_input_len, index));
                        }
                        l2inequality_index[key_required_smaller].push((index, true));
                        r2inequality_index[key_required_larger - left_input_len]
                            .push((index, false));
                        r2o_indexed
                            .get_vec(&(key_required_larger - left_input_len))
                            .cloned()
                            .unwrap_or_default()
                    };
                    (output_indices, delta_expression)
                },
            )
            .collect_vec();

        // Every column that takes part in some inequality must be non-null to match.
        let mut l_non_null_fields = l2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();
        let mut r_non_null_fields = r2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();

        // With the append-only optimization, inequality-based state cleaning and the
        // inequality non-null requirements are disabled on both sides.
        if append_only_optimize {
            l_state_clean_columns.clear();
            r_state_clean_columns.clear();
            l_non_null_fields.clear();
            r_non_null_fields.clear();
        }

        let inequality_watermarks = vec![None; inequality_pairs.len()];
        let watermark_buffers = BTreeMap::new();

        Self {
            ctx: ctx.clone(),
            info,
            input_l: Some(input_l),
            input_r: Some(input_r),
            actual_output_data_types,
            side_l: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch.clone(),
                    join_key_data_types_l,
                    state_join_key_indices_l.clone(),
                    state_all_data_types_l.clone(),
                    state_table_l,
                    params_l.deduped_pk_indices,
                    degree_state_l,
                    null_matched.clone(),
                    pk_contained_in_jk_l,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "left",
                ),
                join_key_indices: state_join_key_indices_l,
                all_data_types: state_all_data_types_l,
                i2o_mapping: left_to_output,
                i2o_mapping_indexed: l2o_indexed,
                input2inequality_index: l2inequality_index,
                non_null_fields: l_non_null_fields,
                state_clean_columns: l_state_clean_columns,
                start_pos: 0,
                need_degree_table: need_degree_table_l,
                _marker: PhantomData,
            },
            side_r: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch,
                    join_key_data_types_r,
                    state_join_key_indices_r.clone(),
                    state_all_data_types_r.clone(),
                    state_table_r,
                    params_r.deduped_pk_indices,
                    degree_state_r,
                    null_matched,
                    pk_contained_in_jk_r,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "right",
                ),
                join_key_indices: state_join_key_indices_r,
                all_data_types: state_all_data_types_r,
                start_pos: side_l_column_n,
                i2o_mapping: right_to_output,
                i2o_mapping_indexed: r2o_indexed,
                input2inequality_index: r2inequality_index,
                non_null_fields: r_non_null_fields,
                state_clean_columns: r_state_clean_columns,
                need_degree_table: need_degree_table_r,
                _marker: PhantomData,
            },
            cond,
            inequality_pairs,
            inequality_watermarks,
            append_only_optimize,
            metrics,
            chunk_size,
            cnt_rows_received: 0,
            watermark_buffers,
            high_join_amplification_threshold,
            entry_state_max_rows,
        }
    }
527
    /// Runs the join: aligns the two inputs on barriers and joins each incoming chunk
    /// against the other side's state. It yields the resulting chunks, watermarks and
    /// barriers.
    ///
    /// The first barrier is forwarded before both sides are initialized with its epoch. On
    /// every later barrier, both sides are flushed before the barrier is yielded. After the
    /// yield, `post_yield_barrier` is called on both sides with the barrier's vnode bitmap
    /// update (if any).
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let input_l = self.input_l.take().unwrap();
        let input_r = self.input_r.take().unwrap();
        let aligned_stream = barrier_align(
            input_l.execute(),
            input_r.execute(),
            self.ctx.id,
            self.ctx.fragment_id,
            self.metrics.clone(),
            "Join",
        );
        pin_mut!(aligned_stream);

        let actor_id = self.ctx.id;

        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
        let first_epoch = barrier.epoch;
        // The first barrier message should be propagated.
        yield Message::Barrier(barrier);
        self.side_l.init(first_epoch).await?;
        self.side_r.init(first_epoch).await?;

        let actor_id_str = self.ctx.id.to_string();
        let fragment_id_str = self.ctx.fragment_id.to_string();

        // Initialize the metric handles labelled with this actor and fragment.
        let join_actor_input_waiting_duration_ns = self
            .metrics
            .join_actor_input_waiting_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let left_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
        let right_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let barrier_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[
                actor_id_str.as_str(),
                fragment_id_str.as_str(),
                "barrier",
            ]);

        let left_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);

        let right_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        // Reset at the end of each iteration so that the waiting metric measures only the
        // time spent waiting for the next aligned input message.
        let mut start_time = Instant::now();

        while let Some(msg) = aligned_stream
            .next()
            .instrument_await("hash_join_barrier_align")
            .await
        {
            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
            match msg? {
                AlignedMessage::WatermarkLeft(watermark) => {
                    for watermark_to_emit in
                        self.handle_watermark(SideType::Left, watermark).await?
                    {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::WatermarkRight(watermark) => {
                    for watermark_to_emit in
                        self.handle_watermark(SideType::Right, watermark).await?
                    {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::Left(chunk) => {
                    // Match time excludes the time spent suspended at `yield` (i.e. downstream
                    // processing): the timer is paused before and restarted after each yield.
                    let mut left_time = Duration::from_nanos(0);
                    let mut left_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_left(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        left_time += left_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        left_start_time = Instant::now();
                    }
                    left_time += left_start_time.elapsed();
                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Right(chunk) => {
                    // Same timing scheme as the left side: yield time is excluded.
                    let mut right_time = Duration::from_nanos(0);
                    let mut right_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_right(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        right_time += right_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        right_start_time = Instant::now();
                    }
                    right_time += right_start_time.elapsed();
                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Barrier(barrier) => {
                    // NOTE(review): unlike the chunk arms, this duration also covers the time
                    // spent suspended at `yield Message::Barrier` below — confirm intended.
                    let barrier_start_time = Instant::now();
                    // Flush both sides for this epoch before the barrier is forwarded.
                    let (left_post_commit, right_post_commit) =
                        self.flush_data(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
                    yield Message::Barrier(barrier);

                    // Update the vnode bitmap for state tables of both sides if asked.
                    // Only the left side's result decides whether watermark state is reset;
                    // the right side's result is discarded (both sides get the same update).
                    right_post_commit
                        .post_yield_barrier(update_vnode_bitmap.clone())
                        .await?;
                    if left_post_commit
                        .post_yield_barrier(update_vnode_bitmap)
                        .await?
                        .unwrap_or(false)
                    {
                        // NOTE(review): presumably `Some(true)` means the cache was invalidated
                        // by a vnode change, making buffered watermarks stale — confirm.
                        self.watermark_buffers
                            .values_mut()
                            .for_each(|buffers| buffers.clear());
                        self.inequality_watermarks.fill(None);
                    }

                    // Report metrics of cached join rows/entries
                    for (join_cached_entry_count, ht) in [
                        (&left_join_cached_entry_count, &self.side_l.ht),
                        (&right_join_cached_entry_count, &self.side_r.ht),
                    ] {
                        join_cached_entry_count.set(ht.entry_count() as i64);
                    }

                    barrier_join_match_duration_ns
                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
                }
            }
            start_time = Instant::now();
        }
    }
700
701    async fn flush_data(
702        &mut self,
703        epoch: EpochPair,
704    ) -> StreamExecutorResult<(
705        JoinHashMapPostCommit<'_, K, S, E>,
706        JoinHashMapPostCommit<'_, K, S, E>,
707    )> {
708        // All changes to the state has been buffered in the mem-table of the state table. Just
709        // `commit` them here.
710        let left = self.side_l.ht.flush(epoch).await?;
711        let right = self.side_r.ht.flush(epoch).await?;
712        Ok((left, right))
713    }
714
715    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
716        // All changes to the state has been buffered in the mem-table of the state table. Just
717        // `commit` them here.
718        self.side_l.ht.try_flush().await?;
719        self.side_r.ht.try_flush().await?;
720        Ok(())
721    }
722
723    // We need to manually evict the cache.
724    fn evict_cache(
725        side_update: &mut JoinSide<K, S, E>,
726        side_match: &mut JoinSide<K, S, E>,
727        cnt_rows_received: &mut u32,
728    ) {
729        *cnt_rows_received += 1;
730        if *cnt_rows_received == EVICT_EVERY_N_ROWS {
731            side_update.ht.evict();
732            side_match.ht.evict();
733            *cnt_rows_received = 0;
734        }
735    }
736
737    async fn handle_watermark(
738        &mut self,
739        side: SideTypePrimitive,
740        watermark: Watermark,
741    ) -> StreamExecutorResult<Vec<Watermark>> {
742        let (side_update, side_match) = if side == SideType::Left {
743            (&mut self.side_l, &mut self.side_r)
744        } else {
745            (&mut self.side_r, &mut self.side_l)
746        };
747
748        // State cleaning
749        if side_update.join_key_indices[0] == watermark.col_idx {
750            side_match.ht.update_watermark(watermark.val.clone());
751        }
752
753        // Select watermarks to yield.
754        let wm_in_jk = side_update
755            .join_key_indices
756            .iter()
757            .positions(|idx| *idx == watermark.col_idx);
758        let mut watermarks_to_emit = vec![];
759        for idx in wm_in_jk {
760            let buffers = self
761                .watermark_buffers
762                .entry(idx)
763                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
764            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
765                let empty_indices = vec![];
766                let output_indices = side_update
767                    .i2o_mapping_indexed
768                    .get_vec(&side_update.join_key_indices[idx])
769                    .unwrap_or(&empty_indices)
770                    .iter()
771                    .chain(
772                        side_match
773                            .i2o_mapping_indexed
774                            .get_vec(&side_match.join_key_indices[idx])
775                            .unwrap_or(&empty_indices),
776                    );
777                for output_idx in output_indices {
778                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
779                }
780            };
781        }
782        for (inequality_index, need_offset) in
783            &side_update.input2inequality_index[watermark.col_idx]
784        {
785            let buffers = self
786                .watermark_buffers
787                .entry(side_update.join_key_indices.len() + inequality_index)
788                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
789            let mut input_watermark = watermark.clone();
790            if *need_offset
791                && let Some(delta_expression) = self.inequality_pairs[*inequality_index].1.as_ref()
792            {
793                // allow since we will handle error manually.
794                #[allow(clippy::disallowed_methods)]
795                let eval_result = delta_expression
796                    .inner()
797                    .eval_row(&OwnedRow::new(vec![Some(input_watermark.val)]))
798                    .await;
799                match eval_result {
800                    Ok(value) => input_watermark.val = value.unwrap(),
801                    Err(err) => {
802                        if !matches!(err, ExprError::NumericOutOfRange) {
803                            self.ctx.on_compute_error(err, &self.info.identity);
804                        }
805                        continue;
806                    }
807                }
808            };
809            if let Some(selected_watermark) = buffers.handle_watermark(side, input_watermark) {
810                for output_idx in &self.inequality_pairs[*inequality_index].0 {
811                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
812                }
813                self.inequality_watermarks[*inequality_index] = Some(selected_watermark);
814            }
815        }
816        Ok(watermarks_to_emit)
817    }
818
819    fn row_concat(
820        row_update: impl Row,
821        update_start_pos: usize,
822        row_matched: impl Row,
823        matched_start_pos: usize,
824    ) -> OwnedRow {
825        let mut new_row = vec![None; row_update.len() + row_matched.len()];
826
827        for (i, datum_ref) in row_update.iter().enumerate() {
828            new_row[i + update_start_pos] = datum_ref.to_owned_datum();
829        }
830        for (i, datum_ref) in row_matched.iter().enumerate() {
831            new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
832        }
833        OwnedRow::new(new_row)
834    }
835
    /// Thin wrapper that calls `eq_join_oneside` with `SideType::Left`.
    ///
    /// It exists only so that the join side shows up as a distinct frame in stack traces.
    fn eq_join_left(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Left }>(args)
    }

    /// Thin wrapper that calls `eq_join_oneside` with `SideType::Right`.
    ///
    /// It exists only so that the join side shows up as a distinct frame in stack traces.
    fn eq_join_right(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Right }>(args)
    }
849
    /// Joins every row of `args.chunk`, which arrived on side `SIDE`, against the state of the
    /// opposite side, yielding output chunks.
    ///
    /// For each visible row (holes in the chunk are skipped):
    /// - the shared row counter is advanced through `evict_cache`;
    /// - the build-side state is looked up, short-circuiting to `CacheResult::NeverMatch`
    ///   when the probe row has NULL in one of `side_update.non_null_fields` or the key's null
    ///   bitmap is not a subset of `side_match.ht.null_matched()`;
    /// - `handle_match_rows` runs with the `Insert` (for `Insert`/`UpdateInsert`) or `Delete`
    ///   (for `Delete`/`UpdateDelete`) join op, and every chunk it produces is yielded;
    /// - the summed cardinality of those chunks is recorded in the `join_matched_join_keys`
    ///   metric, and a warning is logged when it exceeds `high_join_amplification_threshold`.
    ///
    /// Whatever remains buffered in the chunk builder is yielded at the end.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S, E>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            actual_output_data_types,
            cond,
            inequality_watermarks,
            chunk,
            append_only_optimize,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
            entry_state_max_rows,
            ..
        } = args;

        // `side_update` receives the chunk; `side_match` is probed for matches.
        let (side_update, side_match) = if SIDE == SideType::Left {
            (side_l, side_r)
        } else {
            (side_r, side_l)
        };

        // Only the match-side state-clean columns whose inequality pair already has a
        // watermark can be used for state cleaning.
        let useful_state_clean_columns = side_match
            .state_clean_columns
            .iter()
            .filter_map(|(column_idx, inequality_index)| {
                inequality_watermarks[*inequality_index]
                    .as_ref()
                    .map(|watermark| (*column_idx, watermark))
            })
            .collect_vec();

        // One builder is shared by all rows of this chunk, so emitted chunk boundaries do not
        // necessarily line up with individual probe rows.
        let mut hashjoin_chunk_builder =
            JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        let join_matched_join_keys = ctx
            .streaming_metrics
            .join_matched_join_keys
            .with_guarded_label_values(&[
                &ctx.id.to_string(),
                &ctx.fragment_id.to_string(),
                &side_update.ht.table_id().to_string(),
            ]);

        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(side_update, side_match, cnt_rows_received);

            let cache_lookup_result = {
                // NOTE(review): the unchecked access assumes every index in `non_null_fields`
                // is a valid column of this side's rows — TODO confirm where that is enforced.
                let probe_non_null_requirement_satisfied = side_update
                    .non_null_fields
                    .iter()
                    .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
                let build_non_null_requirement_satisfied =
                    key.null_bitmap().is_subset(side_match.ht.null_matched());
                if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
                    side_match.ht.take_state_opt(key)
                } else {
                    CacheResult::NeverMatch
                }
            };
            // Sum of the cardinalities of chunks yielded while handling this row. Because the
            // builder is shared across rows, this may include rows buffered earlier.
            let mut total_matches = 0;

            macro_rules! match_rows {
                ($op:ident) => {
                    Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
                        cache_lookup_result,
                        row,
                        key,
                        &mut hashjoin_chunk_builder,
                        side_match,
                        side_update,
                        &useful_state_clean_columns,
                        cond,
                        append_only_optimize,
                        entry_state_max_rows,
                    )
                };
            }

            match op {
                Op::Insert | Op::UpdateInsert =>
                {
                    #[for_await]
                    for chunk in match_rows!(Insert) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
                Op::Delete | Op::UpdateDelete =>
                {
                    #[for_await]
                    for chunk in match_rows!(Delete) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
            };

            join_matched_join_keys.observe(total_matches as _);
            if total_matches > high_join_amplification_threshold {
                // The key is deserialized only for logging; a deserialization error still
                // propagates and fails the stream.
                let join_key_data_types = side_update.ht.join_key_data_types();
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = total_matches,
                    update_table_id = side_update.ht.table_id(),
                    match_table_id = side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = ctx.id,
                    fragment_id = ctx.fragment_id,
                    "large rows matched for join key"
                );
            }
        }
        // NOTE(kwannoel): We don't track metrics for this last chunk.
        if let Some(chunk) = hashjoin_chunk_builder.take() {
            yield chunk;
        }
    }
981
    /// Processes one probe-side `row` against all build-side rows that share its join `key`,
    /// yielding output chunks as the chunk builder produces them.
    ///
    /// For the probe-side row, we need to check if it has values in cache, if not, we need to
    /// fetch the matched rows from the state table.
    ///
    /// - `CacheResult::NeverMatch`: the row is only offered to `forward_if_not_matched`, and
    ///   the function returns without touching either side's state.
    /// - `CacheResult::Hit`: the cached rows are matched in place.
    /// - `CacheResult::Miss`: matched rows are fetched from the state table and, while their
    ///   count stays within `entry_state_max_rows`, copied into a fresh cache entry.
    ///
    /// Every matched build-side row being processed needs to go through the following phases:
    /// 1. Handle join condition evaluation.
    /// 2. Always do cache refill, if the state count is good.
    /// 3. Handle state cleaning.
    /// 4. Handle degree table update.
    ///
    /// After all matches, the probe row is forwarded according to its degree (zero or not),
    /// and the cache entry is written back. Then watermark-expired matched rows are deleted
    /// and the state tables are updated: either the append-only matched row is deleted, or the
    /// probe row is inserted into or deleted from the update side with its degree.
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn handle_match_rows<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
    >(
        cached_lookup_result: CacheResult<E>,
        row: RowRef<'a>,
        key: &'a K,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        side_match: &'a mut JoinSide<K, S, E>,
        side_update: &'a mut JoinSide<K, S, E>,
        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
        cond: &'a mut Option<NonStrictExpression>,
        append_only_optimize: bool,
        entry_state_max_rows: usize,
    ) {
        let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
        // Fresh cache entry, only filled on a cache miss; shadowed by the final entry below.
        let mut entry_state: JoinEntryState<E> = JoinEntryState::default();
        // Counts rows copied into `entry_state`. It can reach `entry_state_max_rows + 1`, which
        // marks the entry as too large, so the write-back check further down skips caching it.
        let mut entry_state_count = 0;

        // Number of build-side rows that satisfied the join condition for this probe row.
        let mut degree = 0;
        let mut append_only_matched_row: Option<JoinRow<OwnedRow>> = None;
        let mut matched_rows_to_clean = vec![];

        macro_rules! match_row {
            (
                $match_order_key_indices:expr,
                $degree_table:expr,
                $matched_row:expr,
                $matched_row_ref:expr,
                $from_cache:literal
            ) => {
                Self::handle_match_row::<SIDE, { JOIN_OP }, { $from_cache }>(
                    row,
                    $matched_row,
                    $matched_row_ref,
                    hashjoin_chunk_builder,
                    $match_order_key_indices,
                    $degree_table,
                    side_update.start_pos,
                    side_match.start_pos,
                    cond,
                    &mut degree,
                    useful_state_clean_columns,
                    append_only_optimize,
                    &mut append_only_matched_row,
                    &mut matched_rows_to_clean,
                )
            };
        }

        let entry_state = match cached_lookup_result {
            CacheResult::NeverMatch => {
                let op = match JOIN_OP {
                    JoinOp::Insert => Op::Insert,
                    JoinOp::Delete => Op::Delete,
                };
                if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                    yield chunk;
                }
                return Ok(());
            }
            CacheResult::Hit(mut cached_rows) => {
                let (match_order_key_indices, match_degree_state) =
                    side_match.ht.get_degree_state_mut_ref();
                // Handle cached rows which match the probe-side row.
                for (matched_row_ref, matched_row) in
                    cached_rows.values_mut(&side_match.all_data_types)
                {
                    let matched_row = matched_row?;
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        match_degree_state,
                        matched_row,
                        Some(matched_row_ref),
                        true
                    )
                    .await
                    {
                        yield chunk;
                    }
                }

                cached_rows
            }
            CacheResult::Miss => {
                // Handle rows which are not in cache.
                let (matched_rows, match_order_key_indices, degree_table) = side_match
                    .ht
                    .fetch_matched_rows_and_get_degree_table_ref(key)
                    .await?;

                #[for_await]
                for matched_row in matched_rows {
                    let (encoded_pk, matched_row) = matched_row?;

                    // Stays `None` once the entry has overflowed, so no cached degree is
                    // updated for rows that were not copied into the cache entry.
                    let mut matched_row_ref = None;

                    // cache refill
                    if entry_state_count <= entry_state_max_rows {
                        let row_ref = entry_state
                            .insert(encoded_pk, E::encode(&matched_row), None) // TODO(kwannoel): handle ineq key for asof join.
                            .with_context(|| format!("row: {}", row.display(),))?;
                        matched_row_ref = Some(row_ref);
                        entry_state_count += 1;
                    }
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        degree_table,
                        matched_row,
                        matched_row_ref,
                        false
                    )
                    .await
                    {
                        yield chunk;
                    }
                }
                Box::new(entry_state)
            }
        };

        // forward rows depending on join types
        let op = match JOIN_OP {
            JoinOp::Insert => Op::Insert,
            JoinOp::Delete => Op::Delete,
        };
        if degree == 0 {
            if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                yield chunk;
            }
        } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
        {
            yield chunk;
        }

        // cache refill: a hit entry is always written back; a freshly built entry only if it
        // did not overflow `entry_state_max_rows`.
        if cache_hit || entry_state_count <= entry_state_max_rows {
            side_match.ht.update_state(key, entry_state);
        }

        // watermark state cleaning
        for matched_row in matched_rows_to_clean {
            side_match.ht.delete_handle_degree(key, matched_row)?;
        }

        // apply append_only optimization to clean matched_rows which have been persisted.
        // Note the early return: the probe row itself is not written to the update side.
        if append_only_optimize && let Some(row) = append_only_matched_row {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            side_match.ht.delete_handle_degree(key, row)?;
            return Ok(());
        }

        // no append_only optimization, update state table(s).
        match JOIN_OP {
            JoinOp::Insert => {
                side_update
                    .ht
                    .insert_handle_degree(key, JoinRow::new(row, degree))?;
            }
            JoinOp::Delete => {
                side_update
                    .ht
                    .delete_handle_degree(key, JoinRow::new(row, degree))?;
            }
        }
    }
1159
1160    #[allow(clippy::too_many_arguments)]
1161    #[inline]
1162    async fn handle_match_row<
1163        'a,
1164        const SIDE: SideTypePrimitive,
1165        const JOIN_OP: JoinOpPrimitive,
1166        const MATCHED_ROWS_FROM_CACHE: bool,
1167    >(
1168        update_row: RowRef<'a>,
1169        mut matched_row: JoinRow<OwnedRow>,
1170        mut matched_row_cache_ref: Option<&mut E::EncodedRow>,
1171        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
1172        match_order_key_indices: &[usize],
1173        match_degree_table: &mut Option<TableInner<S>>,
1174        side_update_start_pos: usize,
1175        side_match_start_pos: usize,
1176        cond: &Option<NonStrictExpression>,
1177        update_row_degree: &mut u64,
1178        useful_state_clean_columns: &[(usize, &'a Watermark)],
1179        append_only_optimize: bool,
1180        append_only_matched_row: &mut Option<JoinRow<OwnedRow>>,
1181        matched_rows_to_clean: &mut Vec<JoinRow<OwnedRow>>,
1182    ) -> Option<StreamChunk> {
1183        let mut need_state_clean = false;
1184        let mut chunk_opt = None;
1185
1186        // TODO(kwannoel): Instead of evaluating this every loop,
1187        // we can call this only if there's a non-equi expression.
1188        // check join cond
1189        let join_condition_satisfied = Self::check_join_condition(
1190            update_row,
1191            side_update_start_pos,
1192            &matched_row.row,
1193            side_match_start_pos,
1194            cond,
1195        )
1196        .await;
1197
1198        if join_condition_satisfied {
1199            // update degree
1200            *update_row_degree += 1;
1201            // send matched row downstream
1202
1203            // Inserts must happen before degrees are updated,
1204            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
1205            if matches!(JOIN_OP, JoinOp::Insert)
1206                && !forward_exactly_once(T, SIDE)
1207                && let Some(chunk) =
1208                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
1209            {
1210                chunk_opt = Some(chunk);
1211            }
1212            // TODO(kwannoel): We can actually statically decide this, using join side + join type.
1213            if let Some(degree_table) = match_degree_table {
1214                update_degree::<S, { JOIN_OP }>(
1215                    match_order_key_indices,
1216                    degree_table,
1217                    &mut matched_row,
1218                );
1219                if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
1220                    // update matched row in cache
1221                    match JOIN_OP {
1222                        JoinOp::Insert => matched_row_cache_ref.as_mut().unwrap().increase_degree(),
1223                        JoinOp::Delete => matched_row_cache_ref.as_mut().unwrap().decrease_degree(),
1224                    }
1225                }
1226            }
1227
1228            // Deletes must happen after degree table is updated.
1229            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
1230            if matches!(JOIN_OP, JoinOp::Delete)
1231                && !forward_exactly_once(T, SIDE)
1232                && let Some(chunk) =
1233                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
1234            {
1235                chunk_opt = Some(chunk);
1236            }
1237        } else {
1238            // check if need state cleaning
1239            for (column_idx, watermark) in useful_state_clean_columns {
1240                if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
1241                    scalar
1242                        .default_cmp(&watermark.val.as_scalar_ref_impl())
1243                        .is_lt()
1244                }) {
1245                    need_state_clean = true;
1246                    break;
1247                }
1248            }
1249        }
1250        // If the stream is append-only and the join key covers pk in both side,
1251        // then we can remove matched rows since pk is unique and will not be
1252        // inserted again
1253        if append_only_optimize {
1254            assert_matches!(JOIN_OP, JoinOp::Insert);
1255            // Since join key contains pk and pk is unique, there should be only
1256            // one row if matched.
1257            assert!(append_only_matched_row.is_none());
1258            *append_only_matched_row = Some(matched_row);
1259        } else if need_state_clean {
1260            // `append_only_optimize` and `need_state_clean` won't both be true.
1261            // 'else' here is only to suppress compiler error, otherwise
1262            // `matched_row` will be moved twice.
1263            matched_rows_to_clean.push(matched_row);
1264        }
1265
1266        chunk_opt
1267    }
1268
1269    // TODO(yuhao-su): We should find a better way to eval the expression
1270    // without concat two rows.
1271    // if there are non-equi expressions
1272    // NOTE(kwannoel): We can probably let `eval` use `impl Row` instead of `OwnedRow`.
1273    #[inline]
1274    async fn check_join_condition(
1275        row: impl Row,
1276        side_update_start_pos: usize,
1277        matched_row: impl Row,
1278        side_match_start_pos: usize,
1279        join_condition: &Option<NonStrictExpression>,
1280    ) -> bool {
1281        if let Some(join_condition) = join_condition {
1282            let new_row = Self::row_concat(
1283                row,
1284                side_update_start_pos,
1285                matched_row,
1286                side_match_start_pos,
1287            );
1288            join_condition
1289                .eval_row_infallible(&new_row)
1290                .await
1291                .map(|s| *s.as_bool())
1292                .unwrap_or(false)
1293        } else {
1294            true
1295        }
1296    }
1297}
1298
1299#[cfg(test)]
1300mod tests {
1301    use std::sync::atomic::AtomicU64;
1302
1303    use pretty_assertions::assert_eq;
1304    use risingwave_common::array::*;
1305    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1306    use risingwave_common::hash::{Key64, Key128};
1307    use risingwave_common::util::epoch::test_epoch;
1308    use risingwave_common::util::sort_util::OrderType;
1309    use risingwave_storage::memory::MemoryStateStore;
1310
1311    use super::*;
1312    use crate::common::table::test_utils::gen_pbtable;
1313    use crate::executor::MemoryEncoding;
1314    use crate::executor::test_utils::expr::build_from_pretty;
1315    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1316
1317    async fn create_in_memory_state_table(
1318        mem_state: MemoryStateStore,
1319        data_types: &[DataType],
1320        order_types: &[OrderType],
1321        pk_indices: &[usize],
1322        table_id: u32,
1323    ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1324        let column_descs = data_types
1325            .iter()
1326            .enumerate()
1327            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1328            .collect_vec();
1329        let state_table = StateTable::from_table_catalog(
1330            &gen_pbtable(
1331                TableId::new(table_id),
1332                column_descs,
1333                order_types.to_vec(),
1334                pk_indices.to_vec(),
1335                0,
1336            ),
1337            mem_state.clone(),
1338            None,
1339        )
1340        .await;
1341
1342        // Create degree table
1343        let mut degree_table_column_descs = vec![];
1344        pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1345            degree_table_column_descs.push(ColumnDesc::unnamed(
1346                ColumnId::new(pk_id as i32),
1347                data_types[*idx].clone(),
1348            ))
1349        });
1350        degree_table_column_descs.push(ColumnDesc::unnamed(
1351            ColumnId::new(pk_indices.len() as i32),
1352            DataType::Int64,
1353        ));
1354        let degree_state_table = StateTable::from_table_catalog(
1355            &gen_pbtable(
1356                TableId::new(table_id + 1),
1357                degree_table_column_descs,
1358                order_types.to_vec(),
1359                pk_indices.to_vec(),
1360                0,
1361            ),
1362            mem_state,
1363            None,
1364        )
1365        .await;
1366        (state_table, degree_state_table)
1367    }
1368
1369    fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1370        build_from_pretty(
1371            condition_text
1372                .as_deref()
1373                .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1374        )
1375    }
1376
1377    async fn create_executor<const T: JoinTypePrimitive>(
1378        with_condition: bool,
1379        null_safe: bool,
1380        condition_text: Option<String>,
1381        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
1382    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1383        let schema = Schema {
1384            fields: vec![
1385                Field::unnamed(DataType::Int64), // join key
1386                Field::unnamed(DataType::Int64),
1387            ],
1388        };
1389        let (tx_l, source_l) = MockSource::channel();
1390        let source_l = source_l.into_executor(schema.clone(), vec![1]);
1391        let (tx_r, source_r) = MockSource::channel();
1392        let source_r = source_r.into_executor(schema, vec![1]);
1393        let params_l = JoinParams::new(vec![0], vec![1]);
1394        let params_r = JoinParams::new(vec![0], vec![1]);
1395        let cond = with_condition.then(|| create_cond(condition_text));
1396
1397        let mem_state = MemoryStateStore::new();
1398
1399        let (state_l, degree_state_l) = create_in_memory_state_table(
1400            mem_state.clone(),
1401            &[DataType::Int64, DataType::Int64],
1402            &[OrderType::ascending(), OrderType::ascending()],
1403            &[0, 1],
1404            0,
1405        )
1406        .await;
1407
1408        let (state_r, degree_state_r) = create_in_memory_state_table(
1409            mem_state,
1410            &[DataType::Int64, DataType::Int64],
1411            &[OrderType::ascending(), OrderType::ascending()],
1412            &[0, 1],
1413            2,
1414        )
1415        .await;
1416
1417        let schema = match T {
1418            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1419            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1420            _ => [source_l.schema().fields(), source_r.schema().fields()]
1421                .concat()
1422                .into_iter()
1423                .collect(),
1424        };
1425        let schema_len = schema.len();
1426        let info = ExecutorInfo::new(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1427
1428        let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
1429            ActorContext::for_test(123),
1430            info,
1431            source_l,
1432            source_r,
1433            params_l,
1434            params_r,
1435            vec![null_safe],
1436            (0..schema_len).collect_vec(),
1437            cond,
1438            inequality_pairs,
1439            state_l,
1440            degree_state_l,
1441            state_r,
1442            degree_state_r,
1443            Arc::new(AtomicU64::new(0)),
1444            false,
1445            Arc::new(StreamingMetrics::unused()),
1446            1024,
1447            2048,
1448        );
1449        (tx_l, tx_r, executor.boxed().execute())
1450    }
1451
1452    async fn create_classical_executor<const T: JoinTypePrimitive>(
1453        with_condition: bool,
1454        null_safe: bool,
1455        condition_text: Option<String>,
1456    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1457        create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
1458    }
1459
1460    async fn create_append_only_executor<const T: JoinTypePrimitive>(
1461        with_condition: bool,
1462    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1463        let schema = Schema {
1464            fields: vec![
1465                Field::unnamed(DataType::Int64),
1466                Field::unnamed(DataType::Int64),
1467                Field::unnamed(DataType::Int64),
1468            ],
1469        };
1470        let (tx_l, source_l) = MockSource::channel();
1471        let source_l = source_l.into_executor(schema.clone(), vec![0]);
1472        let (tx_r, source_r) = MockSource::channel();
1473        let source_r = source_r.into_executor(schema, vec![0]);
1474        let params_l = JoinParams::new(vec![0, 1], vec![]);
1475        let params_r = JoinParams::new(vec![0, 1], vec![]);
1476        let cond = with_condition.then(|| create_cond(None));
1477
1478        let mem_state = MemoryStateStore::new();
1479
1480        let (state_l, degree_state_l) = create_in_memory_state_table(
1481            mem_state.clone(),
1482            &[DataType::Int64, DataType::Int64, DataType::Int64],
1483            &[
1484                OrderType::ascending(),
1485                OrderType::ascending(),
1486                OrderType::ascending(),
1487            ],
1488            &[0, 1, 0],
1489            0,
1490        )
1491        .await;
1492
1493        let (state_r, degree_state_r) = create_in_memory_state_table(
1494            mem_state,
1495            &[DataType::Int64, DataType::Int64, DataType::Int64],
1496            &[
1497                OrderType::ascending(),
1498                OrderType::ascending(),
1499                OrderType::ascending(),
1500            ],
1501            &[0, 1, 1],
1502            1,
1503        )
1504        .await;
1505
1506        let schema = match T {
1507            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1508            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1509            _ => [source_l.schema().fields(), source_r.schema().fields()]
1510                .concat()
1511                .into_iter()
1512                .collect(),
1513        };
1514        let schema_len = schema.len();
1515        let info = ExecutorInfo::new(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1516
1517        let executor = HashJoinExecutor::<Key128, MemoryStateStore, T, MemoryEncoding>::new(
1518            ActorContext::for_test(123),
1519            info,
1520            source_l,
1521            source_r,
1522            params_l,
1523            params_r,
1524            vec![false],
1525            (0..schema_len).collect_vec(),
1526            cond,
1527            vec![],
1528            state_l,
1529            degree_state_l,
1530            state_r,
1531            degree_state_r,
1532            Arc::new(AtomicU64::new(0)),
1533            true,
1534            Arc::new(StreamingMetrics::unused()),
1535            1024,
1536            2048,
1537        );
1538        (tx_l, tx_r, executor.boxed().execute())
1539    }
1540
1541    #[tokio::test]
1542    async fn test_interval_join() -> StreamExecutorResult<()> {
1543        let chunk_l1 = StreamChunk::from_pretty(
1544            "  I I
1545             + 1 4
1546             + 2 3
1547             + 2 5
1548             + 3 6",
1549        );
1550        let chunk_l2 = StreamChunk::from_pretty(
1551            "  I I
1552             + 3 8
1553             - 3 8",
1554        );
1555        let chunk_r1 = StreamChunk::from_pretty(
1556            "  I I
1557             + 2 6
1558             + 4 8
1559             + 6 9",
1560        );
1561        let chunk_r2 = StreamChunk::from_pretty(
1562            "  I  I
1563             + 2 3
1564             + 6 11",
1565        );
1566        let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
1567            true,
1568            false,
1569            Some(String::from("(and:boolean (greater_than:boolean $1:int8 (subtract:int8 $3:int8 2:int8)) (greater_than:boolean $3:int8 (subtract:int8 $1:int8 2:int8)))")),
1570            vec![(1, 3, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)"))), (3, 1, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)")))],
1571        )
1572        .await;
1573
1574        // push the init barrier for left and right
1575        tx_l.push_barrier(test_epoch(1), false);
1576        tx_r.push_barrier(test_epoch(1), false);
1577        hash_join.next_unwrap_ready_barrier()?;
1578
1579        // push the 1st left chunk
1580        tx_l.push_chunk(chunk_l1);
1581        hash_join.next_unwrap_pending();
1582
1583        // push the init barrier for left and right
1584        tx_l.push_barrier(test_epoch(2), false);
1585        tx_r.push_barrier(test_epoch(2), false);
1586        hash_join.next_unwrap_ready_barrier()?;
1587
1588        // push the 2nd left chunk
1589        tx_l.push_chunk(chunk_l2);
1590        hash_join.next_unwrap_pending();
1591
1592        tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
1593        hash_join.next_unwrap_pending();
1594
1595        tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
1596        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1597        assert_eq!(
1598            output_watermark,
1599            Watermark::new(1, DataType::Int64, ScalarImpl::Int64(4))
1600        );
1601        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1602        assert_eq!(
1603            output_watermark,
1604            Watermark::new(3, DataType::Int64, ScalarImpl::Int64(6))
1605        );
1606
1607        // push the 1st right chunk
1608        tx_r.push_chunk(chunk_r1);
1609        let chunk = hash_join.next_unwrap_ready_chunk()?;
1610        // data "2 3" should have been cleaned
1611        assert_eq!(
1612            chunk,
1613            StreamChunk::from_pretty(
1614                " I I I I
1615                + 2 5 2 6"
1616            )
1617        );
1618
1619        // push the 2nd right chunk
1620        tx_r.push_chunk(chunk_r2);
1621        // pending means that state clean is successful, or the executor will yield a chunk "+ 2 3
1622        // 2 3" here.
1623        hash_join.next_unwrap_pending();
1624
1625        Ok(())
1626    }
1627
1628    #[tokio::test]
1629    async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
1630        let chunk_l1 = StreamChunk::from_pretty(
1631            "  I I
1632             + 1 4
1633             + 2 5
1634             + 3 6",
1635        );
1636        let chunk_l2 = StreamChunk::from_pretty(
1637            "  I I
1638             + 3 8
1639             - 3 8",
1640        );
1641        let chunk_r1 = StreamChunk::from_pretty(
1642            "  I I
1643             + 2 7
1644             + 4 8
1645             + 6 9",
1646        );
1647        let chunk_r2 = StreamChunk::from_pretty(
1648            "  I  I
1649             + 3 10
1650             + 6 11",
1651        );
1652        let (mut tx_l, mut tx_r, mut hash_join) =
1653            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
1654
1655        // push the init barrier for left and right
1656        tx_l.push_barrier(test_epoch(1), false);
1657        tx_r.push_barrier(test_epoch(1), false);
1658        hash_join.next_unwrap_ready_barrier()?;
1659
1660        // push the 1st left chunk
1661        tx_l.push_chunk(chunk_l1);
1662        hash_join.next_unwrap_pending();
1663
1664        // push the init barrier for left and right
1665        tx_l.push_barrier(test_epoch(2), false);
1666        tx_r.push_barrier(test_epoch(2), false);
1667        hash_join.next_unwrap_ready_barrier()?;
1668
1669        // push the 2nd left chunk
1670        tx_l.push_chunk(chunk_l2);
1671        hash_join.next_unwrap_pending();
1672
1673        // push the 1st right chunk
1674        tx_r.push_chunk(chunk_r1);
1675        let chunk = hash_join.next_unwrap_ready_chunk()?;
1676        assert_eq!(
1677            chunk,
1678            StreamChunk::from_pretty(
1679                " I I I I
1680                + 2 5 2 7"
1681            )
1682        );
1683
1684        // push the 2nd right chunk
1685        tx_r.push_chunk(chunk_r2);
1686        let chunk = hash_join.next_unwrap_ready_chunk()?;
1687        assert_eq!(
1688            chunk,
1689            StreamChunk::from_pretty(
1690                " I I I I
1691                + 3 6 3 10"
1692            )
1693        );
1694
1695        Ok(())
1696    }
1697
1698    #[tokio::test]
1699    async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
1700        let chunk_l1 = StreamChunk::from_pretty(
1701            "  I I
1702             + 1 4
1703             + 2 5
1704             + . 6",
1705        );
1706        let chunk_l2 = StreamChunk::from_pretty(
1707            "  I I
1708             + . 8
1709             - . 8",
1710        );
1711        let chunk_r1 = StreamChunk::from_pretty(
1712            "  I I
1713             + 2 7
1714             + 4 8
1715             + 6 9",
1716        );
1717        let chunk_r2 = StreamChunk::from_pretty(
1718            "  I  I
1719             + . 10
1720             + 6 11",
1721        );
1722        let (mut tx_l, mut tx_r, mut hash_join) =
1723            create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;
1724
1725        // push the init barrier for left and right
1726        tx_l.push_barrier(test_epoch(1), false);
1727        tx_r.push_barrier(test_epoch(1), false);
1728        hash_join.next_unwrap_ready_barrier()?;
1729
1730        // push the 1st left chunk
1731        tx_l.push_chunk(chunk_l1);
1732        hash_join.next_unwrap_pending();
1733
1734        // push the init barrier for left and right
1735        tx_l.push_barrier(test_epoch(2), false);
1736        tx_r.push_barrier(test_epoch(2), false);
1737        hash_join.next_unwrap_ready_barrier()?;
1738
1739        // push the 2nd left chunk
1740        tx_l.push_chunk(chunk_l2);
1741        hash_join.next_unwrap_pending();
1742
1743        // push the 1st right chunk
1744        tx_r.push_chunk(chunk_r1);
1745        let chunk = hash_join.next_unwrap_ready_chunk()?;
1746        assert_eq!(
1747            chunk,
1748            StreamChunk::from_pretty(
1749                " I I I I
1750                + 2 5 2 7"
1751            )
1752        );
1753
1754        // push the 2nd right chunk
1755        tx_r.push_chunk(chunk_r2);
1756        let chunk = hash_join.next_unwrap_ready_chunk()?;
1757        assert_eq!(
1758            chunk,
1759            StreamChunk::from_pretty(
1760                " I I I I
1761                + . 6 . 10"
1762            )
1763        );
1764
1765        Ok(())
1766    }
1767
1768    #[tokio::test]
1769    async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
1770        let chunk_l1 = StreamChunk::from_pretty(
1771            "  I I
1772             + 1 4
1773             + 2 5
1774             + 3 6",
1775        );
1776        let chunk_l2 = StreamChunk::from_pretty(
1777            "  I I
1778             + 3 8
1779             - 3 8",
1780        );
1781        let chunk_r1 = StreamChunk::from_pretty(
1782            "  I I
1783             + 2 7
1784             + 4 8
1785             + 6 9",
1786        );
1787        let chunk_r2 = StreamChunk::from_pretty(
1788            "  I  I
1789             + 3 10
1790             + 6 11",
1791        );
1792        let chunk_l3 = StreamChunk::from_pretty(
1793            "  I I
1794             + 6 10",
1795        );
1796        let chunk_r3 = StreamChunk::from_pretty(
1797            "  I  I
1798             - 6 11",
1799        );
1800        let chunk_r4 = StreamChunk::from_pretty(
1801            "  I  I
1802             - 6 9",
1803        );
1804        let (mut tx_l, mut tx_r, mut hash_join) =
1805            create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;
1806
1807        // push the init barrier for left and right
1808        tx_l.push_barrier(test_epoch(1), false);
1809        tx_r.push_barrier(test_epoch(1), false);
1810        hash_join.next_unwrap_ready_barrier()?;
1811
1812        // push the 1st left chunk
1813        tx_l.push_chunk(chunk_l1);
1814        hash_join.next_unwrap_pending();
1815
1816        // push the init barrier for left and right
1817        tx_l.push_barrier(test_epoch(2), false);
1818        tx_r.push_barrier(test_epoch(2), false);
1819        hash_join.next_unwrap_ready_barrier()?;
1820
1821        // push the 2nd left chunk
1822        tx_l.push_chunk(chunk_l2);
1823        hash_join.next_unwrap_pending();
1824
1825        // push the 1st right chunk
1826        tx_r.push_chunk(chunk_r1);
1827        let chunk = hash_join.next_unwrap_ready_chunk()?;
1828        assert_eq!(
1829            chunk,
1830            StreamChunk::from_pretty(
1831                " I I
1832                + 2 5"
1833            )
1834        );
1835
1836        // push the 2nd right chunk
1837        tx_r.push_chunk(chunk_r2);
1838        let chunk = hash_join.next_unwrap_ready_chunk()?;
1839        assert_eq!(
1840            chunk,
1841            StreamChunk::from_pretty(
1842                " I I
1843                + 3 6"
1844            )
1845        );
1846
1847        // push the 3rd left chunk (tests forward_exactly_once)
1848        tx_l.push_chunk(chunk_l3);
1849        let chunk = hash_join.next_unwrap_ready_chunk()?;
1850        assert_eq!(
1851            chunk,
1852            StreamChunk::from_pretty(
1853                " I I
1854                + 6 10"
1855            )
1856        );
1857
1858        // push the 3rd right chunk
1859        // (tests that no change if there are still matches)
1860        tx_r.push_chunk(chunk_r3);
1861        hash_join.next_unwrap_pending();
1862
1863        // push the 3rd left chunk
1864        // (tests that deletion occurs when there are no more matches)
1865        tx_r.push_chunk(chunk_r4);
1866        let chunk = hash_join.next_unwrap_ready_chunk()?;
1867        assert_eq!(
1868            chunk,
1869            StreamChunk::from_pretty(
1870                " I I
1871                - 6 10"
1872            )
1873        );
1874
1875        Ok(())
1876    }
1877
1878    #[tokio::test]
1879    async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
1880        let chunk_l1 = StreamChunk::from_pretty(
1881            "  I I
1882             + 1 4
1883             + 2 5
1884             + . 6",
1885        );
1886        let chunk_l2 = StreamChunk::from_pretty(
1887            "  I I
1888             + . 8
1889             - . 8",
1890        );
1891        let chunk_r1 = StreamChunk::from_pretty(
1892            "  I I
1893             + 2 7
1894             + 4 8
1895             + 6 9",
1896        );
1897        let chunk_r2 = StreamChunk::from_pretty(
1898            "  I  I
1899             + . 10
1900             + 6 11",
1901        );
1902        let chunk_l3 = StreamChunk::from_pretty(
1903            "  I I
1904             + 6 10",
1905        );
1906        let chunk_r3 = StreamChunk::from_pretty(
1907            "  I  I
1908             - 6 11",
1909        );
1910        let chunk_r4 = StreamChunk::from_pretty(
1911            "  I  I
1912             - 6 9",
1913        );
1914        let (mut tx_l, mut tx_r, mut hash_join) =
1915            create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;
1916
1917        // push the init barrier for left and right
1918        tx_l.push_barrier(test_epoch(1), false);
1919        tx_r.push_barrier(test_epoch(1), false);
1920        hash_join.next_unwrap_ready_barrier()?;
1921
1922        // push the 1st left chunk
1923        tx_l.push_chunk(chunk_l1);
1924        hash_join.next_unwrap_pending();
1925
1926        // push the init barrier for left and right
1927        tx_l.push_barrier(test_epoch(2), false);
1928        tx_r.push_barrier(test_epoch(2), false);
1929        hash_join.next_unwrap_ready_barrier()?;
1930
1931        // push the 2nd left chunk
1932        tx_l.push_chunk(chunk_l2);
1933        hash_join.next_unwrap_pending();
1934
1935        // push the 1st right chunk
1936        tx_r.push_chunk(chunk_r1);
1937        let chunk = hash_join.next_unwrap_ready_chunk()?;
1938        assert_eq!(
1939            chunk,
1940            StreamChunk::from_pretty(
1941                " I I
1942                + 2 5"
1943            )
1944        );
1945
1946        // push the 2nd right chunk
1947        tx_r.push_chunk(chunk_r2);
1948        let chunk = hash_join.next_unwrap_ready_chunk()?;
1949        assert_eq!(
1950            chunk,
1951            StreamChunk::from_pretty(
1952                " I I
1953                + . 6"
1954            )
1955        );
1956
1957        // push the 3rd left chunk (tests forward_exactly_once)
1958        tx_l.push_chunk(chunk_l3);
1959        let chunk = hash_join.next_unwrap_ready_chunk()?;
1960        assert_eq!(
1961            chunk,
1962            StreamChunk::from_pretty(
1963                " I I
1964                + 6 10"
1965            )
1966        );
1967
1968        // push the 3rd right chunk
1969        // (tests that no change if there are still matches)
1970        tx_r.push_chunk(chunk_r3);
1971        hash_join.next_unwrap_pending();
1972
1973        // push the 3rd left chunk
1974        // (tests that deletion occurs when there are no more matches)
1975        tx_r.push_chunk(chunk_r4);
1976        let chunk = hash_join.next_unwrap_ready_chunk()?;
1977        assert_eq!(
1978            chunk,
1979            StreamChunk::from_pretty(
1980                " I I
1981                - 6 10"
1982            )
1983        );
1984
1985        Ok(())
1986    }
1987
1988    #[tokio::test]
1989    async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
1990        let chunk_l1 = StreamChunk::from_pretty(
1991            "  I I I
1992             + 1 4 1
1993             + 2 5 2
1994             + 3 6 3",
1995        );
1996        let chunk_l2 = StreamChunk::from_pretty(
1997            "  I I I
1998             + 4 9 4
1999             + 5 10 5",
2000        );
2001        let chunk_r1 = StreamChunk::from_pretty(
2002            "  I I I
2003             + 2 5 1
2004             + 4 9 2
2005             + 6 9 3",
2006        );
2007        let chunk_r2 = StreamChunk::from_pretty(
2008            "  I I I
2009             + 1 4 4
2010             + 3 6 5",
2011        );
2012
2013        let (mut tx_l, mut tx_r, mut hash_join) =
2014            create_append_only_executor::<{ JoinType::Inner }>(false).await;
2015
2016        // push the init barrier for left and right
2017        tx_l.push_barrier(test_epoch(1), false);
2018        tx_r.push_barrier(test_epoch(1), false);
2019        hash_join.next_unwrap_ready_barrier()?;
2020
2021        // push the 1st left chunk
2022        tx_l.push_chunk(chunk_l1);
2023        hash_join.next_unwrap_pending();
2024
2025        // push the init barrier for left and right
2026        tx_l.push_barrier(test_epoch(2), false);
2027        tx_r.push_barrier(test_epoch(2), false);
2028        hash_join.next_unwrap_ready_barrier()?;
2029
2030        // push the 2nd left chunk
2031        tx_l.push_chunk(chunk_l2);
2032        hash_join.next_unwrap_pending();
2033
2034        // push the 1st right chunk
2035        tx_r.push_chunk(chunk_r1);
2036        let chunk = hash_join.next_unwrap_ready_chunk()?;
2037        assert_eq!(
2038            chunk,
2039            StreamChunk::from_pretty(
2040                " I I I I I I
2041                + 2 5 2 2 5 1
2042                + 4 9 4 4 9 2"
2043            )
2044        );
2045
2046        // push the 2nd right chunk
2047        tx_r.push_chunk(chunk_r2);
2048        let chunk = hash_join.next_unwrap_ready_chunk()?;
2049        assert_eq!(
2050            chunk,
2051            StreamChunk::from_pretty(
2052                " I I I I I I
2053                + 1 4 1 1 4 4
2054                + 3 6 3 3 6 5"
2055            )
2056        );
2057
2058        Ok(())
2059    }
2060
2061    #[tokio::test]
2062    async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
2063        let chunk_l1 = StreamChunk::from_pretty(
2064            "  I I I
2065             + 1 4 1
2066             + 2 5 2
2067             + 3 6 3",
2068        );
2069        let chunk_l2 = StreamChunk::from_pretty(
2070            "  I I I
2071             + 4 9 4
2072             + 5 10 5",
2073        );
2074        let chunk_r1 = StreamChunk::from_pretty(
2075            "  I I I
2076             + 2 5 1
2077             + 4 9 2
2078             + 6 9 3",
2079        );
2080        let chunk_r2 = StreamChunk::from_pretty(
2081            "  I I I
2082             + 1 4 4
2083             + 3 6 5",
2084        );
2085
2086        let (mut tx_l, mut tx_r, mut hash_join) =
2087            create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;
2088
2089        // push the init barrier for left and right
2090        tx_l.push_barrier(test_epoch(1), false);
2091        tx_r.push_barrier(test_epoch(1), false);
2092        hash_join.next_unwrap_ready_barrier()?;
2093
2094        // push the 1st left chunk
2095        tx_l.push_chunk(chunk_l1);
2096        hash_join.next_unwrap_pending();
2097
2098        // push the init barrier for left and right
2099        tx_l.push_barrier(test_epoch(2), false);
2100        tx_r.push_barrier(test_epoch(2), false);
2101        hash_join.next_unwrap_ready_barrier()?;
2102
2103        // push the 2nd left chunk
2104        tx_l.push_chunk(chunk_l2);
2105        hash_join.next_unwrap_pending();
2106
2107        // push the 1st right chunk
2108        tx_r.push_chunk(chunk_r1);
2109        let chunk = hash_join.next_unwrap_ready_chunk()?;
2110        assert_eq!(
2111            chunk,
2112            StreamChunk::from_pretty(
2113                " I I I
2114                + 2 5 2
2115                + 4 9 4"
2116            )
2117        );
2118
2119        // push the 2nd right chunk
2120        tx_r.push_chunk(chunk_r2);
2121        let chunk = hash_join.next_unwrap_ready_chunk()?;
2122        assert_eq!(
2123            chunk,
2124            StreamChunk::from_pretty(
2125                " I I I
2126                + 1 4 1
2127                + 3 6 3"
2128            )
2129        );
2130
2131        Ok(())
2132    }
2133
2134    #[tokio::test]
2135    async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
2136        let chunk_l1 = StreamChunk::from_pretty(
2137            "  I I I
2138             + 1 4 1
2139             + 2 5 2
2140             + 3 6 3",
2141        );
2142        let chunk_l2 = StreamChunk::from_pretty(
2143            "  I I I
2144             + 4 9 4
2145             + 5 10 5",
2146        );
2147        let chunk_r1 = StreamChunk::from_pretty(
2148            "  I I I
2149             + 2 5 1
2150             + 4 9 2
2151             + 6 9 3",
2152        );
2153        let chunk_r2 = StreamChunk::from_pretty(
2154            "  I I I
2155             + 1 4 4
2156             + 3 6 5",
2157        );
2158
2159        let (mut tx_l, mut tx_r, mut hash_join) =
2160            create_append_only_executor::<{ JoinType::RightSemi }>(false).await;
2161
2162        // push the init barrier for left and right
2163        tx_l.push_barrier(test_epoch(1), false);
2164        tx_r.push_barrier(test_epoch(1), false);
2165        hash_join.next_unwrap_ready_barrier()?;
2166
2167        // push the 1st left chunk
2168        tx_l.push_chunk(chunk_l1);
2169        hash_join.next_unwrap_pending();
2170
2171        // push the init barrier for left and right
2172        tx_l.push_barrier(test_epoch(2), false);
2173        tx_r.push_barrier(test_epoch(2), false);
2174        hash_join.next_unwrap_ready_barrier()?;
2175
2176        // push the 2nd left chunk
2177        tx_l.push_chunk(chunk_l2);
2178        hash_join.next_unwrap_pending();
2179
2180        // push the 1st right chunk
2181        tx_r.push_chunk(chunk_r1);
2182        let chunk = hash_join.next_unwrap_ready_chunk()?;
2183        assert_eq!(
2184            chunk,
2185            StreamChunk::from_pretty(
2186                " I I I
2187                + 2 5 1
2188                + 4 9 2"
2189            )
2190        );
2191
2192        // push the 2nd right chunk
2193        tx_r.push_chunk(chunk_r2);
2194        let chunk = hash_join.next_unwrap_ready_chunk()?;
2195        assert_eq!(
2196            chunk,
2197            StreamChunk::from_pretty(
2198                " I I I
2199                + 1 4 4
2200                + 3 6 5"
2201            )
2202        );
2203
2204        Ok(())
2205    }
2206
2207    #[tokio::test]
2208    async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
2209        let chunk_r1 = StreamChunk::from_pretty(
2210            "  I I
2211             + 1 4
2212             + 2 5
2213             + 3 6",
2214        );
2215        let chunk_r2 = StreamChunk::from_pretty(
2216            "  I I
2217             + 3 8
2218             - 3 8",
2219        );
2220        let chunk_l1 = StreamChunk::from_pretty(
2221            "  I I
2222             + 2 7
2223             + 4 8
2224             + 6 9",
2225        );
2226        let chunk_l2 = StreamChunk::from_pretty(
2227            "  I  I
2228             + 3 10
2229             + 6 11",
2230        );
2231        let chunk_r3 = StreamChunk::from_pretty(
2232            "  I I
2233             + 6 10",
2234        );
2235        let chunk_l3 = StreamChunk::from_pretty(
2236            "  I  I
2237             - 6 11",
2238        );
2239        let chunk_l4 = StreamChunk::from_pretty(
2240            "  I  I
2241             - 6 9",
2242        );
2243        let (mut tx_l, mut tx_r, mut hash_join) =
2244            create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;
2245
2246        // push the init barrier for left and right
2247        tx_l.push_barrier(test_epoch(1), false);
2248        tx_r.push_barrier(test_epoch(1), false);
2249        hash_join.next_unwrap_ready_barrier()?;
2250
2251        // push the 1st right chunk
2252        tx_r.push_chunk(chunk_r1);
2253        hash_join.next_unwrap_pending();
2254
2255        // push the init barrier for left and right
2256        tx_l.push_barrier(test_epoch(2), false);
2257        tx_r.push_barrier(test_epoch(2), false);
2258        hash_join.next_unwrap_ready_barrier()?;
2259
2260        // push the 2nd right chunk
2261        tx_r.push_chunk(chunk_r2);
2262        hash_join.next_unwrap_pending();
2263
2264        // push the 1st left chunk
2265        tx_l.push_chunk(chunk_l1);
2266        let chunk = hash_join.next_unwrap_ready_chunk()?;
2267        assert_eq!(
2268            chunk,
2269            StreamChunk::from_pretty(
2270                " I I
2271                + 2 5"
2272            )
2273        );
2274
2275        // push the 2nd left chunk
2276        tx_l.push_chunk(chunk_l2);
2277        let chunk = hash_join.next_unwrap_ready_chunk()?;
2278        assert_eq!(
2279            chunk,
2280            StreamChunk::from_pretty(
2281                " I I
2282                + 3 6"
2283            )
2284        );
2285
2286        // push the 3rd right chunk (tests forward_exactly_once)
2287        tx_r.push_chunk(chunk_r3);
2288        let chunk = hash_join.next_unwrap_ready_chunk()?;
2289        assert_eq!(
2290            chunk,
2291            StreamChunk::from_pretty(
2292                " I I
2293                + 6 10"
2294            )
2295        );
2296
2297        // push the 3rd left chunk
2298        // (tests that no change if there are still matches)
2299        tx_l.push_chunk(chunk_l3);
2300        hash_join.next_unwrap_pending();
2301
2302        // push the 3rd right chunk
2303        // (tests that deletion occurs when there are no more matches)
2304        tx_l.push_chunk(chunk_l4);
2305        let chunk = hash_join.next_unwrap_ready_chunk()?;
2306        assert_eq!(
2307            chunk,
2308            StreamChunk::from_pretty(
2309                " I I
2310                - 6 10"
2311            )
2312        );
2313
2314        Ok(())
2315    }
2316
2317    #[tokio::test]
2318    async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
2319        let chunk_l1 = StreamChunk::from_pretty(
2320            "  I I
2321             + 1 4
2322             + 2 5
2323             + 3 6",
2324        );
2325        let chunk_l2 = StreamChunk::from_pretty(
2326            "  I I
2327             + 3 8
2328             - 3 8",
2329        );
2330        let chunk_r1 = StreamChunk::from_pretty(
2331            "  I I
2332             + 2 7
2333             + 4 8
2334             + 6 9",
2335        );
2336        let chunk_r2 = StreamChunk::from_pretty(
2337            "  I  I
2338             + 3 10
2339             + 6 11
2340             + 1 2
2341             + 1 3",
2342        );
2343        let chunk_l3 = StreamChunk::from_pretty(
2344            "  I I
2345             + 9 10",
2346        );
2347        let chunk_r3 = StreamChunk::from_pretty(
2348            "  I I
2349             - 1 2",
2350        );
2351        let chunk_r4 = StreamChunk::from_pretty(
2352            "  I I
2353             - 1 3",
2354        );
2355        let (mut tx_l, mut tx_r, mut hash_join) =
2356            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2357
2358        // push the init barrier for left and right
2359        tx_l.push_barrier(test_epoch(1), false);
2360        tx_r.push_barrier(test_epoch(1), false);
2361        hash_join.next_unwrap_ready_barrier()?;
2362
2363        // push the 1st left chunk
2364        tx_l.push_chunk(chunk_l1);
2365        let chunk = hash_join.next_unwrap_ready_chunk()?;
2366        assert_eq!(
2367            chunk,
2368            StreamChunk::from_pretty(
2369                " I I
2370                + 1 4
2371                + 2 5
2372                + 3 6",
2373            )
2374        );
2375
2376        // push the init barrier for left and right
2377        tx_l.push_barrier(test_epoch(2), false);
2378        tx_r.push_barrier(test_epoch(2), false);
2379        hash_join.next_unwrap_ready_barrier()?;
2380
2381        // push the 2nd left chunk
2382        tx_l.push_chunk(chunk_l2);
2383        let chunk = hash_join.next_unwrap_ready_chunk()?;
2384        assert_eq!(
2385            chunk,
2386            StreamChunk::from_pretty(
2387                "  I I
2388                 + 3 8
2389                 - 3 8",
2390            )
2391        );
2392
2393        // push the 1st right chunk
2394        tx_r.push_chunk(chunk_r1);
2395        let chunk = hash_join.next_unwrap_ready_chunk()?;
2396        assert_eq!(
2397            chunk,
2398            StreamChunk::from_pretty(
2399                " I I
2400                - 2 5"
2401            )
2402        );
2403
2404        // push the 2nd right chunk
2405        tx_r.push_chunk(chunk_r2);
2406        let chunk = hash_join.next_unwrap_ready_chunk()?;
2407        assert_eq!(
2408            chunk,
2409            StreamChunk::from_pretty(
2410                " I I
2411                - 3 6
2412                - 1 4"
2413            )
2414        );
2415
2416        // push the 3rd left chunk (tests forward_exactly_once)
2417        tx_l.push_chunk(chunk_l3);
2418        let chunk = hash_join.next_unwrap_ready_chunk()?;
2419        assert_eq!(
2420            chunk,
2421            StreamChunk::from_pretty(
2422                " I I
2423                + 9 10"
2424            )
2425        );
2426
2427        // push the 3rd right chunk
2428        // (tests that no change if there are still matches)
2429        tx_r.push_chunk(chunk_r3);
2430        hash_join.next_unwrap_pending();
2431
2432        // push the 4th right chunk
2433        // (tests that insertion occurs when there are no more matches)
2434        tx_r.push_chunk(chunk_r4);
2435        let chunk = hash_join.next_unwrap_ready_chunk()?;
2436        assert_eq!(
2437            chunk,
2438            StreamChunk::from_pretty(
2439                " I I
2440                + 1 4"
2441            )
2442        );
2443
2444        Ok(())
2445    }
2446
    /// Right anti join, exercised by driving a `LeftAnti` executor with its inputs swapped.
    ///
    /// The executor's first input is fed through `tx_r` and its second through `tx_l`. So the
    /// output carries only the two columns of the `tx_r` side. A `tx_r` row is emitted while no
    /// `tx_l` row shares its key (column 0). It is retracted when the first match arrives and
    /// re-emitted once the last matching `tx_l` row is deleted.
    #[tokio::test]
    async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11
             + 1 2
             + 1 3",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I
             + 9 10",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             - 1 2",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            "  I I
             - 1 3",
        );
        // Note the swapped binding: the first sender (the `LeftAnti` "left" side) is `tx_r`,
        // which turns the left anti join into a right anti join from the test's point of view.
        let (mut tx_r, mut tx_l, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;

        // push the init barrier for left and right
        tx_r.push_barrier(test_epoch(1), false);
        tx_l.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st right chunk: no `tx_l` rows exist yet, so every row passes through
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4
                + 2 5
                + 3 6",
            )
        );

        // push the epoch-2 barrier for both sides
        tx_r.push_barrier(test_epoch(2), false);
        tx_l.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd right chunk: key 3 has no `tx_l` match, so the insert and the
        // delete are both forwarded unchanged
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I
                 + 3 8
                 - 3 8",
            )
        );

        // push the 1st left chunk: key 2 gains its first match, so `2 5` is retracted;
        // keys 4 and 6 have no `tx_r` rows and produce nothing
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 2 5"
            )
        );

        // push the 2nd left chunk: keys 3 and 1 gain their first matches, so `3 6` and
        // `1 4` are retracted (`1 4` only once, even though two `tx_l` rows have key 1)
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 3 6
                - 1 4"
            )
        );

        // push the 3rd right chunk (tests forward_exactly_once): key 9 has no match,
        // so the row is forwarded once
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 9 10"
            )
        );

        // push the 3rd left chunk
        // (no output: key 1 still has the matching `tx_l` row `1 3`)
        tx_l.push_chunk(chunk_l3);
        hash_join.next_unwrap_pending();

        // push the 4th left chunk
        // (the last match for key 1 is deleted, so `1 4` is emitted again)
        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4"
            )
        );

        Ok(())
    }
2576
    /// Inner join whose inputs receive the epoch-2 barrier at different times.
    ///
    /// After the left input receives the epoch-2 barrier, the chunk that follows it on the left
    /// (`chunk_l2`) is not joined until the right input delivers its own epoch-2 barrier.
    /// Consequently `chunk_r1` is matched only against `chunk_l1`: `6 8` from `chunk_l2` would
    /// otherwise already have matched `6 9`. `chunk_l2` is processed after the aligned barrier.
    #[tokio::test]
    async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 6 8
             + 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk (no right rows yet, so nothing is emitted)
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push a barrier to left side
        tx_l.push_barrier(test_epoch(2), false);

        // push the 2nd left chunk (queued behind the left barrier)
        tx_l.push_chunk(chunk_l2);

        // join the first right chunk
        tx_r.push_chunk(chunk_r1);

        // Consume stream chunk: only `chunk_l1` rows are visible to `chunk_r1`
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 7"
            )
        );

        // push a barrier to right side
        tx_r.push_barrier(test_epoch(2), false);

        // get the aligned barrier here
        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
        assert!(matches!(
            hash_join.next_unwrap_ready_barrier()?,
            Barrier {
                epoch,
                mutation: None,
                ..
            } if epoch == expected_epoch
        ));

        // join the 2nd left chunk, now that the barriers are aligned
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 6 8 6 9"
            )
        );

        // push the 2nd right chunk: key 3 matches both left rows `3 6` and `3 8`
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 6 3 10
                + 3 8 3 10
                + 6 8 6 11"
            )
        );

        Ok(())
    }
2671
    /// Same barrier-alignment scenario as the plain inner-join barrier test, but with NULLs in
    /// the left side's non-key column 1.
    ///
    /// Those NULLs must not stop rows from matching on the key (column 0). They must be carried
    /// through to the output unchanged.
    #[tokio::test]
    async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 .
             + 3 .",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 6 .
             + 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk (no right rows yet, so nothing is emitted)
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push a barrier to left side
        tx_l.push_barrier(test_epoch(2), false);

        // push the 2nd left chunk (queued behind the left barrier)
        tx_l.push_chunk(chunk_l2);

        // join the first right chunk
        tx_r.push_chunk(chunk_r1);

        // Consume stream chunk: only `chunk_l1` rows are visible to `chunk_r1`
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 . 2 7"
            )
        );

        // push a barrier to right side
        tx_r.push_barrier(test_epoch(2), false);

        // get the aligned barrier here
        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
        assert!(matches!(
            hash_join.next_unwrap_ready_barrier()?,
            Barrier {
                epoch,
                mutation: None,
                ..
            } if epoch == expected_epoch
        ));

        // join the 2nd left chunk, now that the barriers are aligned
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 6 . 6 9"
            )
        );

        // push the 2nd right chunk: key 3 matches both left rows `3 .` and `3 8`
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 8 3 10
                + 3 . 3 10
                + 6 . 6 11"
            )
        );

        Ok(())
    }
2766
    /// Left outer join behaviour.
    ///
    /// - Left rows with no right match are emitted padded with NULLs.
    /// - When a right row later matches, the padded row is replaced with a `U-`/`U+` pair.
    /// - Right rows with no left match (`4 8`, `6 9`, `6 11`) produce no output.
    #[tokio::test]
    async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: nothing on the right yet, so every row is NULL-padded
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 4 . .
                + 2 5 . .
                + 3 6 . ."
            )
        );

        // push the 2nd left chunk: insert then delete of an unmatched row, both NULL-padded
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 8 . .
                - 3 8 . ."
            )
        );

        // push the 1st right chunk: `2 7` matches `2 5`, replacing its NULL-padded row
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I
                U- 2 5 . .
                U+ 2 5 2 7"
            )
        );

        // push the 2nd right chunk: `3 10` matches `3 6`; `6 11` has no left row
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I
                U- 3 6 . .
                U+ 3 6 3 10"
            )
        );

        Ok(())
    }
2850
    /// Left outer join with NULL join keys.
    ///
    /// The executor is created with a second argument of `true`. Presumably this enables
    /// null-safe (`IS NOT DISTINCT FROM`) key comparison; confirm against
    /// `create_classical_executor`. The expected output shows a NULL left key `. 6` matching
    /// the NULL right key `. 10`.
    #[tokio::test]
    async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + . 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + . 8
             - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + . 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: nothing on the right yet, so every row is NULL-padded
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 4 . .
                + 2 5 . .
                + . 6 . ."
            )
        );

        // push the 2nd left chunk: insert then delete of a NULL-key row, both NULL-padded
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . 8 . .
                - . 8 . ."
            )
        );

        // push the 1st right chunk: `2 7` matches `2 5`, replacing its NULL-padded row
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I
                U- 2 5 . .
                U+ 2 5 2 7"
            )
        );

        // push the 2nd right chunk: the NULL key `. 10` matches the NULL key `. 6`
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I
                U- . 6 . .
                U+ . 6 . 10"
            )
        );

        Ok(())
    }
2934
    /// Right outer join behaviour.
    ///
    /// - Left chunks alone produce no output.
    /// - Each right row is emitted either joined with its matching left rows or, if none
    ///   exist, padded with NULLs on the left side.
    #[tokio::test]
    async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 5 10
             - 5 10",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk (no right rows yet, so nothing is emitted)
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd left chunk (still no output)
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk: `2 7` joins `2 5`; `4 8` and `6 9` are NULL-padded
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 7
                + . . 4 8
                + . . 6 9"
            )
        );

        // push the 2nd right chunk: insert then delete of an unmatched row, both NULL-padded
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . . 5 10
                - . . 5 10"
            )
        );

        Ok(())
    }
3002
    /// Left outer join built with `create_append_only_executor`, on three-column inputs.
    ///
    /// Unmatched left rows are emitted NULL-padded. Each later right match replaces the padded
    /// row with a `U-`/`U+` pair. In this data, matching rows agree on their first two columns
    /// (e.g. `2 5 2` with `2 5 1`). The exact key columns are defined by
    /// `create_append_only_executor`, which is not visible here.
    #[tokio::test]
    async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: nothing on the right yet, so every row is NULL-padded
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 1 4 1 . . .
                + 2 5 2 . . .
                + 3 6 3 . . ."
            )
        );

        // push the 2nd left chunk: still no right rows, so NULL-padded as well
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 4 9 4 . . .
                + 5 10 5 . . ."
            )
        );

        // push the 1st right chunk: `2 5 ...` and `4 9 ...` find matches; `6 9 3` does not
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                U- 2 5 2 . . .
                U+ 2 5 2 2 5 1
                U- 4 9 4 . . .
                U+ 4 9 4 4 9 2"
            )
        );

        // push the 2nd right chunk: both rows match existing left rows
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                U- 1 4 1 . . .
                U+ 1 4 1 1 4 4
                U- 3 6 3 . . .
                U+ 3 6 3 3 6 5"
            )
        );

        Ok(())
    }
3091
    /// Right outer join built with `create_append_only_executor`, on three-column inputs.
    ///
    /// Left chunks alone produce no output. Each right row is emitted joined with its match or,
    /// when there is none (`6 9 3`, `7 7 6`), padded with NULLs on the left side.
    #[tokio::test]
    async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5
             + 7 7 6",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::RightOuter }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk (no right rows yet, so nothing is emitted)
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd left chunk (still no output)
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk: two matches, and `6 9 3` is NULL-padded
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                + 2 5 2 2 5 1
                + 4 9 4 4 9 2
                + . . . 6 9 3"
            )
        );

        // push the 2nd right chunk: two matches, and `7 7 6` is NULL-padded
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                + 1 4 1 1 4 4
                + 3 6 3 3 6 5
                + . . . 7 7 6"
            )
        );

        Ok(())
    }
3162
    /// Full outer join behaviour.
    ///
    /// - Unmatched rows from either side are emitted padded with NULLs on the other side.
    /// - A left row that later gains a match is replaced with a `U-`/`U+` pair.
    /// - Unmatched right rows are forwarded as plain inserts and deletes.
    #[tokio::test]
    async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 5 10
             - 5 10",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: nothing on the right yet, so every row is NULL-padded
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 4 . .
                + 2 5 . .
                + 3 6 . ."
            )
        );

        // push the 2nd left chunk: insert then delete of an unmatched row, both NULL-padded
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 8 . .
                - 3 8 . ."
            )
        );

        // push the 1st right chunk: `2 7` upgrades `2 5`; `4 8` and `6 9` are NULL-padded
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I
                U-  2 5 . .
                U+  2 5 2 7
                +  . . 4 8
                +  . . 6 9"
            )
        );

        // push the 2nd right chunk: insert then delete of an unmatched right row
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . . 5 10
                - . . 5 10"
            )
        );

        Ok(())
    }
3248
    /// Full outer join where a matched left row is updated (a delete plus an insert on the same
    /// key, in one chunk).
    ///
    /// After `compact()` the output is a single delete/insert pair of fully joined rows. The
    /// right side stays matched throughout, so no NULL-padded right row should appear.
    /// NOTE(review): this assumes `compact()` drops invisible rows from the emitted chunk; the
    /// uncompacted chunk may contain extra entries.
    #[tokio::test]
    async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // left row `1 1` has no match yet, so it is emitted NULL-padded
        tx_l.push_chunk(StreamChunk::from_pretty(
            "  I I
             + 1 1
            ",
        ));
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 1 . ."
            )
        );

        // right row `1 1` matches, replacing the NULL-padded row with the joined row
        tx_r.push_chunk(StreamChunk::from_pretty(
            "  I I
             + 1 1
            ",
        ));
        let chunk = hash_join.next_unwrap_ready_chunk()?;

        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                U- 1 1 . .
                U+ 1 1 1 1"
            )
        );

        // update the left row on the same key: `1 1` -> `1 2`
        tx_l.push_chunk(StreamChunk::from_pretty(
            "   I I
              - 1 1
              + 1 2
            ",
        ));
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        let chunk = chunk.compact();
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 1 1 1 1
                + 1 2 1 1
                "
            )
        );

        Ok(())
    }
3309
    /// Full outer join with an extra non-equi condition. The first argument `true` to
    /// `create_classical_executor` appears to enable it.
    ///
    /// The predicate itself is defined in `create_classical_executor` and not visible here.
    /// The data is consistent with `left.col1 < right.col1`: `2 5` matches `2 6`, but `3 6` and
    /// `3 7` do not match `3 4`.
    ///
    /// This is a regression test for #2420. A right row that passes the equi-key lookup but
    /// fails the condition against several left rows must be forwarded exactly once. It must
    /// also be forwarded when its key maps to an emptied join entry.
    #[tokio::test]
    async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
    {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6
             + 3 7",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8
             - 1 4", // delete row to cause an empty JoinHashEntry
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 6
             + 4 8
             + 3 4",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 5 10
             - 5 10
             + 1 2",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: nothing on the right yet, so every row is NULL-padded
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 4 . .
                + 2 5 . .
                + 3 6 . .
                + 3 7 . ."
            )
        );

        // push the 2nd left chunk: all ops are forwarded NULL-padded; deleting `1 4` leaves
        // key 1 with no left rows
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 8 . .
                - 3 8 . .
                - 1 4 . ."
            )
        );

        // push the 1st right chunk
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I
                U-  2 5 . .
                U+  2 5 2 6
                +  . . 4 8
                +  . . 3 4" /* regression test (#2420): 3 4 should be forwarded only once
                             * despite matching on eq join on 2
                             * entries */
            )
        );

        // push the 2nd right chunk
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . . 5 10
                - . . 5 10
                + . . 1 2" /* regression test (#2420): 1 2 forwarded even if matches on an empty
                            * join entry */
            )
        );

        Ok(())
    }
3405
    /// Inner join with an extra non-equi condition. The first argument `true` to
    /// `create_classical_executor` appears to enable it.
    ///
    /// The predicate is defined in `create_classical_executor` and not visible here. The data
    /// is consistent with `left.col1 < right.col1`:
    /// - `2 10` does not join `2 7`.
    /// - `3 6` joins `3 10`.
    /// - `3 8` was deleted again before the right rows arrive.
    ///
    /// So only one joined row is ever emitted.
    #[tokio::test]
    async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 10
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk (no right rows yet, so nothing is emitted)
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd left chunk (still no output)
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk: `2 7` shares a key with `2 10` but fails the condition
        tx_r.push_chunk(chunk_r1);
        hash_join.next_unwrap_pending();

        // push the 2nd right chunk: `3 10` joins `3 6`; `6 11` has no left row
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 6 3 10"
            )
        );

        Ok(())
    }
3463
3464    #[tokio::test]
3465    async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3466        let (mut tx_l, mut tx_r, mut hash_join) =
3467            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3468
3469        // push the init barrier for left and right
3470        tx_l.push_barrier(test_epoch(1), false);
3471        tx_r.push_barrier(test_epoch(1), false);
3472        hash_join.next_unwrap_ready_barrier()?;
3473
3474        tx_l.push_int64_watermark(0, 100);
3475
3476        tx_l.push_int64_watermark(0, 200);
3477
3478        tx_l.push_barrier(test_epoch(2), false);
3479        tx_r.push_barrier(test_epoch(2), false);
3480        hash_join.next_unwrap_ready_barrier()?;
3481
3482        tx_r.push_int64_watermark(0, 50);
3483
3484        let w1 = hash_join.next().await.unwrap().unwrap();
3485        let w1 = w1.as_watermark().unwrap();
3486
3487        let w2 = hash_join.next().await.unwrap().unwrap();
3488        let w2 = w2.as_watermark().unwrap();
3489
3490        tx_r.push_int64_watermark(0, 100);
3491
3492        let w3 = hash_join.next().await.unwrap().unwrap();
3493        let w3 = w3.as_watermark().unwrap();
3494
3495        let w4 = hash_join.next().await.unwrap().unwrap();
3496        let w4 = w4.as_watermark().unwrap();
3497
3498        assert_eq!(
3499            w1,
3500            &Watermark {
3501                col_idx: 2,
3502                data_type: DataType::Int64,
3503                val: ScalarImpl::Int64(50)
3504            }
3505        );
3506
3507        assert_eq!(
3508            w2,
3509            &Watermark {
3510                col_idx: 0,
3511                data_type: DataType::Int64,
3512                val: ScalarImpl::Int64(50)
3513            }
3514        );
3515
3516        assert_eq!(
3517            w3,
3518            &Watermark {
3519                col_idx: 2,
3520                data_type: DataType::Int64,
3521                val: ScalarImpl::Int64(100)
3522            }
3523        );
3524
3525        assert_eq!(
3526            w4,
3527            &Watermark {
3528                col_idx: 0,
3529                data_type: DataType::Int64,
3530                val: ScalarImpl::Int64(100)
3531            }
3532        );
3533
3534        Ok(())
3535    }
3536}