risingwave_stream/executor/
hash_join.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::{BTreeMap, HashSet};
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use anyhow::Context;
21use either::Either;
22use itertools::Itertools;
23use multimap::MultiMap;
24use risingwave_common::array::Op;
25use risingwave_common::hash::{HashKey, NullBitmap};
26use risingwave_common::row::RowExt;
27use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
28use risingwave_common::util::epoch::EpochPair;
29use risingwave_common::util::iter_util::ZipEqDebug;
30use risingwave_expr::ExprError;
31use risingwave_expr::expr::NonStrictExpression;
32use tokio::time::Instant;
33
34use self::builder::JoinChunkBuilder;
35use super::barrier_align::*;
36use super::join::hash_join::*;
37use super::join::row::{JoinEncoding, JoinRow};
38use super::join::*;
39use super::watermark::*;
40use crate::executor::CachedJoinRow;
41use crate::executor::join::builder::JoinStreamChunkBuilder;
42use crate::executor::join::hash_join::CacheResult;
43use crate::executor::prelude::*;
44
/// Number of received rows between two consecutive cache evictions of the join
/// hash tables (the counter is advanced and checked in `evict_cache`).
const EVICT_EVERY_N_ROWS: u32 = 16;
47
/// Returns `true` when every element of `vec1` also occurs in `vec2`.
///
/// Duplicates are irrelevant, and an empty `vec1` is trivially a subset.
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.into_iter().all(|item| superset.contains(&item))
}
51
/// Column-index parameters describing one input side of a hash join.
pub struct JoinParams {
    /// Positions (in this side's input) of the columns forming the join key.
    pub join_key_indices: Vec<usize>,
    /// Positions (in this side's input) of the primary-key columns, after dedup.
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Bundles the join-key indices and the deduplicated pk indices of one side.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        JoinParams {
            deduped_pk_indices,
            join_key_indices,
        }
    }
}
67
/// One input side of the hash join: the hash table holding this side's rows plus
/// the static metadata needed to match them against the other side and build output.
struct JoinSide<K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Hash table storing the rows received from this side's input stream.
    ht: JoinHashMap<K, S, E>,
    /// Indices of the join key columns in this side's input.
    join_key_indices: Vec<usize>,
    /// The data types of all input columns of this side (the degree column is not included).
    all_data_types: Vec<DataType>,
    /// Offset of this side's first column in the concatenated left ++ right row
    /// (0 for the left side, the left input's column count for the right side).
    start_pos: usize,
    /// The mapping from input column indices of this side to output column indices,
    /// as `(input_idx, output_idx)` pairs.
    i2o_mapping: Vec<(usize, usize)>,
    /// `i2o_mapping` keyed by input column index; one input column may feed several
    /// output columns.
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// The ith element lists the band join conditions (inequalities) that involve the ith
    /// column of this side. When a watermark on that column arrives, these conditions are
    /// updated in order to possibly generate a new watermark at that column or at the
    /// corresponding column of the counterpart join side.
    ///
    /// Each entry is `(inequality_index, need_offset)`. `need_offset` is `true` when this
    /// column is the one required to be smaller than the corresponding column of the
    /// counterpart join side in the band join condition.
    input2inequality_index: Vec<Vec<(usize, bool)>>,
    /// Columns that must be non-NULL for a row to match, because they take part in an
    /// inequality (cleared when the append-only optimization is enabled).
    non_null_fields: Vec<usize>,
    /// (i, j) in this `Vec` means that state data in this join side can be cleaned if the value of
    /// its ith column is less than the synthetic watermark of the jth band join condition.
    state_clean_columns: Vec<(usize, usize)>,
    /// Whether a degree table is needed for this side.
    need_degree_table: bool,
    /// Marker for the join row encoding `E`.
    /// NOTE(review): `E` is already used by `ht`, so this marker looks redundant.
    _marker: std::marker::PhantomData<E>,
}
97
98impl<K: HashKey, S: StateStore, E: JoinEncoding> std::fmt::Debug for JoinSide<K, S, E> {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        f.debug_struct("JoinSide")
101            .field("join_key_indices", &self.join_key_indices)
102            .field("col_types", &self.all_data_types)
103            .field("start_pos", &self.start_pos)
104            .field("i2o_mapping", &self.i2o_mapping)
105            .field("need_degree_table", &self.need_degree_table)
106            .finish()
107    }
108}
109
110impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinSide<K, S, E> {
111    // WARNING: Please do not call this until we implement it.
112    fn is_dirty(&self) -> bool {
113        unimplemented!()
114    }
115
116    #[expect(dead_code)]
117    fn clear_cache(&mut self) {
118        assert!(
119            !self.is_dirty(),
120            "cannot clear cache while states of hash join are dirty"
121        );
122
123        // TODO: not working with rearranged chain
124        // self.ht.clear();
125    }
126
127    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
128        self.ht.init(epoch).await
129    }
130}
131
/// `HashJoinExecutor` takes two input streams and runs an equi hash join on them.
///
/// The joined row is the concatenation of the left and right columns (only one side for
/// semi/anti joins), which is then projected by the `output_indices` given at construction.
pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
{
    /// Actor context (actor id, fragment id, config).
    ctx: ActorContextRef,
    /// Executor info (schema, stream key, ...).
    info: ExecutorInfo,

    /// Left input executor; taken out (left as `None`) when the executor starts streaming.
    input_l: Option<Executor>,
    /// Right input executor; taken out (left as `None`) when the executor starts streaming.
    input_r: Option<Executor>,
    /// The data types of the output columns, i.e. after applying `output_indices`.
    actual_output_data_types: Vec<DataType>,
    /// State and metadata of the left join side.
    side_l: JoinSide<K, S, E>,
    /// State and metadata of the right join side.
    side_r: JoinSide<K, S, E>,
    /// Optional non-equi join conditions.
    cond: Option<NonStrictExpression>,
    /// For each inequality: the output column indices its watermark is emitted on, and the
    /// optional offset expression applied to watermarks of the column required to be smaller.
    inequality_pairs: Vec<(Vec<usize>, Option<NonStrictExpression>)>,
    /// The output watermark of each inequality condition; its value is the minimum of the
    /// calculation results of both sides. It will be used to generate watermarks for downstream
    /// and to do state cleaning if `clean_state` of that inequality is `true`.
    inequality_watermarks: Vec<Option<Watermark>>,

    /// Whether the logic can be optimized for append-only streams (the inputs are append-only
    /// and each side's stream key is contained in its join key).
    append_only_optimize: bool,

    /// Streaming metrics shared with the hash tables.
    metrics: Arc<StreamingMetrics>,
    /// The maximum size of the chunk produced by executor at a time
    chunk_size: usize,
    /// Number of rows received since the last cache eviction; reset to 0 once it reaches
    /// `EVICT_EVERY_N_ROWS`.
    cnt_rows_received: u32,

    /// Buffered watermarks of both sides. Keys `0..join_key_indices.len()` are join-key
    /// positions; key `join_key_indices.len() + i` belongs to the ith inequality condition.
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// When to alert high join amplification
    high_join_amplification_threshold: usize,

    /// Max number of rows that will be cached in the entry state.
    entry_state_max_rows: usize,
}
176
177impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> std::fmt::Debug
178    for HashJoinExecutor<K, S, T, E>
179{
180    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181        f.debug_struct("HashJoinExecutor")
182            .field("join_type", &T)
183            .field("input_left", &self.input_l.as_ref().unwrap().identity())
184            .field("input_right", &self.input_r.as_ref().unwrap().identity())
185            .field("side_l", &self.side_l)
186            .field("side_r", &self.side_r)
187            .field("stream_key", &self.info.stream_key)
188            .field("schema", &self.info.schema)
189            .field("actual_output_data_types", &self.actual_output_data_types)
190            .finish()
191    }
192}
193
194impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> Execute
195    for HashJoinExecutor<K, S, T, E>
196{
197    fn execute(self: Box<Self>) -> BoxedMessageStream {
198        self.into_stream().boxed()
199    }
200}
201
/// Arguments for joining one input chunk (`eq_join_left` / `eq_join_right`): borrowed
/// executor state plus the chunk itself.
struct EqJoinArgs<'a, K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Actor context of the executor.
    ctx: &'a ActorContextRef,
    /// Left join side (hash table and metadata).
    side_l: &'a mut JoinSide<K, S, E>,
    /// Right join side (hash table and metadata).
    side_r: &'a mut JoinSide<K, S, E>,
    /// Data types of the output columns.
    actual_output_data_types: &'a [DataType],
    /// Optional non-equi join condition.
    cond: &'a mut Option<NonStrictExpression>,
    /// Current watermark of each inequality condition.
    inequality_watermarks: &'a [Option<Watermark>],
    /// The input chunk to join.
    chunk: StreamChunk,
    /// Whether the append-only optimization is enabled.
    append_only_optimize: bool,
    /// Maximum number of rows per output chunk.
    chunk_size: usize,
    /// Executor-wide counter of received rows, used for periodic cache eviction.
    cnt_rows_received: &'a mut u32,
    /// Threshold above which high join amplification is alerted.
    high_join_amplification_threshold: usize,
    /// Max number of rows cached in an entry state.
    entry_state_max_rows: usize,
}
216
217impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
218    HashJoinExecutor<K, S, T, E>
219{
220    #[allow(clippy::too_many_arguments)]
221    pub fn new(
222        ctx: ActorContextRef,
223        info: ExecutorInfo,
224        input_l: Executor,
225        input_r: Executor,
226        params_l: JoinParams,
227        params_r: JoinParams,
228        null_safe: Vec<bool>,
229        output_indices: Vec<usize>,
230        cond: Option<NonStrictExpression>,
231        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
232        state_table_l: StateTable<S>,
233        degree_state_table_l: StateTable<S>,
234        state_table_r: StateTable<S>,
235        degree_state_table_r: StateTable<S>,
236        watermark_epoch: AtomicU64Ref,
237        is_append_only: bool,
238        metrics: Arc<StreamingMetrics>,
239        chunk_size: usize,
240        high_join_amplification_threshold: usize,
241    ) -> Self {
242        Self::new_with_cache_size(
243            ctx,
244            info,
245            input_l,
246            input_r,
247            params_l,
248            params_r,
249            null_safe,
250            output_indices,
251            cond,
252            inequality_pairs,
253            state_table_l,
254            degree_state_table_l,
255            state_table_r,
256            degree_state_table_r,
257            watermark_epoch,
258            is_append_only,
259            metrics,
260            chunk_size,
261            high_join_amplification_threshold,
262            None,
263        )
264    }
265
266    #[allow(clippy::too_many_arguments)]
267    pub fn new_with_cache_size(
268        ctx: ActorContextRef,
269        info: ExecutorInfo,
270        input_l: Executor,
271        input_r: Executor,
272        params_l: JoinParams,
273        params_r: JoinParams,
274        null_safe: Vec<bool>,
275        output_indices: Vec<usize>,
276        cond: Option<NonStrictExpression>,
277        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
278        state_table_l: StateTable<S>,
279        degree_state_table_l: StateTable<S>,
280        state_table_r: StateTable<S>,
281        degree_state_table_r: StateTable<S>,
282        watermark_epoch: AtomicU64Ref,
283        is_append_only: bool,
284        metrics: Arc<StreamingMetrics>,
285        chunk_size: usize,
286        high_join_amplification_threshold: usize,
287        entry_state_max_rows: Option<usize>,
288    ) -> Self {
289        let entry_state_max_rows = match entry_state_max_rows {
290            None => ctx.config.developer.hash_join_entry_state_max_rows,
291            Some(entry_state_max_rows) => entry_state_max_rows,
292        };
293        let side_l_column_n = input_l.schema().len();
294
295        let schema_fields = match T {
296            JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
297            JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
298            _ => [
299                input_l.schema().fields.clone(),
300                input_r.schema().fields.clone(),
301            ]
302            .concat(),
303        };
304
305        let original_output_data_types = schema_fields
306            .iter()
307            .map(|field| field.data_type())
308            .collect_vec();
309        let actual_output_data_types = output_indices
310            .iter()
311            .map(|&idx| original_output_data_types[idx].clone())
312            .collect_vec();
313
314        // Data types of of hash join state.
315        let state_all_data_types_l = input_l.schema().data_types();
316        let state_all_data_types_r = input_r.schema().data_types();
317
318        let state_pk_indices_l = input_l.stream_key().to_vec();
319        let state_pk_indices_r = input_r.stream_key().to_vec();
320
321        let state_join_key_indices_l = params_l.join_key_indices;
322        let state_join_key_indices_r = params_r.join_key_indices;
323
324        let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
325        let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();
326
327        let degree_pk_indices_l = (state_join_key_indices_l.len()
328            ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
329            .collect_vec();
330        let degree_pk_indices_r = (state_join_key_indices_r.len()
331            ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
332            .collect_vec();
333
334        // If pk is contained in join key.
335        let pk_contained_in_jk_l = is_subset(state_pk_indices_l, state_join_key_indices_l.clone());
336        let pk_contained_in_jk_r = is_subset(state_pk_indices_r, state_join_key_indices_r.clone());
337
338        // check whether join key contains pk in both side
339        let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;
340
341        let join_key_data_types_l = state_join_key_indices_l
342            .iter()
343            .map(|idx| state_all_data_types_l[*idx].clone())
344            .collect_vec();
345
346        let join_key_data_types_r = state_join_key_indices_r
347            .iter()
348            .map(|idx| state_all_data_types_r[*idx].clone())
349            .collect_vec();
350
351        assert_eq!(join_key_data_types_l, join_key_data_types_r);
352
353        let null_matched = K::Bitmap::from_bool_vec(null_safe);
354
355        let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
356        let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;
357
358        let degree_state_l = need_degree_table_l.then(|| {
359            TableInner::new(
360                degree_pk_indices_l,
361                degree_join_key_indices_l,
362                degree_state_table_l,
363            )
364        });
365        let degree_state_r = need_degree_table_r.then(|| {
366            TableInner::new(
367                degree_pk_indices_r,
368                degree_join_key_indices_r,
369                degree_state_table_r,
370            )
371        });
372
373        let (left_to_output, right_to_output) = {
374            let (left_len, right_len) = if is_left_semi_or_anti(T) {
375                (state_all_data_types_l.len(), 0usize)
376            } else if is_right_semi_or_anti(T) {
377                (0usize, state_all_data_types_r.len())
378            } else {
379                (state_all_data_types_l.len(), state_all_data_types_r.len())
380            };
381            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
382        };
383
384        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
385        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());
386
387        let left_input_len = input_l.schema().len();
388        let right_input_len = input_r.schema().len();
389        let mut l2inequality_index = vec![vec![]; left_input_len];
390        let mut r2inequality_index = vec![vec![]; right_input_len];
391        let mut l_state_clean_columns = vec![];
392        let mut r_state_clean_columns = vec![];
393        let inequality_pairs = inequality_pairs
394            .into_iter()
395            .enumerate()
396            .map(
397                |(
398                    index,
399                    (key_required_larger, key_required_smaller, clean_state, delta_expression),
400                )| {
401                    let output_indices = if key_required_larger < key_required_smaller {
402                        if clean_state {
403                            l_state_clean_columns.push((key_required_larger, index));
404                        }
405                        l2inequality_index[key_required_larger].push((index, false));
406                        r2inequality_index[key_required_smaller - left_input_len]
407                            .push((index, true));
408                        l2o_indexed
409                            .get_vec(&key_required_larger)
410                            .cloned()
411                            .unwrap_or_default()
412                    } else {
413                        if clean_state {
414                            r_state_clean_columns
415                                .push((key_required_larger - left_input_len, index));
416                        }
417                        l2inequality_index[key_required_smaller].push((index, true));
418                        r2inequality_index[key_required_larger - left_input_len]
419                            .push((index, false));
420                        r2o_indexed
421                            .get_vec(&(key_required_larger - left_input_len))
422                            .cloned()
423                            .unwrap_or_default()
424                    };
425                    (output_indices, delta_expression)
426                },
427            )
428            .collect_vec();
429
430        let mut l_non_null_fields = l2inequality_index
431            .iter()
432            .positions(|inequalities| !inequalities.is_empty())
433            .collect_vec();
434        let mut r_non_null_fields = r2inequality_index
435            .iter()
436            .positions(|inequalities| !inequalities.is_empty())
437            .collect_vec();
438
439        if append_only_optimize {
440            l_state_clean_columns.clear();
441            r_state_clean_columns.clear();
442            l_non_null_fields.clear();
443            r_non_null_fields.clear();
444        }
445
446        let inequality_watermarks = vec![None; inequality_pairs.len()];
447        let watermark_buffers = BTreeMap::new();
448
449        Self {
450            ctx: ctx.clone(),
451            info,
452            input_l: Some(input_l),
453            input_r: Some(input_r),
454            actual_output_data_types,
455            side_l: JoinSide {
456                ht: JoinHashMap::new(
457                    watermark_epoch.clone(),
458                    join_key_data_types_l,
459                    state_join_key_indices_l.clone(),
460                    state_all_data_types_l.clone(),
461                    state_table_l,
462                    params_l.deduped_pk_indices,
463                    degree_state_l,
464                    null_matched.clone(),
465                    pk_contained_in_jk_l,
466                    None,
467                    metrics.clone(),
468                    ctx.id,
469                    ctx.fragment_id,
470                    "left",
471                ),
472                join_key_indices: state_join_key_indices_l,
473                all_data_types: state_all_data_types_l,
474                i2o_mapping: left_to_output,
475                i2o_mapping_indexed: l2o_indexed,
476                input2inequality_index: l2inequality_index,
477                non_null_fields: l_non_null_fields,
478                state_clean_columns: l_state_clean_columns,
479                start_pos: 0,
480                need_degree_table: need_degree_table_l,
481                _marker: PhantomData,
482            },
483            side_r: JoinSide {
484                ht: JoinHashMap::new(
485                    watermark_epoch,
486                    join_key_data_types_r,
487                    state_join_key_indices_r.clone(),
488                    state_all_data_types_r.clone(),
489                    state_table_r,
490                    params_r.deduped_pk_indices,
491                    degree_state_r,
492                    null_matched,
493                    pk_contained_in_jk_r,
494                    None,
495                    metrics.clone(),
496                    ctx.id,
497                    ctx.fragment_id,
498                    "right",
499                ),
500                join_key_indices: state_join_key_indices_r,
501                all_data_types: state_all_data_types_r,
502                start_pos: side_l_column_n,
503                i2o_mapping: right_to_output,
504                i2o_mapping_indexed: r2o_indexed,
505                input2inequality_index: r2inequality_index,
506                non_null_fields: r_non_null_fields,
507                state_clean_columns: r_state_clean_columns,
508                need_degree_table: need_degree_table_r,
509                _marker: PhantomData,
510            },
511            cond,
512            inequality_pairs,
513            inequality_watermarks,
514            append_only_optimize,
515            metrics,
516            chunk_size,
517            cnt_rows_received: 0,
518            watermark_buffers,
519            high_join_amplification_threshold,
520            entry_state_max_rows,
521        }
522    }
523
    /// Main loop of the executor, exposed as a stream of output messages.
    ///
    /// Both inputs are aligned on barriers, then each aligned message is handled:
    /// - watermarks go through `handle_watermark`, and every watermark it returns is emitted;
    /// - a chunk from one side is joined against the other side (`eq_join_left` /
    ///   `eq_join_right`), the resulting chunks are emitted, then `try_flush_data` runs;
    /// - a barrier flushes both sides for its epoch and is forwarded. The commit is then
    ///   finished on both sides, applying a vnode bitmap update if the barrier carries one.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        // The inputs can only be taken once; `input_l` / `input_r` are `None` from here on.
        let input_l = self.input_l.take().unwrap();
        let input_r = self.input_r.take().unwrap();
        let aligned_stream = barrier_align(
            input_l.execute(),
            input_r.execute(),
            self.ctx.id,
            self.ctx.fragment_id,
            self.metrics.clone(),
            "Join",
        );
        pin_mut!(aligned_stream);

        let actor_id = self.ctx.id;

        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
        let first_epoch = barrier.epoch;
        // The first barrier message should be propagated.
        yield Message::Barrier(barrier);
        // Both sides' state is initialized with the first epoch after forwarding it.
        self.side_l.init(first_epoch).await?;
        self.side_r.init(first_epoch).await?;

        let actor_id_str = self.ctx.id.to_string();
        let fragment_id_str = self.ctx.fragment_id.to_string();

        // Metric handles labelled with this actor / fragment (and the side or "barrier").
        let join_actor_input_waiting_duration_ns = self
            .metrics
            .join_actor_input_waiting_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let left_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
        let right_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let barrier_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[
                actor_id_str.as_str(),
                fragment_id_str.as_str(),
                "barrier",
            ]);

        let left_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);

        let right_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        // Start of the current wait for the next aligned input message.
        let mut start_time = Instant::now();

        while let Some(msg) = aligned_stream
            .next()
            .instrument_await("hash_join_barrier_align")
            .await
        {
            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
            match msg? {
                AlignedMessage::WatermarkLeft(watermark) => {
                    for watermark_to_emit in
                        self.handle_watermark(SideType::Left, watermark).await?
                    {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::WatermarkRight(watermark) => {
                    for watermark_to_emit in
                        self.handle_watermark(SideType::Right, watermark).await?
                    {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::Left(chunk) => {
                    // Match time excludes the time spent suspended at `yield`
                    // (i.e. while downstream consumes the output chunks).
                    let mut left_time = Duration::from_nanos(0);
                    let mut left_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_left(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        left_time += left_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        left_start_time = Instant::now();
                    }
                    left_time += left_start_time.elapsed();
                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Right(chunk) => {
                    // Same as the left case, with the roles of the sides swapped.
                    let mut right_time = Duration::from_nanos(0);
                    let mut right_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_right(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        right_time += right_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        right_start_time = Instant::now();
                    }
                    right_time += right_start_time.elapsed();
                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Barrier(barrier) => {
                    let barrier_start_time = Instant::now();
                    let (left_post_commit, right_post_commit) =
                        self.flush_data(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);

                    // We don't include the time post yielding barrier because the vnode update
                    // is a one-off and rare operation.
                    barrier_join_match_duration_ns
                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
                    yield Message::Barrier(barrier);

                    // Update the vnode bitmap for state tables of both sides if asked.
                    right_post_commit
                        .post_yield_barrier(update_vnode_bitmap.clone())
                        .await?;
                    // NOTE(review): only the left side's result decides whether buffered
                    // watermarks are reset; the right side's result is discarded above.
                    // Presumably `Some(true)` means the vnode bitmap changed and cached state
                    // was invalidated — verify in `post_yield_barrier`.
                    if left_post_commit
                        .post_yield_barrier(update_vnode_bitmap)
                        .await?
                        .unwrap_or(false)
                    {
                        self.watermark_buffers
                            .values_mut()
                            .for_each(|buffers| buffers.clear());
                        self.inequality_watermarks.fill(None);
                    }

                    // Report metrics of cached join entries
                    for (join_cached_entry_count, ht) in [
                        (&left_join_cached_entry_count, &self.side_l.ht),
                        (&right_join_cached_entry_count, &self.side_r.ht),
                    ] {
                        join_cached_entry_count.set(ht.entry_count() as i64);
                    }
                }
            }
            // Reset after handling so the waiting metric excludes processing time.
            start_time = Instant::now();
        }
    }
698
699    async fn flush_data(
700        &mut self,
701        epoch: EpochPair,
702    ) -> StreamExecutorResult<(
703        JoinHashMapPostCommit<'_, K, S, E>,
704        JoinHashMapPostCommit<'_, K, S, E>,
705    )> {
706        // All changes to the state has been buffered in the mem-table of the state table. Just
707        // `commit` them here.
708        let left = self.side_l.ht.flush(epoch).await?;
709        let right = self.side_r.ht.flush(epoch).await?;
710        Ok((left, right))
711    }
712
713    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
714        // All changes to the state has been buffered in the mem-table of the state table. Just
715        // `commit` them here.
716        self.side_l.ht.try_flush().await?;
717        self.side_r.ht.try_flush().await?;
718        Ok(())
719    }
720
721    // We need to manually evict the cache.
722    fn evict_cache(
723        side_update: &mut JoinSide<K, S, E>,
724        side_match: &mut JoinSide<K, S, E>,
725        cnt_rows_received: &mut u32,
726    ) {
727        *cnt_rows_received += 1;
728        if *cnt_rows_received == EVICT_EVERY_N_ROWS {
729            side_update.ht.evict();
730            side_match.ht.evict();
731            *cnt_rows_received = 0;
732        }
733    }
734
    /// Handles a watermark that arrived on input `side` and returns the watermarks to emit
    /// downstream (possibly none).
    ///
    /// - If the watermark is on the first join-key column of `side`, the other side's hash
    ///   table is informed through `update_watermark` (state cleaning).
    /// - For every position the watermark column occupies in `side`'s join key, the watermark is
    ///   fed into the `BufferedWatermarks` kept for that position. When the buffer selects a
    ///   watermark, it is emitted on every output column mapped (via `i2o_mapping_indexed`) from
    ///   that join-key column on either side.
    /// - For every inequality pair the column participates in, the watermark is optionally
    ///   shifted by the pair's delta expression and then fed into the buffer at index
    ///   `join_key_indices.len() + inequality_index`. A selected watermark is emitted on the
    ///   pair's output columns and remembered in `inequality_watermarks`.
    ///
    /// If evaluating a delta expression fails, that inequality pair is skipped. The error is
    /// reported through `on_compute_error` unless it is `NumericOutOfRange`.
    async fn handle_watermark(
        &mut self,
        side: SideTypePrimitive,
        watermark: Watermark,
    ) -> StreamExecutorResult<Vec<Watermark>> {
        let (side_update, side_match) = if side == SideType::Left {
            (&mut self.side_l, &mut self.side_r)
        } else {
            (&mut self.side_r, &mut self.side_l)
        };

        // State cleaning
        // Only a watermark on the first join-key column is forwarded to the other side's table.
        if side_update.join_key_indices[0] == watermark.col_idx {
            side_match.ht.update_watermark(watermark.val.clone());
        }

        // Select watermarks to yield.
        // A column may appear several times in the join key, so every matching position counts.
        let wm_in_jk = side_update
            .join_key_indices
            .iter()
            .positions(|idx| *idx == watermark.col_idx);
        let mut watermarks_to_emit = vec![];
        for idx in wm_in_jk {
            // One buffer per join-key position, shared by both sides. It presumably holds a
            // watermark back until both sides have reported one — TODO confirm in
            // `BufferedWatermarks`.
            let buffers = self
                .watermark_buffers
                .entry(idx)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
                let empty_indices = vec![];
                // Output columns fed by this join-key column on the update side, then on the
                // match side.
                let output_indices = side_update
                    .i2o_mapping_indexed
                    .get_vec(&side_update.join_key_indices[idx])
                    .unwrap_or(&empty_indices)
                    .iter()
                    .chain(
                        side_match
                            .i2o_mapping_indexed
                            .get_vec(&side_match.join_key_indices[idx])
                            .unwrap_or(&empty_indices),
                    );
                for output_idx in output_indices {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
            };
        }
        for (inequality_index, need_offset) in
            &side_update.input2inequality_index[watermark.col_idx]
        {
            // Inequality buffers live after the join-key buffers in the same map.
            let buffers = self
                .watermark_buffers
                .entry(side_update.join_key_indices.len() + inequality_index)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            let mut input_watermark = watermark.clone();
            if *need_offset
                && let Some(delta_expression) = self.inequality_pairs[*inequality_index].1.as_ref()
            {
                // allow since we will handle error manually.
                #[allow(clippy::disallowed_methods)]
                let eval_result = delta_expression
                    .inner()
                    .eval_row(&OwnedRow::new(vec![Some(input_watermark.val)]))
                    .await;
                match eval_result {
                    // NOTE(review): panics if the delta expression returns NULL for the
                    // watermark value. Presumably that cannot happen for planned deltas —
                    // TODO confirm.
                    Ok(value) => input_watermark.val = value.unwrap(),
                    Err(err) => {
                        // Overflow while shifting the watermark is expected and skipped silently.
                        if !matches!(err, ExprError::NumericOutOfRange) {
                            self.ctx.on_compute_error(err, &self.info.identity);
                        }
                        continue;
                    }
                }
            };
            if let Some(selected_watermark) = buffers.handle_watermark(side, input_watermark) {
                for output_idx in &self.inequality_pairs[*inequality_index].0 {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
                // Remembered so later probes can decide which matched rows are cleanable.
                self.inequality_watermarks[*inequality_index] = Some(selected_watermark);
            }
        }
        Ok(watermarks_to_emit)
    }
816
817    fn row_concat(
818        row_update: impl Row,
819        update_start_pos: usize,
820        row_matched: impl Row,
821        matched_start_pos: usize,
822    ) -> OwnedRow {
823        let mut new_row = vec![None; row_update.len() + row_matched.len()];
824
825        for (i, datum_ref) in row_update.iter().enumerate() {
826            new_row[i + update_start_pos] = datum_ref.to_owned_datum();
827        }
828        for (i, datum_ref) in row_matched.iter().enumerate() {
829            new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
830        }
831        OwnedRow::new(new_row)
832    }
833
834    /// Used to forward `eq_join_oneside` to show join side in stack.
835    fn eq_join_left(
836        args: EqJoinArgs<'_, K, S, E>,
837    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
838        Self::eq_join_oneside::<{ SideType::Left }>(args)
839    }
840
841    /// Used to forward `eq_join_oneside` to show join side in stack.
842    fn eq_join_right(
843        args: EqJoinArgs<'_, K, S, E>,
844    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
845        Self::eq_join_oneside::<{ SideType::Right }>(args)
846    }
847
    /// Joins the rows of `args.chunk`, which arrived on input side `SIDE`, against the other
    /// side's state and yields the resulting output chunks.
    ///
    /// For each row that is not a hole in the chunk, this:
    /// 1. ticks the cache-eviction counter (see `evict_cache`);
    /// 2. looks up the build side's entry for the row's join key, or short-circuits to
    ///    [`CacheResult::NeverMatch`] when a null check rules out any match;
    /// 3. delegates to `handle_match_rows`, with `JoinOp::Insert` for `Insert`/`UpdateInsert`
    ///    and `JoinOp::Delete` for `Delete`/`UpdateDelete`;
    /// 4. observes the `join_matched_join_keys` metric and logs a `high_join_amplification`
    ///    warning when the count exceeds `high_join_amplification_threshold`.
    ///
    /// All rows share one chunk builder, and whatever is still buffered at the end is yielded
    /// last.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S, E>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            actual_output_data_types,
            cond,
            inequality_watermarks,
            chunk,
            append_only_optimize,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
            entry_state_max_rows,
            ..
        } = args;

        // The side the chunk came from is probed against the opposite side.
        let (side_update, side_match) = if SIDE == SideType::Left {
            (side_l, side_r)
        } else {
            (side_r, side_l)
        };

        // Only state-clean columns whose inequality already has a watermark can be used to
        // decide whether a matched row is cleanable.
        let useful_state_clean_columns = side_match
            .state_clean_columns
            .iter()
            .filter_map(|(column_idx, inequality_index)| {
                inequality_watermarks[*inequality_index]
                    .as_ref()
                    .map(|watermark| (*column_idx, watermark))
            })
            .collect_vec();

        let mut hashjoin_chunk_builder =
            JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        let join_matched_join_keys = ctx
            .streaming_metrics
            .join_matched_join_keys
            .with_guarded_label_values(&[
                &ctx.id.to_string(),
                &ctx.fragment_id.to_string(),
                &side_update.ht.table_id().to_string(),
            ]);

        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            // Holes (`None`) are skipped; their keys are still consumed to keep the zip aligned.
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(side_update, side_match, cnt_rows_received);

            let cache_lookup_result = {
                // The probe row can only match if every column listed in `non_null_fields` is
                // non-null.
                let probe_non_null_requirement_satisfied = side_update
                    .non_null_fields
                    .iter()
                    // NOTE(review): the unchecked access relies on every index in
                    // `non_null_fields` being in bounds for `row` — TODO confirm where that is
                    // guaranteed.
                    .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
                // The key's NULL positions must all be allowed by the build side's
                // `null_matched()` set (presumably the null-safe key columns — TODO confirm).
                let build_non_null_requirement_satisfied =
                    key.null_bitmap().is_subset(side_match.ht.null_matched());
                if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
                    side_match.ht.take_state_opt(key)
                } else {
                    CacheResult::NeverMatch
                }
            };
            let mut total_matches = 0;

            macro_rules! match_rows {
                ($op:ident) => {
                    Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
                        cache_lookup_result,
                        row,
                        key,
                        &mut hashjoin_chunk_builder,
                        side_match,
                        side_update,
                        &useful_state_clean_columns,
                        cond,
                        append_only_optimize,
                        entry_state_max_rows,
                    )
                };
            }

            // NOTE(review): `total_matches` sums the cardinality of chunks yielded while this
            // row is processed. The builder is shared across rows, so those chunks may contain
            // output of earlier rows, and output still buffered is not counted. Treat the
            // metric as approximate.
            match op {
                Op::Insert | Op::UpdateInsert =>
                {
                    #[for_await]
                    for chunk in match_rows!(Insert) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
                Op::Delete | Op::UpdateDelete =>
                {
                    #[for_await]
                    for chunk in match_rows!(Delete) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
            };

            join_matched_join_keys.observe(total_matches as _);
            if total_matches > high_join_amplification_threshold {
                let join_key_data_types = side_update.ht.join_key_data_types();
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = total_matches,
                    update_table_id = %side_update.ht.table_id(),
                    match_table_id = %side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = %ctx.id,
                    fragment_id = %ctx.fragment_id,
                    "large rows matched for join key"
                );
            }
        }
        // NOTE(kwannoel): We don't track metrics for this last chunk.
        if let Some(chunk) = hashjoin_chunk_builder.take() {
            yield chunk;
        }
    }
979
980    /// For the probe-side row, we need to check if it has values in cache, if not, we need to
981    /// fetch the matched rows from the state table.
982    ///
983    /// Every matched build-side row being processed needs to go through the following phases:
984    /// 1. Handle join condition evaluation.
985    /// 2. Always do cache refill, if the state count is good.
986    /// 3. Handle state cleaning.
987    /// 4. Handle degree table update.
988    #[allow(clippy::too_many_arguments)]
989    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
990    async fn handle_match_rows<
991        'a,
992        const SIDE: SideTypePrimitive,
993        const JOIN_OP: JoinOpPrimitive,
994    >(
995        cached_lookup_result: CacheResult<E>,
996        row: RowRef<'a>,
997        key: &'a K,
998        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
999        side_match: &'a mut JoinSide<K, S, E>,
1000        side_update: &'a mut JoinSide<K, S, E>,
1001        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
1002        cond: &'a mut Option<NonStrictExpression>,
1003        append_only_optimize: bool,
1004        entry_state_max_rows: usize,
1005    ) {
1006        let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
1007        let mut entry_state: JoinEntryState<E> = JoinEntryState::default();
1008        let mut entry_state_count = 0;
1009
1010        let mut degree = 0;
1011        let mut append_only_matched_row = None;
1012        let mut matched_rows_to_clean = vec![];
1013
1014        macro_rules! match_row {
1015            (
1016                $match_order_key_indices:expr,
1017                $degree_table:expr,
1018                $matched_row:expr,
1019                $matched_row_ref:expr,
1020                $from_cache:literal,
1021                $map_output:expr,
1022            ) => {
1023                Self::handle_match_row::<_, _, SIDE, { JOIN_OP }, { $from_cache }>(
1024                    row,
1025                    $matched_row,
1026                    $matched_row_ref,
1027                    hashjoin_chunk_builder,
1028                    $match_order_key_indices,
1029                    $degree_table,
1030                    side_update.start_pos,
1031                    side_match.start_pos,
1032                    cond,
1033                    &mut degree,
1034                    useful_state_clean_columns,
1035                    append_only_optimize,
1036                    &mut append_only_matched_row,
1037                    &mut matched_rows_to_clean,
1038                    $map_output,
1039                )
1040            };
1041        }
1042
1043        let entry_state = match cached_lookup_result {
1044            CacheResult::NeverMatch => {
1045                let op = match JOIN_OP {
1046                    JoinOp::Insert => Op::Insert,
1047                    JoinOp::Delete => Op::Delete,
1048                };
1049                if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
1050                    yield chunk;
1051                }
1052                return Ok(());
1053            }
1054            CacheResult::Hit(mut cached_rows) => {
1055                let (match_order_key_indices, match_degree_state) =
1056                    side_match.ht.get_degree_state_mut_ref();
1057                // Handle cached rows which match the probe-side row.
1058                for (matched_row_ref, matched_row) in
1059                    cached_rows.values_mut(&side_match.all_data_types)
1060                {
1061                    let matched_row = matched_row?;
1062                    if let Some(chunk) = match_row!(
1063                        match_order_key_indices,
1064                        match_degree_state,
1065                        matched_row,
1066                        Some(matched_row_ref),
1067                        true,
1068                        Either::Left,
1069                    )
1070                    .await
1071                    {
1072                        yield chunk;
1073                    }
1074                }
1075
1076                cached_rows
1077            }
1078            CacheResult::Miss => {
1079                // Handle rows which are not in cache.
1080                let (matched_rows, match_order_key_indices, degree_table) = side_match
1081                    .ht
1082                    .fetch_matched_rows_and_get_degree_table_ref(key)
1083                    .await?;
1084
1085                #[for_await]
1086                for matched_row in matched_rows {
1087                    let (encoded_pk, matched_row) = matched_row?;
1088
1089                    let mut matched_row_ref = None;
1090
1091                    // cache refill
1092                    if entry_state_count <= entry_state_max_rows {
1093                        let row_ref = entry_state
1094                            .insert(encoded_pk, E::encode(&matched_row), None) // TODO(kwannoel): handle ineq key for asof join.
1095                            .with_context(|| format!("row: {}", row.display(),))?;
1096                        matched_row_ref = Some(row_ref);
1097                        entry_state_count += 1;
1098                    }
1099                    if let Some(chunk) = match_row!(
1100                        match_order_key_indices,
1101                        degree_table,
1102                        matched_row,
1103                        matched_row_ref,
1104                        false,
1105                        Either::Right,
1106                    )
1107                    .await
1108                    {
1109                        yield chunk;
1110                    }
1111                }
1112                Box::new(entry_state)
1113            }
1114        };
1115
1116        // forward rows depending on join types
1117        let op = match JOIN_OP {
1118            JoinOp::Insert => Op::Insert,
1119            JoinOp::Delete => Op::Delete,
1120        };
1121        if degree == 0 {
1122            if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
1123                yield chunk;
1124            }
1125        } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
1126        {
1127            yield chunk;
1128        }
1129
1130        // cache refill
1131        if cache_hit || entry_state_count <= entry_state_max_rows {
1132            side_match.ht.update_state(key, entry_state);
1133        }
1134
1135        // watermark state cleaning
1136        for matched_row in matched_rows_to_clean {
1137            side_match.ht.delete_handle_degree(key, matched_row)?;
1138        }
1139
1140        // apply append_only optimization to clean matched_rows which have been persisted
1141        if append_only_optimize && let Some(row) = append_only_matched_row {
1142            assert_matches!(JOIN_OP, JoinOp::Insert);
1143            side_match.ht.delete_handle_degree(key, row)?;
1144            return Ok(());
1145        }
1146
1147        // no append_only optimization, update state table(s).
1148        match JOIN_OP {
1149            JoinOp::Insert => {
1150                side_update
1151                    .ht
1152                    .insert_handle_degree(key, JoinRow::new(row, degree))?;
1153            }
1154            JoinOp::Delete => {
1155                side_update
1156                    .ht
1157                    .delete_handle_degree(key, JoinRow::new(row, degree))?;
1158            }
1159        }
1160    }
1161
    /// Processes one build-side row `matched_row` that shares the join key of `update_row`.
    ///
    /// - Evaluates the non-equi condition via `check_join_condition`.
    /// - If the condition holds:
    ///   - increments `update_row_degree`;
    ///   - emits the joined row through the chunk builder, unless `forward_exactly_once(T, SIDE)`;
    ///   - when a degree table exists, updates the matched row's degree there and in its cached
    ///     copy (if there is one).
    /// - Otherwise, marks the matched row for state cleaning when any column in
    ///   `useful_state_clean_columns` holds a non-null value below its watermark.
    /// - With `append_only_optimize`, stores the matched row (mapped through `map_output`) in
    ///   `append_only_matched_row` whether or not the condition held. Without it, rows marked for
    ///   cleaning are pushed to `matched_rows_to_clean`.
    ///
    /// When `MATCHED_ROWS_FROM_CACHE` is true, `matched_row_cache_ref` must be `Some`: it is
    /// unwrapped when the degree is updated.
    ///
    /// Returns the chunk the builder produced while appending the joined row, if any.
    // Every piece of per-row state is threaded in explicitly, hence the argument count.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    async fn handle_match_row<
        'a,
        R: Row,  // input row type
        RO: Row, // output row type
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
        const MATCHED_ROWS_FROM_CACHE: bool,
    >(
        update_row: RowRef<'a>,
        mut matched_row: JoinRow<R>,
        mut matched_row_cache_ref: Option<&mut E::EncodedRow>,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        match_order_key_indices: &[usize],
        match_degree_table: &mut Option<TableInner<S>>,
        side_update_start_pos: usize,
        side_match_start_pos: usize,
        cond: &Option<NonStrictExpression>,
        update_row_degree: &mut u64,
        useful_state_clean_columns: &[(usize, &'a Watermark)],
        append_only_optimize: bool,
        append_only_matched_row: &mut Option<JoinRow<RO>>,
        matched_rows_to_clean: &mut Vec<JoinRow<RO>>,
        map_output: impl Fn(R) -> RO,
    ) -> Option<StreamChunk> {
        let mut need_state_clean = false;
        let mut chunk_opt = None;

        // TODO(kwannoel): Instead of evaluating this every loop,
        // we can call this only if there's a non-equi expression.
        // check join cond
        let join_condition_satisfied = Self::check_join_condition(
            update_row,
            side_update_start_pos,
            &matched_row.row,
            side_match_start_pos,
            cond,
        )
        .await;

        if join_condition_satisfied {
            // update degree
            *update_row_degree += 1;
            // send matched row downstream

            // Inserts must happen before degrees are updated,
            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
            if matches!(JOIN_OP, JoinOp::Insert)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
            // TODO(kwannoel): We can actually statically decide this, using join side + join type.
            if let Some(degree_table) = match_degree_table {
                update_degree::<S, { JOIN_OP }>(
                    match_order_key_indices,
                    degree_table,
                    &mut matched_row,
                );
                // Keep the cached copy's degree in sync with the degree table. Rows fetched
                // from storage only have a cache ref when they were added to the refill entry.
                if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
                    // update matched row in cache
                    match JOIN_OP {
                        JoinOp::Insert => matched_row_cache_ref.as_mut().unwrap().increase_degree(),
                        JoinOp::Delete => matched_row_cache_ref.as_mut().unwrap().decrease_degree(),
                    }
                }
            }

            // Deletes must happen after degree table is updated.
            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
            if matches!(JOIN_OP, JoinOp::Delete)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
        } else {
            // check if need state cleaning
            // A non-null value strictly below the column's watermark marks the row as cleanable.
            for (column_idx, watermark) in useful_state_clean_columns {
                if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
                    scalar
                        .default_cmp(&watermark.val.as_scalar_ref_impl())
                        .is_lt()
                }) {
                    need_state_clean = true;
                    break;
                }
            }
        }
        // If the stream is append-only and the join key covers pk in both side,
        // then we can remove matched rows since pk is unique and will not be
        // inserted again
        if append_only_optimize {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            // Since join key contains pk and pk is unique, there should be only
            // one row if matched.
            assert!(append_only_matched_row.is_none());
            *append_only_matched_row = Some(matched_row.map(map_output));
        } else if need_state_clean {
            // `append_only_optimize` and `need_state_clean` won't both be true.
            // 'else' here is only to suppress compiler error, otherwise
            // `matched_row` will be moved twice.
            matched_rows_to_clean.push(matched_row.map(map_output));
        }

        chunk_opt
    }
1273
1274    // TODO(yuhao-su): We should find a better way to eval the expression
1275    // without concat two rows.
1276    // if there are non-equi expressions
1277    // NOTE(kwannoel): We can probably let `eval` use `impl Row` instead of `OwnedRow`.
1278    #[inline]
1279    async fn check_join_condition(
1280        row: impl Row,
1281        side_update_start_pos: usize,
1282        matched_row: impl Row,
1283        side_match_start_pos: usize,
1284        join_condition: &Option<NonStrictExpression>,
1285    ) -> bool {
1286        if let Some(join_condition) = join_condition {
1287            let new_row = Self::row_concat(
1288                row,
1289                side_update_start_pos,
1290                matched_row,
1291                side_match_start_pos,
1292            );
1293            join_condition
1294                .eval_row_infallible(&new_row)
1295                .await
1296                .map(|s| *s.as_bool())
1297                .unwrap_or(false)
1298        } else {
1299            true
1300        }
1301    }
1302}
1303
1304#[cfg(test)]
1305mod tests {
1306    use std::sync::atomic::AtomicU64;
1307
1308    use pretty_assertions::assert_eq;
1309    use risingwave_common::array::*;
1310    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1311    use risingwave_common::hash::{Key64, Key128};
1312    use risingwave_common::util::epoch::test_epoch;
1313    use risingwave_common::util::sort_util::OrderType;
1314    use risingwave_storage::memory::MemoryStateStore;
1315
1316    use super::*;
1317    use crate::common::table::test_utils::gen_pbtable;
1318    use crate::executor::MemoryEncoding;
1319    use crate::executor::test_utils::expr::build_from_pretty;
1320    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1321
1322    async fn create_in_memory_state_table(
1323        mem_state: MemoryStateStore,
1324        data_types: &[DataType],
1325        order_types: &[OrderType],
1326        pk_indices: &[usize],
1327        table_id: u32,
1328    ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1329        let column_descs = data_types
1330            .iter()
1331            .enumerate()
1332            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1333            .collect_vec();
1334        let state_table = StateTable::from_table_catalog(
1335            &gen_pbtable(
1336                TableId::new(table_id),
1337                column_descs,
1338                order_types.to_vec(),
1339                pk_indices.to_vec(),
1340                0,
1341            ),
1342            mem_state.clone(),
1343            None,
1344        )
1345        .await;
1346
1347        // Create degree table
1348        let mut degree_table_column_descs = vec![];
1349        pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1350            degree_table_column_descs.push(ColumnDesc::unnamed(
1351                ColumnId::new(pk_id as i32),
1352                data_types[*idx].clone(),
1353            ))
1354        });
1355        degree_table_column_descs.push(ColumnDesc::unnamed(
1356            ColumnId::new(pk_indices.len() as i32),
1357            DataType::Int64,
1358        ));
1359        let degree_state_table = StateTable::from_table_catalog(
1360            &gen_pbtable(
1361                TableId::new(table_id + 1),
1362                degree_table_column_descs,
1363                order_types.to_vec(),
1364                pk_indices.to_vec(),
1365                0,
1366            ),
1367            mem_state,
1368            None,
1369        )
1370        .await;
1371        (state_table, degree_state_table)
1372    }
1373
1374    fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1375        build_from_pretty(
1376            condition_text
1377                .as_deref()
1378                .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1379        )
1380    }
1381
1382    async fn create_executor<const T: JoinTypePrimitive>(
1383        with_condition: bool,
1384        null_safe: bool,
1385        condition_text: Option<String>,
1386        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
1387    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1388        let schema = Schema {
1389            fields: vec![
1390                Field::unnamed(DataType::Int64), // join key
1391                Field::unnamed(DataType::Int64),
1392            ],
1393        };
1394        let (tx_l, source_l) = MockSource::channel();
1395        let source_l = source_l.into_executor(schema.clone(), vec![1]);
1396        let (tx_r, source_r) = MockSource::channel();
1397        let source_r = source_r.into_executor(schema, vec![1]);
1398        let params_l = JoinParams::new(vec![0], vec![1]);
1399        let params_r = JoinParams::new(vec![0], vec![1]);
1400        let cond = with_condition.then(|| create_cond(condition_text));
1401
1402        let mem_state = MemoryStateStore::new();
1403
1404        let (state_l, degree_state_l) = create_in_memory_state_table(
1405            mem_state.clone(),
1406            &[DataType::Int64, DataType::Int64],
1407            &[OrderType::ascending(), OrderType::ascending()],
1408            &[0, 1],
1409            0,
1410        )
1411        .await;
1412
1413        let (state_r, degree_state_r) = create_in_memory_state_table(
1414            mem_state,
1415            &[DataType::Int64, DataType::Int64],
1416            &[OrderType::ascending(), OrderType::ascending()],
1417            &[0, 1],
1418            2,
1419        )
1420        .await;
1421
1422        let schema = match T {
1423            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1424            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1425            _ => [source_l.schema().fields(), source_r.schema().fields()]
1426                .concat()
1427                .into_iter()
1428                .collect(),
1429        };
1430        let schema_len = schema.len();
1431        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1432
1433        let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
1434            ActorContext::for_test(123),
1435            info,
1436            source_l,
1437            source_r,
1438            params_l,
1439            params_r,
1440            vec![null_safe],
1441            (0..schema_len).collect_vec(),
1442            cond,
1443            inequality_pairs,
1444            state_l,
1445            degree_state_l,
1446            state_r,
1447            degree_state_r,
1448            Arc::new(AtomicU64::new(0)),
1449            false,
1450            Arc::new(StreamingMetrics::unused()),
1451            1024,
1452            2048,
1453        );
1454        (tx_l, tx_r, executor.boxed().execute())
1455    }
1456
1457    async fn create_classical_executor<const T: JoinTypePrimitive>(
1458        with_condition: bool,
1459        null_safe: bool,
1460        condition_text: Option<String>,
1461    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1462        create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
1463    }
1464
1465    async fn create_append_only_executor<const T: JoinTypePrimitive>(
1466        with_condition: bool,
1467    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1468        let schema = Schema {
1469            fields: vec![
1470                Field::unnamed(DataType::Int64),
1471                Field::unnamed(DataType::Int64),
1472                Field::unnamed(DataType::Int64),
1473            ],
1474        };
1475        let (tx_l, source_l) = MockSource::channel();
1476        let source_l = source_l.into_executor(schema.clone(), vec![0]);
1477        let (tx_r, source_r) = MockSource::channel();
1478        let source_r = source_r.into_executor(schema, vec![0]);
1479        let params_l = JoinParams::new(vec![0, 1], vec![]);
1480        let params_r = JoinParams::new(vec![0, 1], vec![]);
1481        let cond = with_condition.then(|| create_cond(None));
1482
1483        let mem_state = MemoryStateStore::new();
1484
1485        let (state_l, degree_state_l) = create_in_memory_state_table(
1486            mem_state.clone(),
1487            &[DataType::Int64, DataType::Int64, DataType::Int64],
1488            &[
1489                OrderType::ascending(),
1490                OrderType::ascending(),
1491                OrderType::ascending(),
1492            ],
1493            &[0, 1, 0],
1494            0,
1495        )
1496        .await;
1497
1498        let (state_r, degree_state_r) = create_in_memory_state_table(
1499            mem_state,
1500            &[DataType::Int64, DataType::Int64, DataType::Int64],
1501            &[
1502                OrderType::ascending(),
1503                OrderType::ascending(),
1504                OrderType::ascending(),
1505            ],
1506            &[0, 1, 1],
1507            1,
1508        )
1509        .await;
1510
1511        let schema = match T {
1512            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1513            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1514            _ => [source_l.schema().fields(), source_r.schema().fields()]
1515                .concat()
1516                .into_iter()
1517                .collect(),
1518        };
1519        let schema_len = schema.len();
1520        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1521
1522        let executor = HashJoinExecutor::<Key128, MemoryStateStore, T, MemoryEncoding>::new(
1523            ActorContext::for_test(123),
1524            info,
1525            source_l,
1526            source_r,
1527            params_l,
1528            params_r,
1529            vec![false],
1530            (0..schema_len).collect_vec(),
1531            cond,
1532            vec![],
1533            state_l,
1534            degree_state_l,
1535            state_r,
1536            degree_state_r,
1537            Arc::new(AtomicU64::new(0)),
1538            true,
1539            Arc::new(StreamingMetrics::unused()),
1540            1024,
1541            2048,
1542        );
1543        (tx_l, tx_r, executor.boxed().execute())
1544    }
1545
1546    #[tokio::test]
1547    async fn test_interval_join() -> StreamExecutorResult<()> {
1548        let chunk_l1 = StreamChunk::from_pretty(
1549            "  I I
1550             + 1 4
1551             + 2 3
1552             + 2 5
1553             + 3 6",
1554        );
1555        let chunk_l2 = StreamChunk::from_pretty(
1556            "  I I
1557             + 3 8
1558             - 3 8",
1559        );
1560        let chunk_r1 = StreamChunk::from_pretty(
1561            "  I I
1562             + 2 6
1563             + 4 8
1564             + 6 9",
1565        );
1566        let chunk_r2 = StreamChunk::from_pretty(
1567            "  I  I
1568             + 2 3
1569             + 6 11",
1570        );
1571        let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
1572            true,
1573            false,
1574            Some(String::from("(and:boolean (greater_than:boolean $1:int8 (subtract:int8 $3:int8 2:int8)) (greater_than:boolean $3:int8 (subtract:int8 $1:int8 2:int8)))")),
1575            vec![(1, 3, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)"))), (3, 1, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)")))],
1576        )
1577        .await;
1578
1579        // push the init barrier for left and right
1580        tx_l.push_barrier(test_epoch(1), false);
1581        tx_r.push_barrier(test_epoch(1), false);
1582        hash_join.next_unwrap_ready_barrier()?;
1583
1584        // push the 1st left chunk
1585        tx_l.push_chunk(chunk_l1);
1586        hash_join.next_unwrap_pending();
1587
1588        // push the init barrier for left and right
1589        tx_l.push_barrier(test_epoch(2), false);
1590        tx_r.push_barrier(test_epoch(2), false);
1591        hash_join.next_unwrap_ready_barrier()?;
1592
1593        // push the 2nd left chunk
1594        tx_l.push_chunk(chunk_l2);
1595        hash_join.next_unwrap_pending();
1596
1597        tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
1598        hash_join.next_unwrap_pending();
1599
1600        tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
1601        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1602        assert_eq!(
1603            output_watermark,
1604            Watermark::new(1, DataType::Int64, ScalarImpl::Int64(4))
1605        );
1606        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1607        assert_eq!(
1608            output_watermark,
1609            Watermark::new(3, DataType::Int64, ScalarImpl::Int64(6))
1610        );
1611
1612        // push the 1st right chunk
1613        tx_r.push_chunk(chunk_r1);
1614        let chunk = hash_join.next_unwrap_ready_chunk()?;
1615        // data "2 3" should have been cleaned
1616        assert_eq!(
1617            chunk,
1618            StreamChunk::from_pretty(
1619                " I I I I
1620                + 2 5 2 6"
1621            )
1622        );
1623
1624        // push the 2nd right chunk
1625        tx_r.push_chunk(chunk_r2);
1626        // pending means that state clean is successful, or the executor will yield a chunk "+ 2 3
1627        // 2 3" here.
1628        hash_join.next_unwrap_pending();
1629
1630        Ok(())
1631    }
1632
1633    #[tokio::test]
1634    async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
1635        let chunk_l1 = StreamChunk::from_pretty(
1636            "  I I
1637             + 1 4
1638             + 2 5
1639             + 3 6",
1640        );
1641        let chunk_l2 = StreamChunk::from_pretty(
1642            "  I I
1643             + 3 8
1644             - 3 8",
1645        );
1646        let chunk_r1 = StreamChunk::from_pretty(
1647            "  I I
1648             + 2 7
1649             + 4 8
1650             + 6 9",
1651        );
1652        let chunk_r2 = StreamChunk::from_pretty(
1653            "  I  I
1654             + 3 10
1655             + 6 11",
1656        );
1657        let (mut tx_l, mut tx_r, mut hash_join) =
1658            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
1659
1660        // push the init barrier for left and right
1661        tx_l.push_barrier(test_epoch(1), false);
1662        tx_r.push_barrier(test_epoch(1), false);
1663        hash_join.next_unwrap_ready_barrier()?;
1664
1665        // push the 1st left chunk
1666        tx_l.push_chunk(chunk_l1);
1667        hash_join.next_unwrap_pending();
1668
1669        // push the init barrier for left and right
1670        tx_l.push_barrier(test_epoch(2), false);
1671        tx_r.push_barrier(test_epoch(2), false);
1672        hash_join.next_unwrap_ready_barrier()?;
1673
1674        // push the 2nd left chunk
1675        tx_l.push_chunk(chunk_l2);
1676        hash_join.next_unwrap_pending();
1677
1678        // push the 1st right chunk
1679        tx_r.push_chunk(chunk_r1);
1680        let chunk = hash_join.next_unwrap_ready_chunk()?;
1681        assert_eq!(
1682            chunk,
1683            StreamChunk::from_pretty(
1684                " I I I I
1685                + 2 5 2 7"
1686            )
1687        );
1688
1689        // push the 2nd right chunk
1690        tx_r.push_chunk(chunk_r2);
1691        let chunk = hash_join.next_unwrap_ready_chunk()?;
1692        assert_eq!(
1693            chunk,
1694            StreamChunk::from_pretty(
1695                " I I I I
1696                + 3 6 3 10"
1697            )
1698        );
1699
1700        Ok(())
1701    }
1702
1703    #[tokio::test]
1704    async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
1705        let chunk_l1 = StreamChunk::from_pretty(
1706            "  I I
1707             + 1 4
1708             + 2 5
1709             + . 6",
1710        );
1711        let chunk_l2 = StreamChunk::from_pretty(
1712            "  I I
1713             + . 8
1714             - . 8",
1715        );
1716        let chunk_r1 = StreamChunk::from_pretty(
1717            "  I I
1718             + 2 7
1719             + 4 8
1720             + 6 9",
1721        );
1722        let chunk_r2 = StreamChunk::from_pretty(
1723            "  I  I
1724             + . 10
1725             + 6 11",
1726        );
1727        let (mut tx_l, mut tx_r, mut hash_join) =
1728            create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;
1729
1730        // push the init barrier for left and right
1731        tx_l.push_barrier(test_epoch(1), false);
1732        tx_r.push_barrier(test_epoch(1), false);
1733        hash_join.next_unwrap_ready_barrier()?;
1734
1735        // push the 1st left chunk
1736        tx_l.push_chunk(chunk_l1);
1737        hash_join.next_unwrap_pending();
1738
1739        // push the init barrier for left and right
1740        tx_l.push_barrier(test_epoch(2), false);
1741        tx_r.push_barrier(test_epoch(2), false);
1742        hash_join.next_unwrap_ready_barrier()?;
1743
1744        // push the 2nd left chunk
1745        tx_l.push_chunk(chunk_l2);
1746        hash_join.next_unwrap_pending();
1747
1748        // push the 1st right chunk
1749        tx_r.push_chunk(chunk_r1);
1750        let chunk = hash_join.next_unwrap_ready_chunk()?;
1751        assert_eq!(
1752            chunk,
1753            StreamChunk::from_pretty(
1754                " I I I I
1755                + 2 5 2 7"
1756            )
1757        );
1758
1759        // push the 2nd right chunk
1760        tx_r.push_chunk(chunk_r2);
1761        let chunk = hash_join.next_unwrap_ready_chunk()?;
1762        assert_eq!(
1763            chunk,
1764            StreamChunk::from_pretty(
1765                " I I I I
1766                + . 6 . 10"
1767            )
1768        );
1769
1770        Ok(())
1771    }
1772
1773    #[tokio::test]
1774    async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
1775        let chunk_l1 = StreamChunk::from_pretty(
1776            "  I I
1777             + 1 4
1778             + 2 5
1779             + 3 6",
1780        );
1781        let chunk_l2 = StreamChunk::from_pretty(
1782            "  I I
1783             + 3 8
1784             - 3 8",
1785        );
1786        let chunk_r1 = StreamChunk::from_pretty(
1787            "  I I
1788             + 2 7
1789             + 4 8
1790             + 6 9",
1791        );
1792        let chunk_r2 = StreamChunk::from_pretty(
1793            "  I  I
1794             + 3 10
1795             + 6 11",
1796        );
1797        let chunk_l3 = StreamChunk::from_pretty(
1798            "  I I
1799             + 6 10",
1800        );
1801        let chunk_r3 = StreamChunk::from_pretty(
1802            "  I  I
1803             - 6 11",
1804        );
1805        let chunk_r4 = StreamChunk::from_pretty(
1806            "  I  I
1807             - 6 9",
1808        );
1809        let (mut tx_l, mut tx_r, mut hash_join) =
1810            create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;
1811
1812        // push the init barrier for left and right
1813        tx_l.push_barrier(test_epoch(1), false);
1814        tx_r.push_barrier(test_epoch(1), false);
1815        hash_join.next_unwrap_ready_barrier()?;
1816
1817        // push the 1st left chunk
1818        tx_l.push_chunk(chunk_l1);
1819        hash_join.next_unwrap_pending();
1820
1821        // push the init barrier for left and right
1822        tx_l.push_barrier(test_epoch(2), false);
1823        tx_r.push_barrier(test_epoch(2), false);
1824        hash_join.next_unwrap_ready_barrier()?;
1825
1826        // push the 2nd left chunk
1827        tx_l.push_chunk(chunk_l2);
1828        hash_join.next_unwrap_pending();
1829
1830        // push the 1st right chunk
1831        tx_r.push_chunk(chunk_r1);
1832        let chunk = hash_join.next_unwrap_ready_chunk()?;
1833        assert_eq!(
1834            chunk,
1835            StreamChunk::from_pretty(
1836                " I I
1837                + 2 5"
1838            )
1839        );
1840
1841        // push the 2nd right chunk
1842        tx_r.push_chunk(chunk_r2);
1843        let chunk = hash_join.next_unwrap_ready_chunk()?;
1844        assert_eq!(
1845            chunk,
1846            StreamChunk::from_pretty(
1847                " I I
1848                + 3 6"
1849            )
1850        );
1851
1852        // push the 3rd left chunk (tests forward_exactly_once)
1853        tx_l.push_chunk(chunk_l3);
1854        let chunk = hash_join.next_unwrap_ready_chunk()?;
1855        assert_eq!(
1856            chunk,
1857            StreamChunk::from_pretty(
1858                " I I
1859                + 6 10"
1860            )
1861        );
1862
1863        // push the 3rd right chunk
1864        // (tests that no change if there are still matches)
1865        tx_r.push_chunk(chunk_r3);
1866        hash_join.next_unwrap_pending();
1867
1868        // push the 3rd left chunk
1869        // (tests that deletion occurs when there are no more matches)
1870        tx_r.push_chunk(chunk_r4);
1871        let chunk = hash_join.next_unwrap_ready_chunk()?;
1872        assert_eq!(
1873            chunk,
1874            StreamChunk::from_pretty(
1875                " I I
1876                - 6 10"
1877            )
1878        );
1879
1880        Ok(())
1881    }
1882
1883    #[tokio::test]
1884    async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
1885        let chunk_l1 = StreamChunk::from_pretty(
1886            "  I I
1887             + 1 4
1888             + 2 5
1889             + . 6",
1890        );
1891        let chunk_l2 = StreamChunk::from_pretty(
1892            "  I I
1893             + . 8
1894             - . 8",
1895        );
1896        let chunk_r1 = StreamChunk::from_pretty(
1897            "  I I
1898             + 2 7
1899             + 4 8
1900             + 6 9",
1901        );
1902        let chunk_r2 = StreamChunk::from_pretty(
1903            "  I  I
1904             + . 10
1905             + 6 11",
1906        );
1907        let chunk_l3 = StreamChunk::from_pretty(
1908            "  I I
1909             + 6 10",
1910        );
1911        let chunk_r3 = StreamChunk::from_pretty(
1912            "  I  I
1913             - 6 11",
1914        );
1915        let chunk_r4 = StreamChunk::from_pretty(
1916            "  I  I
1917             - 6 9",
1918        );
1919        let (mut tx_l, mut tx_r, mut hash_join) =
1920            create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;
1921
1922        // push the init barrier for left and right
1923        tx_l.push_barrier(test_epoch(1), false);
1924        tx_r.push_barrier(test_epoch(1), false);
1925        hash_join.next_unwrap_ready_barrier()?;
1926
1927        // push the 1st left chunk
1928        tx_l.push_chunk(chunk_l1);
1929        hash_join.next_unwrap_pending();
1930
1931        // push the init barrier for left and right
1932        tx_l.push_barrier(test_epoch(2), false);
1933        tx_r.push_barrier(test_epoch(2), false);
1934        hash_join.next_unwrap_ready_barrier()?;
1935
1936        // push the 2nd left chunk
1937        tx_l.push_chunk(chunk_l2);
1938        hash_join.next_unwrap_pending();
1939
1940        // push the 1st right chunk
1941        tx_r.push_chunk(chunk_r1);
1942        let chunk = hash_join.next_unwrap_ready_chunk()?;
1943        assert_eq!(
1944            chunk,
1945            StreamChunk::from_pretty(
1946                " I I
1947                + 2 5"
1948            )
1949        );
1950
1951        // push the 2nd right chunk
1952        tx_r.push_chunk(chunk_r2);
1953        let chunk = hash_join.next_unwrap_ready_chunk()?;
1954        assert_eq!(
1955            chunk,
1956            StreamChunk::from_pretty(
1957                " I I
1958                + . 6"
1959            )
1960        );
1961
1962        // push the 3rd left chunk (tests forward_exactly_once)
1963        tx_l.push_chunk(chunk_l3);
1964        let chunk = hash_join.next_unwrap_ready_chunk()?;
1965        assert_eq!(
1966            chunk,
1967            StreamChunk::from_pretty(
1968                " I I
1969                + 6 10"
1970            )
1971        );
1972
1973        // push the 3rd right chunk
1974        // (tests that no change if there are still matches)
1975        tx_r.push_chunk(chunk_r3);
1976        hash_join.next_unwrap_pending();
1977
1978        // push the 3rd left chunk
1979        // (tests that deletion occurs when there are no more matches)
1980        tx_r.push_chunk(chunk_r4);
1981        let chunk = hash_join.next_unwrap_ready_chunk()?;
1982        assert_eq!(
1983            chunk,
1984            StreamChunk::from_pretty(
1985                " I I
1986                - 6 10"
1987            )
1988        );
1989
1990        Ok(())
1991    }
1992
1993    #[tokio::test]
1994    async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
1995        let chunk_l1 = StreamChunk::from_pretty(
1996            "  I I I
1997             + 1 4 1
1998             + 2 5 2
1999             + 3 6 3",
2000        );
2001        let chunk_l2 = StreamChunk::from_pretty(
2002            "  I I I
2003             + 4 9 4
2004             + 5 10 5",
2005        );
2006        let chunk_r1 = StreamChunk::from_pretty(
2007            "  I I I
2008             + 2 5 1
2009             + 4 9 2
2010             + 6 9 3",
2011        );
2012        let chunk_r2 = StreamChunk::from_pretty(
2013            "  I I I
2014             + 1 4 4
2015             + 3 6 5",
2016        );
2017
2018        let (mut tx_l, mut tx_r, mut hash_join) =
2019            create_append_only_executor::<{ JoinType::Inner }>(false).await;
2020
2021        // push the init barrier for left and right
2022        tx_l.push_barrier(test_epoch(1), false);
2023        tx_r.push_barrier(test_epoch(1), false);
2024        hash_join.next_unwrap_ready_barrier()?;
2025
2026        // push the 1st left chunk
2027        tx_l.push_chunk(chunk_l1);
2028        hash_join.next_unwrap_pending();
2029
2030        // push the init barrier for left and right
2031        tx_l.push_barrier(test_epoch(2), false);
2032        tx_r.push_barrier(test_epoch(2), false);
2033        hash_join.next_unwrap_ready_barrier()?;
2034
2035        // push the 2nd left chunk
2036        tx_l.push_chunk(chunk_l2);
2037        hash_join.next_unwrap_pending();
2038
2039        // push the 1st right chunk
2040        tx_r.push_chunk(chunk_r1);
2041        let chunk = hash_join.next_unwrap_ready_chunk()?;
2042        assert_eq!(
2043            chunk,
2044            StreamChunk::from_pretty(
2045                " I I I I I I
2046                + 2 5 2 2 5 1
2047                + 4 9 4 4 9 2"
2048            )
2049        );
2050
2051        // push the 2nd right chunk
2052        tx_r.push_chunk(chunk_r2);
2053        let chunk = hash_join.next_unwrap_ready_chunk()?;
2054        assert_eq!(
2055            chunk,
2056            StreamChunk::from_pretty(
2057                " I I I I I I
2058                + 1 4 1 1 4 4
2059                + 3 6 3 3 6 5"
2060            )
2061        );
2062
2063        Ok(())
2064    }
2065
2066    #[tokio::test]
2067    async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
2068        let chunk_l1 = StreamChunk::from_pretty(
2069            "  I I I
2070             + 1 4 1
2071             + 2 5 2
2072             + 3 6 3",
2073        );
2074        let chunk_l2 = StreamChunk::from_pretty(
2075            "  I I I
2076             + 4 9 4
2077             + 5 10 5",
2078        );
2079        let chunk_r1 = StreamChunk::from_pretty(
2080            "  I I I
2081             + 2 5 1
2082             + 4 9 2
2083             + 6 9 3",
2084        );
2085        let chunk_r2 = StreamChunk::from_pretty(
2086            "  I I I
2087             + 1 4 4
2088             + 3 6 5",
2089        );
2090
2091        let (mut tx_l, mut tx_r, mut hash_join) =
2092            create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;
2093
2094        // push the init barrier for left and right
2095        tx_l.push_barrier(test_epoch(1), false);
2096        tx_r.push_barrier(test_epoch(1), false);
2097        hash_join.next_unwrap_ready_barrier()?;
2098
2099        // push the 1st left chunk
2100        tx_l.push_chunk(chunk_l1);
2101        hash_join.next_unwrap_pending();
2102
2103        // push the init barrier for left and right
2104        tx_l.push_barrier(test_epoch(2), false);
2105        tx_r.push_barrier(test_epoch(2), false);
2106        hash_join.next_unwrap_ready_barrier()?;
2107
2108        // push the 2nd left chunk
2109        tx_l.push_chunk(chunk_l2);
2110        hash_join.next_unwrap_pending();
2111
2112        // push the 1st right chunk
2113        tx_r.push_chunk(chunk_r1);
2114        let chunk = hash_join.next_unwrap_ready_chunk()?;
2115        assert_eq!(
2116            chunk,
2117            StreamChunk::from_pretty(
2118                " I I I
2119                + 2 5 2
2120                + 4 9 4"
2121            )
2122        );
2123
2124        // push the 2nd right chunk
2125        tx_r.push_chunk(chunk_r2);
2126        let chunk = hash_join.next_unwrap_ready_chunk()?;
2127        assert_eq!(
2128            chunk,
2129            StreamChunk::from_pretty(
2130                " I I I
2131                + 1 4 1
2132                + 3 6 3"
2133            )
2134        );
2135
2136        Ok(())
2137    }
2138
2139    #[tokio::test]
2140    async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
2141        let chunk_l1 = StreamChunk::from_pretty(
2142            "  I I I
2143             + 1 4 1
2144             + 2 5 2
2145             + 3 6 3",
2146        );
2147        let chunk_l2 = StreamChunk::from_pretty(
2148            "  I I I
2149             + 4 9 4
2150             + 5 10 5",
2151        );
2152        let chunk_r1 = StreamChunk::from_pretty(
2153            "  I I I
2154             + 2 5 1
2155             + 4 9 2
2156             + 6 9 3",
2157        );
2158        let chunk_r2 = StreamChunk::from_pretty(
2159            "  I I I
2160             + 1 4 4
2161             + 3 6 5",
2162        );
2163
2164        let (mut tx_l, mut tx_r, mut hash_join) =
2165            create_append_only_executor::<{ JoinType::RightSemi }>(false).await;
2166
2167        // push the init barrier for left and right
2168        tx_l.push_barrier(test_epoch(1), false);
2169        tx_r.push_barrier(test_epoch(1), false);
2170        hash_join.next_unwrap_ready_barrier()?;
2171
2172        // push the 1st left chunk
2173        tx_l.push_chunk(chunk_l1);
2174        hash_join.next_unwrap_pending();
2175
2176        // push the init barrier for left and right
2177        tx_l.push_barrier(test_epoch(2), false);
2178        tx_r.push_barrier(test_epoch(2), false);
2179        hash_join.next_unwrap_ready_barrier()?;
2180
2181        // push the 2nd left chunk
2182        tx_l.push_chunk(chunk_l2);
2183        hash_join.next_unwrap_pending();
2184
2185        // push the 1st right chunk
2186        tx_r.push_chunk(chunk_r1);
2187        let chunk = hash_join.next_unwrap_ready_chunk()?;
2188        assert_eq!(
2189            chunk,
2190            StreamChunk::from_pretty(
2191                " I I I
2192                + 2 5 1
2193                + 4 9 2"
2194            )
2195        );
2196
2197        // push the 2nd right chunk
2198        tx_r.push_chunk(chunk_r2);
2199        let chunk = hash_join.next_unwrap_ready_chunk()?;
2200        assert_eq!(
2201            chunk,
2202            StreamChunk::from_pretty(
2203                " I I I
2204                + 1 4 4
2205                + 3 6 5"
2206            )
2207        );
2208
2209        Ok(())
2210    }
2211
2212    #[tokio::test]
2213    async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
2214        let chunk_r1 = StreamChunk::from_pretty(
2215            "  I I
2216             + 1 4
2217             + 2 5
2218             + 3 6",
2219        );
2220        let chunk_r2 = StreamChunk::from_pretty(
2221            "  I I
2222             + 3 8
2223             - 3 8",
2224        );
2225        let chunk_l1 = StreamChunk::from_pretty(
2226            "  I I
2227             + 2 7
2228             + 4 8
2229             + 6 9",
2230        );
2231        let chunk_l2 = StreamChunk::from_pretty(
2232            "  I  I
2233             + 3 10
2234             + 6 11",
2235        );
2236        let chunk_r3 = StreamChunk::from_pretty(
2237            "  I I
2238             + 6 10",
2239        );
2240        let chunk_l3 = StreamChunk::from_pretty(
2241            "  I  I
2242             - 6 11",
2243        );
2244        let chunk_l4 = StreamChunk::from_pretty(
2245            "  I  I
2246             - 6 9",
2247        );
2248        let (mut tx_l, mut tx_r, mut hash_join) =
2249            create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;
2250
2251        // push the init barrier for left and right
2252        tx_l.push_barrier(test_epoch(1), false);
2253        tx_r.push_barrier(test_epoch(1), false);
2254        hash_join.next_unwrap_ready_barrier()?;
2255
2256        // push the 1st right chunk
2257        tx_r.push_chunk(chunk_r1);
2258        hash_join.next_unwrap_pending();
2259
2260        // push the init barrier for left and right
2261        tx_l.push_barrier(test_epoch(2), false);
2262        tx_r.push_barrier(test_epoch(2), false);
2263        hash_join.next_unwrap_ready_barrier()?;
2264
2265        // push the 2nd right chunk
2266        tx_r.push_chunk(chunk_r2);
2267        hash_join.next_unwrap_pending();
2268
2269        // push the 1st left chunk
2270        tx_l.push_chunk(chunk_l1);
2271        let chunk = hash_join.next_unwrap_ready_chunk()?;
2272        assert_eq!(
2273            chunk,
2274            StreamChunk::from_pretty(
2275                " I I
2276                + 2 5"
2277            )
2278        );
2279
2280        // push the 2nd left chunk
2281        tx_l.push_chunk(chunk_l2);
2282        let chunk = hash_join.next_unwrap_ready_chunk()?;
2283        assert_eq!(
2284            chunk,
2285            StreamChunk::from_pretty(
2286                " I I
2287                + 3 6"
2288            )
2289        );
2290
2291        // push the 3rd right chunk (tests forward_exactly_once)
2292        tx_r.push_chunk(chunk_r3);
2293        let chunk = hash_join.next_unwrap_ready_chunk()?;
2294        assert_eq!(
2295            chunk,
2296            StreamChunk::from_pretty(
2297                " I I
2298                + 6 10"
2299            )
2300        );
2301
2302        // push the 3rd left chunk
2303        // (tests that no change if there are still matches)
2304        tx_l.push_chunk(chunk_l3);
2305        hash_join.next_unwrap_pending();
2306
2307        // push the 3rd right chunk
2308        // (tests that deletion occurs when there are no more matches)
2309        tx_l.push_chunk(chunk_l4);
2310        let chunk = hash_join.next_unwrap_ready_chunk()?;
2311        assert_eq!(
2312            chunk,
2313            StreamChunk::from_pretty(
2314                " I I
2315                - 6 10"
2316            )
2317        );
2318
2319        Ok(())
2320    }
2321
2322    #[tokio::test]
2323    async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
2324        let chunk_l1 = StreamChunk::from_pretty(
2325            "  I I
2326             + 1 4
2327             + 2 5
2328             + 3 6",
2329        );
2330        let chunk_l2 = StreamChunk::from_pretty(
2331            "  I I
2332             + 3 8
2333             - 3 8",
2334        );
2335        let chunk_r1 = StreamChunk::from_pretty(
2336            "  I I
2337             + 2 7
2338             + 4 8
2339             + 6 9",
2340        );
2341        let chunk_r2 = StreamChunk::from_pretty(
2342            "  I  I
2343             + 3 10
2344             + 6 11
2345             + 1 2
2346             + 1 3",
2347        );
2348        let chunk_l3 = StreamChunk::from_pretty(
2349            "  I I
2350             + 9 10",
2351        );
2352        let chunk_r3 = StreamChunk::from_pretty(
2353            "  I I
2354             - 1 2",
2355        );
2356        let chunk_r4 = StreamChunk::from_pretty(
2357            "  I I
2358             - 1 3",
2359        );
2360        let (mut tx_l, mut tx_r, mut hash_join) =
2361            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2362
2363        // push the init barrier for left and right
2364        tx_l.push_barrier(test_epoch(1), false);
2365        tx_r.push_barrier(test_epoch(1), false);
2366        hash_join.next_unwrap_ready_barrier()?;
2367
2368        // push the 1st left chunk
2369        tx_l.push_chunk(chunk_l1);
2370        let chunk = hash_join.next_unwrap_ready_chunk()?;
2371        assert_eq!(
2372            chunk,
2373            StreamChunk::from_pretty(
2374                " I I
2375                + 1 4
2376                + 2 5
2377                + 3 6",
2378            )
2379        );
2380
2381        // push the init barrier for left and right
2382        tx_l.push_barrier(test_epoch(2), false);
2383        tx_r.push_barrier(test_epoch(2), false);
2384        hash_join.next_unwrap_ready_barrier()?;
2385
2386        // push the 2nd left chunk
2387        tx_l.push_chunk(chunk_l2);
2388        let chunk = hash_join.next_unwrap_ready_chunk()?;
2389        assert_eq!(
2390            chunk,
2391            StreamChunk::from_pretty(
2392                "  I I
2393                 + 3 8 D
2394                 - 3 8 D",
2395            )
2396        );
2397
2398        // push the 1st right chunk
2399        tx_r.push_chunk(chunk_r1);
2400        let chunk = hash_join.next_unwrap_ready_chunk()?;
2401        assert_eq!(
2402            chunk,
2403            StreamChunk::from_pretty(
2404                " I I
2405                - 2 5"
2406            )
2407        );
2408
2409        // push the 2nd right chunk
2410        tx_r.push_chunk(chunk_r2);
2411        let chunk = hash_join.next_unwrap_ready_chunk()?;
2412        assert_eq!(
2413            chunk,
2414            StreamChunk::from_pretty(
2415                " I I
2416                - 3 6
2417                - 1 4"
2418            )
2419        );
2420
2421        // push the 3rd left chunk (tests forward_exactly_once)
2422        tx_l.push_chunk(chunk_l3);
2423        let chunk = hash_join.next_unwrap_ready_chunk()?;
2424        assert_eq!(
2425            chunk,
2426            StreamChunk::from_pretty(
2427                " I I
2428                + 9 10"
2429            )
2430        );
2431
2432        // push the 3rd right chunk
2433        // (tests that no change if there are still matches)
2434        tx_r.push_chunk(chunk_r3);
2435        hash_join.next_unwrap_pending();
2436
2437        // push the 4th right chunk
2438        // (tests that insertion occurs when there are no more matches)
2439        tx_r.push_chunk(chunk_r4);
2440        let chunk = hash_join.next_unwrap_ready_chunk()?;
2441        assert_eq!(
2442            chunk,
2443            StreamChunk::from_pretty(
2444                " I I
2445                + 1 4"
2446            )
2447        );
2448
2449        Ok(())
2450    }
2451
    /// Right anti join, simulated by building a `LeftAnti` executor and swapping the returned
    /// senders: the first returned sender (bound to `tx_r` here) feeds the executor's left
    /// input. Rows pushed through `tx_r` are emitted while they have no match, retracted once a
    /// matching row arrives through `tx_l`, and re-emitted when their last match is deleted.
    #[tokio::test]
    async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11
             + 1 2
             + 1 3",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I
             + 9 10",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             - 1 2",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            "  I I
             - 1 3",
        );
        // Destructured in (right, left) order on purpose: `tx_r` drives the executor's left
        // (anti-preserved) input, `tx_l` drives the side that is only probed for matches.
        let (mut tx_r, mut tx_l, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;

        // push the init barrier for left and right
        tx_r.push_barrier(test_epoch(1), false);
        tx_l.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st right chunk: the other side is empty, so every row is emitted
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4
                + 2 5
                + 3 6",
            )
        );

        // push the epoch-2 barrier to both sides and consume the aligned barrier
        tx_r.push_barrier(test_epoch(2), false);
        tx_l.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd right chunk: the insert and delete of `3 8` within one chunk are both
        // forwarded, each with the trailing `D` flag in the expected chunk (presumably
        // `from_pretty`'s invisible-row marker — confirm)
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I
                 + 3 8 D
                 - 3 8 D",
            )
        );

        // push the 1st left chunk: `2 7` matches `2 5`, which is retracted; keys 4 and 6 have
        // no counterpart on the `tx_r` side, so they produce nothing
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 2 5"
            )
        );

        // push the 2nd left chunk: `3 10` retracts `3 6` and `1 2` retracts `1 4`; the second
        // key-1 match (`1 3`) adds no output because `1 4` is already retracted
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 3 6
                - 1 4"
            )
        );

        // push the 3rd right chunk (tests forward_exactly_once): key 9 has no match, so the
        // row is emitted
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 9 10"
            )
        );

        // push the 3rd left chunk
        // (tests that nothing changes while matches remain: `1 3` still matches `1 4`)
        tx_l.push_chunk(chunk_l3);
        hash_join.next_unwrap_pending();

        // push the 4th left chunk
        // (tests that the row is re-inserted once its last match is gone)
        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4"
            )
        );

        Ok(())
    }
2581
    /// Inner join where the left input reaches a barrier before the right input. Data queued
    /// behind the left barrier must not be joined until the barrier is aligned on both sides.
    #[tokio::test]
    async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 6 8
             + 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: the right side is empty, so the inner join emits nothing
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push a barrier to left side
        tx_l.push_barrier(test_epoch(2), false);

        // push the 2nd left chunk (queued behind the left barrier)
        tx_l.push_chunk(chunk_l2);

        // join the first right chunk
        tx_r.push_chunk(chunk_r1);

        // Consume stream chunk: only `2 7` matches a row from `chunk_l1`; `6 9` does not yet
        // see `6 8` because `chunk_l2` is still behind the left barrier
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 7"
            )
        );

        // push a barrier to right side
        tx_r.push_barrier(test_epoch(2), false);

        // get the aligned barrier here (epoch 2, no mutation)
        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
        assert!(matches!(
            hash_join.next_unwrap_ready_barrier()?,
            Barrier {
                epoch,
                mutation: None,
                ..
            } if epoch == expected_epoch
        ));

        // join the 2nd left chunk: `6 8` now meets the stored `6 9`; `3 8` has no right match
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 6 8 6 9"
            )
        );

        // push the 2nd right chunk: `3 10` matches both stored key-3 left rows and `6 11`
        // matches `6 8`
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 6 3 10
                + 3 8 3 10
                + 6 8 6 11"
            )
        );

        Ok(())
    }
2676
    /// Same barrier-alignment scenario as the plain inner-join barrier test, but several left
    /// rows carry NULL in their non-key column. Those rows must still join on their key.
    #[tokio::test]
    async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 .
             + 3 .",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 6 .
             + 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: the right side is empty, so the inner join emits nothing
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push a barrier to left side
        tx_l.push_barrier(test_epoch(2), false);

        // push the 2nd left chunk (queued behind the left barrier)
        tx_l.push_chunk(chunk_l2);

        // join the first right chunk
        tx_r.push_chunk(chunk_r1);

        // Consume stream chunk: `2 .` joins `2 7` despite its NULL value column; `chunk_l2` is
        // not visible yet because it sits behind the left barrier
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 . 2 7"
            )
        );

        // push a barrier to right side
        tx_r.push_barrier(test_epoch(2), false);

        // get the aligned barrier here (epoch 2, no mutation)
        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
        assert!(matches!(
            hash_join.next_unwrap_ready_barrier()?,
            Barrier {
                epoch,
                mutation: None,
                ..
            } if epoch == expected_epoch
        ));

        // join the 2nd left chunk: `6 .` meets the stored `6 9`
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 6 . 6 9"
            )
        );

        // push the 2nd right chunk: `3 10` matches both key-3 left rows (one with a NULL value)
        // and `6 11` matches `6 .`
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 8 3 10
                + 3 . 3 10
                + 6 . 6 11"
            )
        );

        Ok(())
    }
2771
    /// Left outer join.
    ///
    /// Checks three behaviors:
    /// - Unmatched left rows are emitted padded with NULLs.
    /// - A later right-side match replaces the NULL-padded row with the joined row, via a
    ///   `-` followed by a `+`.
    /// - Unmatched right rows produce nothing.
    #[tokio::test]
    async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: no right rows yet, so every row is NULL-padded
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 4 . .
                + 2 5 . .
                + 3 6 . ."
            )
        );

        // push the 2nd left chunk: insert + delete of the same row within one chunk; both
        // output rows carry the trailing `D` flag (presumably `from_pretty`'s invisible-row
        // marker — confirm)
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 8 . . D
                - 3 8 . . D"
            )
        );

        // push the 1st right chunk: `2 7` upgrades the NULL-padded `2 5` row; keys 4 and 6 have
        // no left partner and emit nothing
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 2 5 . .
                + 2 5 2 7"
            )
        );

        // push the 2nd right chunk: `3 10` upgrades `3 6`; `6 11` has no left partner
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 3 6 . .
                + 3 6 3 10"
            )
        );

        Ok(())
    }
2855
    /// Left outer join with NULL join keys, where a NULL key on the left is expected to match a
    /// NULL key on the right (see the final assertion).
    ///
    /// The second boolean passed to `create_classical_executor` is `true` here. From the test
    /// name, it presumably enables null-safe key equality — confirm in the helper.
    #[tokio::test]
    async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + . 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + . 8
             - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + . 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: no right rows yet, so every row (including the NULL key) is
        // NULL-padded
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 4 . .
                + 2 5 . .
                + . 6 . ."
            )
        );

        // push the 2nd left chunk: insert + delete of a NULL-key row within one chunk, both
        // flagged `D` in the expected output
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . 8 . . D
                - . 8 . . D"
            )
        );

        // push the 1st right chunk: `2 7` upgrades the NULL-padded `2 5` row
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 2 5 . .
                + 2 5 2 7"
            )
        );

        // push the 2nd right chunk: the NULL-key right row `. 10` matches the NULL-key left row
        // `. 6`; `6 11` has no left partner
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - . 6 . .
                + . 6 . 10"
            )
        );

        Ok(())
    }
2939
    /// Right outer join. Left rows alone produce nothing. Right rows are emitted either joined
    /// or NULL-padded on the left.
    #[tokio::test]
    async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 5 10
             - 5 10",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: no right rows yet, nothing to emit
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd left chunk: still nothing to emit
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk: `2 7` joins `2 5`; keys 4 and 6 have no left partner and
        // are NULL-padded
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 7
                + . . 4 8
                + . . 6 9"
            )
        );

        // push the 2nd right chunk: insert + delete of the same unmatched row within one chunk,
        // both flagged `D` in the expected output
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . . 5 10 D
                - . . 5 10 D"
            )
        );

        Ok(())
    }
3007
    /// Left outer join built with `create_append_only_executor` on three-column inputs.
    ///
    /// In the expected output, rows join when their first two columns agree. The exact key
    /// columns are configured inside the helper — confirm there.
    #[tokio::test]
    async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: no right rows yet, so every row is NULL-padded
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 1 4 1 . . .
                + 2 5 2 . . .
                + 3 6 3 . . ."
            )
        );

        // push the 2nd left chunk: still no right rows, NULL-padded as well
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 4 9 4 . . .
                + 5 10 5 . . ."
            )
        );

        // push the 1st right chunk: `(2, 5)` and `(4, 9)` find left partners, whose NULL-padded
        // rows are retracted and replaced; `(6, 9)` has no partner
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 2 . . .
                + 2 5 2 2 5 1
                - 4 9 4 . . .
                + 4 9 4 4 9 2"
            )
        );

        // push the 2nd right chunk: `(1, 4)` and `(3, 6)` upgrade their NULL-padded left rows
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 1 4 1 . . .
                + 1 4 1 1 4 4
                - 3 6 3 . . .
                + 3 6 3 3 6 5"
            )
        );

        Ok(())
    }
3096
    /// Right outer join built with `create_append_only_executor` on three-column inputs.
    ///
    /// Left rows alone produce nothing. Each right row is emitted once, either joined or
    /// NULL-padded on the left.
    #[tokio::test]
    async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5
             + 7 7 6",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::RightOuter }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: no right rows yet, nothing to emit
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd left chunk: still nothing to emit
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk: two rows join stored left rows, `(6, 9)` is NULL-padded
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                + 2 5 2 2 5 1
                + 4 9 4 4 9 2
                + . . . 6 9 3"
            )
        );

        // push the 2nd right chunk: two more joins, `(7, 7)` is NULL-padded
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I I I I I
                + 1 4 1 1 4 4
                + 3 6 3 3 6 5
                + . . . 7 7 6"
            )
        );

        Ok(())
    }
3167
    /// Full outer join. Unmatched rows from either side are NULL-padded. A late match on the
    /// right retracts the left row's NULL-padded version before emitting the joined row.
    #[tokio::test]
    async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 5 10
             - 5 10",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: no right rows yet, so every row is NULL-padded
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 4 . .
                + 2 5 . .
                + 3 6 . ."
            )
        );

        // push the 2nd left chunk: insert + delete of the same row within one chunk, both
        // flagged `D` in the expected output
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 8 . . D
                - 3 8 . . D"
            )
        );

        // push the 1st right chunk: `2 7` upgrades `2 5`; keys 4 and 6 are NULL-padded on the
        // left
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 2 5 . .
                + 2 5 2 7
                + . . 4 8
                + . . 6 9"
            )
        );

        // push the 2nd right chunk: insert + delete of an unmatched right row within one
        // chunk, both flagged `D`
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . . 5 10 D
                - . . 5 10 D"
            )
        );

        Ok(())
    }
3253
    /// Full outer join receiving an update (`-` old row, `+` new row) on the left side of an
    /// already-matched key. The output should replace the old joined row with the new one.
    #[tokio::test]
    async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // A lone left row is emitted NULL-padded.
        tx_l.push_chunk(StreamChunk::from_pretty(
            "  I I
             + 1 1
            ",
        ));
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 1 . ."
            )
        );

        // A matching right row retracts the NULL-padded row and emits the joined one.
        tx_r.push_chunk(StreamChunk::from_pretty(
            "  I I
             + 1 1
            ",
        ));
        let chunk = hash_join.next_unwrap_ready_chunk()?;

        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 1 1 . .
                + 1 1 1 1"
            )
        );

        // Update the left row `1 1` -> `1 2`. The key stays matched, so the joined row is
        // swapped directly. `compact_vis()` is applied before comparing — presumably to drop
        // rows hidden by the visibility bitmap (confirm).
        tx_l.push_chunk(StreamChunk::from_pretty(
            "   I I
              - 1 1
              + 1 2
            ",
        ));
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        let chunk = chunk.compact_vis();
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 1 1 1 1
                + 1 2 1 1
                "
            )
        );

        Ok(())
    }
3314
    /// Full outer join with an additional non-equi condition. The first `true` passed to
    /// `create_classical_executor` presumably enables that condition — confirm in the helper.
    ///
    /// The expected output is consistent with a condition of the form
    /// `left.col1 < right.col1`. Rows that match on the key but fail the condition must be
    /// treated as unmatched, i.e. NULL-padded and forwarded exactly once.
    #[tokio::test]
    async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
    {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6
             + 3 7",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8
             - 1 4", // delete row to cause an empty JoinHashEntry
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 6
             + 4 8
             + 3 4",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 5 10
             - 5 10
             + 1 2",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: no right rows yet, so every row is NULL-padded
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 4 . .
                + 2 5 . .
                + 3 6 . .
                + 3 7 . ."
            )
        );

        // push the 2nd left chunk: the `3 8` insert/delete pair is flagged `D`; deleting `1 4`
        // retracts its NULL-padded row and leaves key 1 with no left rows
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 8 . . D
                - 3 8 . . D
                - 1 4 . ."
            )
        );

        // push the 1st right chunk: `2 6` passes the condition against `2 5`; `3 4` matches
        // two left rows on the key but fails the condition for both
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 2 5 . .
                + 2 5 2 6
                + . . 4 8
                + . . 3 4" /* regression test (#2420): 3 4 should be forwarded only once
                            * despite matching on eq join on 2
                            * entries */
            )
        );

        // push the 2nd right chunk: `5 10` insert/delete pair is flagged `D`; `1 2` hits the
        // now-empty key-1 entry and must still be emitted NULL-padded
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . . 5 10 D
                - . . 5 10 D
                + . . 1 2" /* regression test (#2420): 1 2 forwarded even if matches on an empty
                            * join entry */
            )
        );

        Ok(())
    }
3410
3411    #[tokio::test]
3412    async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
3413        let chunk_l1 = StreamChunk::from_pretty(
3414            "  I I
3415             + 1 4
3416             + 2 10
3417             + 3 6",
3418        );
3419        let chunk_l2 = StreamChunk::from_pretty(
3420            "  I I
3421             + 3 8
3422             - 3 8",
3423        );
3424        let chunk_r1 = StreamChunk::from_pretty(
3425            "  I I
3426             + 2 7
3427             + 4 8
3428             + 6 9",
3429        );
3430        let chunk_r2 = StreamChunk::from_pretty(
3431            "  I  I
3432             + 3 10
3433             + 6 11",
3434        );
3435        let (mut tx_l, mut tx_r, mut hash_join) =
3436            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3437
3438        // push the init barrier for left and right
3439        tx_l.push_barrier(test_epoch(1), false);
3440        tx_r.push_barrier(test_epoch(1), false);
3441        hash_join.next_unwrap_ready_barrier()?;
3442
3443        // push the 1st left chunk
3444        tx_l.push_chunk(chunk_l1);
3445        hash_join.next_unwrap_pending();
3446
3447        // push the 2nd left chunk
3448        tx_l.push_chunk(chunk_l2);
3449        hash_join.next_unwrap_pending();
3450
3451        // push the 1st right chunk
3452        tx_r.push_chunk(chunk_r1);
3453        hash_join.next_unwrap_pending();
3454
3455        // push the 2nd right chunk
3456        tx_r.push_chunk(chunk_r2);
3457        let chunk = hash_join.next_unwrap_ready_chunk()?;
3458        assert_eq!(
3459            chunk,
3460            StreamChunk::from_pretty(
3461                " I I I I
3462                + 3 6 3 10"
3463            )
3464        );
3465
3466        Ok(())
3467    }
3468
3469    #[tokio::test]
3470    async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3471        let (mut tx_l, mut tx_r, mut hash_join) =
3472            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3473
3474        // push the init barrier for left and right
3475        tx_l.push_barrier(test_epoch(1), false);
3476        tx_r.push_barrier(test_epoch(1), false);
3477        hash_join.next_unwrap_ready_barrier()?;
3478
3479        tx_l.push_int64_watermark(0, 100);
3480
3481        tx_l.push_int64_watermark(0, 200);
3482
3483        tx_l.push_barrier(test_epoch(2), false);
3484        tx_r.push_barrier(test_epoch(2), false);
3485        hash_join.next_unwrap_ready_barrier()?;
3486
3487        tx_r.push_int64_watermark(0, 50);
3488
3489        let w1 = hash_join.next().await.unwrap().unwrap();
3490        let w1 = w1.as_watermark().unwrap();
3491
3492        let w2 = hash_join.next().await.unwrap().unwrap();
3493        let w2 = w2.as_watermark().unwrap();
3494
3495        tx_r.push_int64_watermark(0, 100);
3496
3497        let w3 = hash_join.next().await.unwrap().unwrap();
3498        let w3 = w3.as_watermark().unwrap();
3499
3500        let w4 = hash_join.next().await.unwrap().unwrap();
3501        let w4 = w4.as_watermark().unwrap();
3502
3503        assert_eq!(
3504            w1,
3505            &Watermark {
3506                col_idx: 2,
3507                data_type: DataType::Int64,
3508                val: ScalarImpl::Int64(50)
3509            }
3510        );
3511
3512        assert_eq!(
3513            w2,
3514            &Watermark {
3515                col_idx: 0,
3516                data_type: DataType::Int64,
3517                val: ScalarImpl::Int64(50)
3518            }
3519        );
3520
3521        assert_eq!(
3522            w3,
3523            &Watermark {
3524                col_idx: 2,
3525                data_type: DataType::Int64,
3526                val: ScalarImpl::Int64(100)
3527            }
3528        );
3529
3530        assert_eq!(
3531            w4,
3532            &Watermark {
3533                col_idx: 0,
3534                data_type: DataType::Int64,
3535                val: ScalarImpl::Int64(100)
3536            }
3537        );
3538
3539        Ok(())
3540    }
3541}