risingwave_stream/executor/
hash_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use std::assert_matches::assert_matches;
15use std::collections::{BTreeMap, HashSet};
16use std::marker::PhantomData;
17use std::time::Duration;
18
19use anyhow::Context;
20use either::Either;
21use itertools::Itertools;
22use multimap::MultiMap;
23use risingwave_common::array::Op;
24use risingwave_common::hash::{HashKey, NullBitmap};
25use risingwave_common::row::RowExt;
26use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
27use risingwave_common::util::epoch::EpochPair;
28use risingwave_common::util::iter_util::ZipEqDebug;
29use risingwave_expr::ExprError;
30use risingwave_expr::expr::NonStrictExpression;
31use tokio::time::Instant;
32
33use self::builder::JoinChunkBuilder;
34use super::barrier_align::*;
35use super::join::hash_join::*;
36use super::join::row::{JoinEncoding, JoinRow};
37use super::join::*;
38use super::watermark::*;
39use crate::executor::CachedJoinRow;
40use crate::executor::join::builder::JoinStreamChunkBuilder;
41use crate::executor::join::hash_join::CacheResult;
42use crate::executor::prelude::*;
43
44/// Number of `evict_cache` calls between two manual evictions of both sides' join caches.
/// The counter it is compared against is named `cnt_rows_received`, so presumably one call is
/// made per processed input row -- confirm against the callers.
45const EVICT_EVERY_N_ROWS: u32 = 16;
46
/// Returns `true` if every element of `vec1` also occurs in `vec2`.
///
/// Duplicates on either side are irrelevant, and an empty `vec1` is a subset of anything
/// (including an empty `vec2`).
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    // Only the candidate superset has to be hashed; the other side is merely probed, so there
    // is no need to allocate a second set for it.
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.iter().all(|idx| superset.contains(idx))
}
50
/// Key-related parameters describing one input side of a hash join.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JoinParams {
    /// Indices of the join keys
    pub join_key_indices: Vec<usize>,
    /// Indices of the input pk after dedup
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Bundles the join key indices and the deduplicated pk indices of one side.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        Self {
            join_key_indices,
            deduped_pk_indices,
        }
    }
}
66
/// State and column bookkeeping for one input side (left or right) of the hash join.
67struct JoinSide<K: HashKey, S: StateStore, E: JoinEncoding> {
68    /// Stores all data received from this side's input stream.
69    ht: JoinHashMap<K, S, E>,
70    /// Indices of the join key columns
71    join_key_indices: Vec<usize>,
72    /// The data type of all columns without degree.
73    all_data_types: Vec<DataType>,
74    /// The start position for the side in output new columns
75    start_pos: usize,
76    /// The mapping from input indices of a side to output columns.
77    i2o_mapping: Vec<(usize, usize)>,
    /// The same pairs as `i2o_mapping`, collected into a multimap keyed by the first element of
    /// each pair so that all entries for one input column can be looked up at once.
78    i2o_mapping_indexed: MultiMap<usize, usize>,
79    /// The first field of the ith element indicates that when a watermark at the ith column of
80    /// this side comes, what band join conditions should be updated in order to possibly
81    /// generate a new watermark at that column or the corresponding column in the counterpart
82    /// join side.
83    ///
84    /// The second field indicates whether the column is required to be less than the
85    /// corresponding column in the counterpart join side in the band join condition.
86    input2inequality_index: Vec<Vec<(usize, bool)>>,
87    /// Some fields which are required non null to match due to inequalities.
88    non_null_fields: Vec<usize>,
89    /// (i, j) in this `Vec` means that state data in this join side can be cleaned if the value of
90    /// its ith column is less than the synthetic watermark of the jth band join condition.
91    state_clean_columns: Vec<(usize, usize)>,
92    /// Whether degree table is needed for this side.
93    need_degree_table: bool,
    /// Zero-sized marker for the join encoding type parameter `E`.
94    _marker: std::marker::PhantomData<E>,
95}
96
97impl<K: HashKey, S: StateStore, E: JoinEncoding> std::fmt::Debug for JoinSide<K, S, E> {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        f.debug_struct("JoinSide")
100            .field("join_key_indices", &self.join_key_indices)
101            .field("col_types", &self.all_data_types)
102            .field("start_pos", &self.start_pos)
103            .field("i2o_mapping", &self.i2o_mapping)
104            .field("need_degree_table", &self.need_degree_table)
105            .finish()
106    }
107}
108
109impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinSide<K, S, E> {
110    // WARNING: Please do not call this until we implement it.
111    fn is_dirty(&self) -> bool {
112        unimplemented!()
113    }
114
115    #[expect(dead_code)]
116    fn clear_cache(&mut self) {
117        assert!(
118            !self.is_dirty(),
119            "cannot clear cache while states of hash join are dirty"
120        );
121
122        // TODO: not working with rearranged chain
123        // self.ht.clear();
124    }
125
126    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
127        self.ht.init(epoch).await
128    }
129}
130
131/// `HashJoinExecutor` takes two input streams and runs equal hash join on them.
132/// The output columns are the concatenation of left and right columns.
133pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
134{
    /// Context of the actor running this executor (provides the actor id, fragment id and config).
135    ctx: ActorContextRef,
    /// Executor metadata; its `schema` and `stream_key` are printed by the `Debug` impl.
136    info: ExecutorInfo,
137
138    /// Left input executor. Taken out (set to `None`) when `into_stream` starts.
139    input_l: Option<Executor>,
140    /// Right input executor. Taken out (set to `None`) when `into_stream` starts.
141    input_r: Option<Executor>,
142    /// The data types of the formed new columns
143    actual_output_data_types: Vec<DataType>,
144    /// The parameters of the left join executor
145    side_l: JoinSide<K, S, E>,
146    /// The parameters of the right join executor
147    side_r: JoinSide<K, S, E>,
148    /// Optional non-equi join conditions
149    cond: Option<NonStrictExpression>,
150    /// Column indices of watermark output and offset expression of each inequality, respectively.
151    inequality_pairs: Vec<(Vec<usize>, Option<NonStrictExpression>)>,
152    /// The output watermark of each inequality condition and its value is the minimum of the
153    /// calculation result of both side. It will be used to generate watermark into downstream
154    /// and do state cleaning if `clean_state` field of that inequality is `true`.
155    inequality_watermarks: Vec<Option<Watermark>>,
156
157    /// Whether the logic can be optimized for append-only stream
158    append_only_optimize: bool,
159
    /// Streaming metrics registry used for the join-related counters and gauges.
160    metrics: Arc<StreamingMetrics>,
161    /// The maximum size of the chunk produced by executor at a time
162    chunk_size: usize,
163    /// Counter incremented by `evict_cache`; reset to 0 once it reaches `EVICT_EVERY_N_ROWS`.
164    cnt_rows_received: u32,
165
166    /// watermark column index -> `BufferedWatermarks`
167    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,
168
169    /// When to alert high join amplification
170    high_join_amplification_threshold: usize,
171
172    /// Max number of rows that will be cached in the entry state.
173    entry_state_max_rows: usize,
174}
175
176impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> std::fmt::Debug
177    for HashJoinExecutor<K, S, T, E>
178{
179    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180        f.debug_struct("HashJoinExecutor")
181            .field("join_type", &T)
182            .field("input_left", &self.input_l.as_ref().unwrap().identity())
183            .field("input_right", &self.input_r.as_ref().unwrap().identity())
184            .field("side_l", &self.side_l)
185            .field("side_r", &self.side_r)
186            .field("stream_key", &self.info.stream_key)
187            .field("schema", &self.info.schema)
188            .field("actual_output_data_types", &self.actual_output_data_types)
189            .finish()
190    }
191}
192
193impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> Execute
194    for HashJoinExecutor<K, S, T, E>
195{
196    fn execute(self: Box<Self>) -> BoxedMessageStream {
197        self.into_stream().boxed()
198    }
199}
200
/// Arguments for `eq_join_left` / `eq_join_right`: the input chunk plus borrows and copies of
/// the executor fields of the same names, as assembled in `into_stream`.
201struct EqJoinArgs<'a, K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Actor context of the executor.
202    ctx: &'a ActorContextRef,
    /// Left join side.
203    side_l: &'a mut JoinSide<K, S, E>,
    /// Right join side.
204    side_r: &'a mut JoinSide<K, S, E>,
    /// Data types of the output columns.
205    actual_output_data_types: &'a [DataType],
    /// Optional non-equi join condition.
206    cond: &'a mut Option<NonStrictExpression>,
    /// Current watermark of each inequality condition, if any.
207    inequality_watermarks: &'a [Option<Watermark>],
    /// The input chunk to be joined.
208    chunk: StreamChunk,
    /// Whether the append-only optimization is enabled.
209    append_only_optimize: bool,
    /// Maximum size of a produced chunk.
210    chunk_size: usize,
    /// The executor's row counter used for periodic cache eviction.
211    cnt_rows_received: &'a mut u32,
    /// Threshold for alerting on high join amplification.
212    high_join_amplification_threshold: usize,
    /// Max number of rows cached in the entry state.
213    entry_state_max_rows: usize,
214}
215
216impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
217    HashJoinExecutor<K, S, T, E>
218{
219    #[allow(clippy::too_many_arguments)]
    /// Creates a hash join executor without an explicit entry-state row limit.
    ///
    /// All arguments are forwarded unchanged to `new_with_cache_size`, with
    /// `entry_state_max_rows` set to `None`, which makes that constructor read the limit from
    /// `ctx.config.developer.hash_join_entry_state_max_rows`.
220    pub fn new(
221        ctx: ActorContextRef,
222        info: ExecutorInfo,
223        input_l: Executor,
224        input_r: Executor,
225        params_l: JoinParams,
226        params_r: JoinParams,
227        null_safe: Vec<bool>,
228        output_indices: Vec<usize>,
229        cond: Option<NonStrictExpression>,
230        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
231        state_table_l: StateTable<S>,
232        degree_state_table_l: StateTable<S>,
233        state_table_r: StateTable<S>,
234        degree_state_table_r: StateTable<S>,
235        watermark_epoch: AtomicU64Ref,
236        is_append_only: bool,
237        metrics: Arc<StreamingMetrics>,
238        chunk_size: usize,
239        high_join_amplification_threshold: usize,
240    ) -> Self {
241        Self::new_with_cache_size(
242            ctx,
243            info,
244            input_l,
245            input_r,
246            params_l,
247            params_r,
248            null_safe,
249            output_indices,
250            cond,
251            inequality_pairs,
252            state_table_l,
253            degree_state_table_l,
254            state_table_r,
255            degree_state_table_r,
256            watermark_epoch,
257            is_append_only,
258            metrics,
259            chunk_size,
260            high_join_amplification_threshold,
            // No explicit limit: use the configured default.
261            None,
262        )
263    }
264
265    #[allow(clippy::too_many_arguments)]
    /// Creates a hash join executor, optionally overriding the entry-state row limit.
    ///
    /// Besides storing the arguments, this derives the output data types from `output_indices`,
    /// the key layout of the degree tables, the input-to-output column mappings of both sides,
    /// and the per-column bookkeeping for the inequality (band join) conditions.
    ///
    /// Each element of `inequality_pairs` is destructured as
    /// `(key_required_larger, key_required_smaller, clean_state, delta_expression)`.
    ///
    /// # Panics
    ///
    /// Panics if the join key data types of the two sides differ, or if an index in
    /// `output_indices`, the join key indices or `inequality_pairs` is out of range.
266    pub fn new_with_cache_size(
267        ctx: ActorContextRef,
268        info: ExecutorInfo,
269        input_l: Executor,
270        input_r: Executor,
271        params_l: JoinParams,
272        params_r: JoinParams,
273        null_safe: Vec<bool>,
274        output_indices: Vec<usize>,
275        cond: Option<NonStrictExpression>,
276        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
277        state_table_l: StateTable<S>,
278        degree_state_table_l: StateTable<S>,
279        state_table_r: StateTable<S>,
280        degree_state_table_r: StateTable<S>,
281        watermark_epoch: AtomicU64Ref,
282        is_append_only: bool,
283        metrics: Arc<StreamingMetrics>,
284        chunk_size: usize,
285        high_join_amplification_threshold: usize,
286        entry_state_max_rows: Option<usize>,
287    ) -> Self {
        // Fall back to the configured default when no explicit limit is given.
288        let entry_state_max_rows = match entry_state_max_rows {
289            None => ctx.config.developer.hash_join_entry_state_max_rows,
290            Some(entry_state_max_rows) => entry_state_max_rows,
291        };
292        let side_l_column_n = input_l.schema().len();
293
        // Semi/anti joins only expose the columns of one side; every other join type exposes
        // the left columns followed by the right columns.
294        let schema_fields = match T {
295            JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
296            JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
297            _ => [
298                input_l.schema().fields.clone(),
299                input_r.schema().fields.clone(),
300            ]
301            .concat(),
302        };
303
304        let original_output_data_types = schema_fields
305            .iter()
306            .map(|field| field.data_type())
307            .collect_vec();
        // Project the full schema down to the requested output columns.
308        let actual_output_data_types = output_indices
309            .iter()
310            .map(|&idx| original_output_data_types[idx].clone())
311            .collect_vec();
312
313        // Data types of the hash join state.
314        let state_all_data_types_l = input_l.schema().data_types();
315        let state_all_data_types_r = input_r.schema().data_types();
316
317        let state_pk_indices_l = input_l.stream_key().to_vec();
318        let state_pk_indices_r = input_r.stream_key().to_vec();
319
320        let state_join_key_indices_l = params_l.join_key_indices;
321        let state_join_key_indices_r = params_r.join_key_indices;
322
        // In the degree tables the join key occupies the leading columns ...
323        let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
324        let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();
325
        // ... and the deduplicated pk columns follow right after the join key.
326        let degree_pk_indices_l = (state_join_key_indices_l.len()
327            ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
328            .collect_vec();
329        let degree_pk_indices_r = (state_join_key_indices_r.len()
330            ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
331            .collect_vec();
332
333        // If pk is contained in join key.
334        let pk_contained_in_jk_l = is_subset(state_pk_indices_l, state_join_key_indices_l.clone());
335        let pk_contained_in_jk_r = is_subset(state_pk_indices_r, state_join_key_indices_r.clone());
336
337        // check whether join key contains pk in both side
338        let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;
339
340        let join_key_data_types_l = state_join_key_indices_l
341            .iter()
342            .map(|idx| state_all_data_types_l[*idx].clone())
343            .collect_vec();
344
345        let join_key_data_types_r = state_join_key_indices_r
346            .iter()
347            .map(|idx| state_all_data_types_r[*idx].clone())
348            .collect_vec();
349
        // Both sides must agree on the join key types, position by position.
350        assert_eq!(join_key_data_types_l, join_key_data_types_r);
351
352        let null_matched = K::Bitmap::from_bool_vec(null_safe);
353
        // Note that each side's flag is combined with the pk-in-join-key property of the
        // *opposite* side.
354        let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
355        let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;
356
357        let degree_state_l = need_degree_table_l.then(|| {
358            TableInner::new(
359                degree_pk_indices_l,
360                degree_join_key_indices_l,
361                degree_state_table_l,
362            )
363        });
364        let degree_state_r = need_degree_table_r.then(|| {
365            TableInner::new(
366                degree_pk_indices_r,
367                degree_join_key_indices_r,
368                degree_state_table_r,
369            )
370        });
371
        // For semi/anti joins the side that is not output contributes zero columns to the
        // input-to-output mapping.
372        let (left_to_output, right_to_output) = {
373            let (left_len, right_len) = if is_left_semi_or_anti(T) {
374                (state_all_data_types_l.len(), 0usize)
375            } else if is_right_semi_or_anti(T) {
376                (0usize, state_all_data_types_r.len())
377            } else {
378                (state_all_data_types_l.len(), state_all_data_types_r.len())
379            };
380            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
381        };
382
383        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
384        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());
385
386        let left_input_len = input_l.schema().len();
387        let right_input_len = input_r.schema().len();
388        let mut l2inequality_index = vec![vec![]; left_input_len];
389        let mut r2inequality_index = vec![vec![]; right_input_len];
390        let mut l_state_clean_columns = vec![];
391        let mut r_state_clean_columns = vec![];
        // Convert each inequality into (output columns of the larger key, offset expression)
        // and record, per input column, which inequalities it takes part in. Right-side
        // columns are obtained by subtracting `left_input_len` from the given index.
392        let inequality_pairs = inequality_pairs
393            .into_iter()
394            .enumerate()
395            .map(
396                |(
397                    index,
398                    (key_required_larger, key_required_smaller, clean_state, delta_expression),
399                )| {
400                    let output_indices = if key_required_larger < key_required_smaller {
                        // Here the larger key is used as a left-side column index and the
                        // smaller key as a right-side one.
401                        if clean_state {
402                            l_state_clean_columns.push((key_required_larger, index));
403                        }
404                        l2inequality_index[key_required_larger].push((index, false));
405                        r2inequality_index[key_required_smaller - left_input_len]
406                            .push((index, true));
407                        l2o_indexed
408                            .get_vec(&key_required_larger)
409                            .cloned()
410                            .unwrap_or_default()
411                    } else {
                        // Here the larger key is used as a right-side column index and the
                        // smaller key as a left-side one.
412                        if clean_state {
413                            r_state_clean_columns
414                                .push((key_required_larger - left_input_len, index));
415                        }
416                        l2inequality_index[key_required_smaller].push((index, true));
417                        r2inequality_index[key_required_larger - left_input_len]
418                            .push((index, false));
419                        r2o_indexed
420                            .get_vec(&(key_required_larger - left_input_len))
421                            .cloned()
422                            .unwrap_or_default()
423                    };
424                    (output_indices, delta_expression)
425                },
426            )
427            .collect_vec();
428
        // Every column that takes part in at least one inequality becomes a non-null field.
429        let mut l_non_null_fields = l2inequality_index
430            .iter()
431            .positions(|inequalities| !inequalities.is_empty())
432            .collect_vec();
433        let mut r_non_null_fields = r2inequality_index
434            .iter()
435            .positions(|inequalities| !inequalities.is_empty())
436            .collect_vec();
437
        // With the append-only optimization enabled, neither state-clean columns nor non-null
        // fields are kept for either side.
438        if append_only_optimize {
439            l_state_clean_columns.clear();
440            r_state_clean_columns.clear();
441            l_non_null_fields.clear();
442            r_non_null_fields.clear();
443        }
444
        // No inequality watermark is known yet.
445        let inequality_watermarks = vec![None; inequality_pairs.len()];
446        let watermark_buffers = BTreeMap::new();
447
448        Self {
449            ctx: ctx.clone(),
450            info,
451            input_l: Some(input_l),
452            input_r: Some(input_r),
453            actual_output_data_types,
454            side_l: JoinSide {
455                ht: JoinHashMap::new(
456                    watermark_epoch.clone(),
457                    join_key_data_types_l,
458                    state_join_key_indices_l.clone(),
459                    state_all_data_types_l.clone(),
460                    state_table_l,
461                    params_l.deduped_pk_indices,
462                    degree_state_l,
463                    null_matched.clone(),
464                    pk_contained_in_jk_l,
465                    None,
466                    metrics.clone(),
467                    ctx.id,
468                    ctx.fragment_id,
469                    "left",
470                ),
471                join_key_indices: state_join_key_indices_l,
472                all_data_types: state_all_data_types_l,
473                i2o_mapping: left_to_output,
474                i2o_mapping_indexed: l2o_indexed,
475                input2inequality_index: l2inequality_index,
476                non_null_fields: l_non_null_fields,
477                state_clean_columns: l_state_clean_columns,
478                start_pos: 0,
479                need_degree_table: need_degree_table_l,
480                _marker: PhantomData,
481            },
482            side_r: JoinSide {
483                ht: JoinHashMap::new(
484                    watermark_epoch,
485                    join_key_data_types_r,
486                    state_join_key_indices_r.clone(),
487                    state_all_data_types_r.clone(),
488                    state_table_r,
489                    params_r.deduped_pk_indices,
490                    degree_state_r,
491                    null_matched,
492                    pk_contained_in_jk_r,
493                    None,
494                    metrics.clone(),
495                    ctx.id,
496                    ctx.fragment_id,
497                    "right",
498                ),
499                join_key_indices: state_join_key_indices_r,
500                all_data_types: state_all_data_types_r,
                // The right side starts right after all left input columns.
501                start_pos: side_l_column_n,
502                i2o_mapping: right_to_output,
503                i2o_mapping_indexed: r2o_indexed,
504                input2inequality_index: r2inequality_index,
505                non_null_fields: r_non_null_fields,
506                state_clean_columns: r_state_clean_columns,
507                need_degree_table: need_degree_table_r,
508                _marker: PhantomData,
509            },
510            cond,
511            inequality_pairs,
512            inequality_watermarks,
513            append_only_optimize,
514            metrics,
515            chunk_size,
516            cnt_rows_received: 0,
517            watermark_buffers,
518            high_join_amplification_threshold,
519            entry_state_max_rows,
520        }
521    }
522
523    #[try_stream(ok = Message, error = StreamExecutorError)]
    /// Drives the join: aligns both inputs on barriers and turns every aligned message into
    /// output messages.
    ///
    /// The first barrier is forwarded before both sides are initialized with its epoch. After
    /// that, watermarks go through `handle_watermark`, chunks through `eq_join_left` /
    /// `eq_join_right` followed by `try_flush_data`, and barriers trigger `flush_data` before
    /// being forwarded.
524    async fn into_stream(mut self) {
        // The inputs are moved out of `self`; both fields are `None` from here on.
525        let input_l = self.input_l.take().unwrap();
526        let input_r = self.input_r.take().unwrap();
527        let aligned_stream = barrier_align(
528            input_l.execute(),
529            input_r.execute(),
530            self.ctx.id,
531            self.ctx.fragment_id,
532            self.metrics.clone(),
533            "Join",
534        );
535        pin_mut!(aligned_stream);
536
537        let actor_id = self.ctx.id;
538
539        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
540        let first_epoch = barrier.epoch;
541        // The first barrier message should be propagated.
542        yield Message::Barrier(barrier);
543        self.side_l.init(first_epoch).await?;
544        self.side_r.init(first_epoch).await?;
545
546        let actor_id_str = self.ctx.id.to_string();
547        let fragment_id_str = self.ctx.fragment_id.to_string();
548
549        // Initialize the metric handles once, labelled with the actor and fragment ids.
550        let join_actor_input_waiting_duration_ns = self
551            .metrics
552            .join_actor_input_waiting_duration_ns
553            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
554        let left_join_match_duration_ns = self
555            .metrics
556            .join_match_duration_ns
557            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
558        let right_join_match_duration_ns = self
559            .metrics
560            .join_match_duration_ns
561            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);
562
563        let barrier_join_match_duration_ns = self
564            .metrics
565            .join_match_duration_ns
566            .with_guarded_label_values(&[
567                actor_id_str.as_str(),
568                fragment_id_str.as_str(),
569                "barrier",
570            ]);
571
572        let left_join_cached_entry_count = self
573            .metrics
574            .join_cached_entry_count
575            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
576
577        let right_join_cached_entry_count = self
578            .metrics
579            .join_cached_entry_count
580            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);
581
        // Start of the current wait for input; reset at the end of every loop iteration.
582        let mut start_time = Instant::now();
583
584        while let Some(msg) = aligned_stream
585            .next()
586            .instrument_await("hash_join_barrier_align")
587            .await
588        {
            // Time spent waiting for the next aligned message.
589            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
590            match msg? {
591                AlignedMessage::WatermarkLeft(watermark) => {
592                    for watermark_to_emit in
593                        self.handle_watermark(SideType::Left, watermark).await?
594                    {
595                        yield Message::Watermark(watermark_to_emit);
596                    }
597                }
598                AlignedMessage::WatermarkRight(watermark) => {
599                    for watermark_to_emit in
600                        self.handle_watermark(SideType::Right, watermark).await?
601                    {
602                        yield Message::Watermark(watermark_to_emit);
603                    }
604                }
605                AlignedMessage::Left(chunk) => {
                    // The timer is stopped before each `yield` and restarted right after it,
                    // so the time between yielding a chunk and being resumed is not counted
                    // as match time.
606                    let mut left_time = Duration::from_nanos(0);
607                    let mut left_start_time = Instant::now();
608                    #[for_await]
609                    for chunk in Self::eq_join_left(EqJoinArgs {
610                        ctx: &self.ctx,
611                        side_l: &mut self.side_l,
612                        side_r: &mut self.side_r,
613                        actual_output_data_types: &self.actual_output_data_types,
614                        cond: &mut self.cond,
615                        inequality_watermarks: &self.inequality_watermarks,
616                        chunk,
617                        append_only_optimize: self.append_only_optimize,
618                        chunk_size: self.chunk_size,
619                        cnt_rows_received: &mut self.cnt_rows_received,
620                        high_join_amplification_threshold: self.high_join_amplification_threshold,
621                        entry_state_max_rows: self.entry_state_max_rows,
622                    }) {
623                        left_time += left_start_time.elapsed();
624                        yield Message::Chunk(chunk?);
625                        left_start_time = Instant::now();
626                    }
627                    left_time += left_start_time.elapsed();
628                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
629                    self.try_flush_data().await?;
630                }
631                AlignedMessage::Right(chunk) => {
                    // Same timing scheme as for left chunks.
632                    let mut right_time = Duration::from_nanos(0);
633                    let mut right_start_time = Instant::now();
634                    #[for_await]
635                    for chunk in Self::eq_join_right(EqJoinArgs {
636                        ctx: &self.ctx,
637                        side_l: &mut self.side_l,
638                        side_r: &mut self.side_r,
639                        actual_output_data_types: &self.actual_output_data_types,
640                        cond: &mut self.cond,
641                        inequality_watermarks: &self.inequality_watermarks,
642                        chunk,
643                        append_only_optimize: self.append_only_optimize,
644                        chunk_size: self.chunk_size,
645                        cnt_rows_received: &mut self.cnt_rows_received,
646                        high_join_amplification_threshold: self.high_join_amplification_threshold,
647                        entry_state_max_rows: self.entry_state_max_rows,
648                    }) {
649                        right_time += right_start_time.elapsed();
650                        yield Message::Chunk(chunk?);
651                        right_start_time = Instant::now();
652                    }
653                    right_time += right_start_time.elapsed();
654                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
655                    self.try_flush_data().await?;
656                }
657                AlignedMessage::Barrier(barrier) => {
658                    let barrier_start_time = Instant::now();
659                    let (left_post_commit, right_post_commit) =
660                        self.flush_data(barrier.epoch).await?;
661
662                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
663
664                    // We don't include the time post yielding barrier because the vnode update
665                    // is a one-off and rare operation.
666                    barrier_join_match_duration_ns
667                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
668                    yield Message::Barrier(barrier);
669
670                    // Update the vnode bitmap for state tables of both sides if asked.
671                    right_post_commit
672                        .post_yield_barrier(update_vnode_bitmap.clone())
673                        .await?;
                    // Only the left side's result decides whether the buffered watermarks are
                    // dropped; the right side's result is discarded above.
                    // NOTE(review): the flag presumably reports that cached state may be stale
                    // after the vnode bitmap update -- confirm against `post_yield_barrier`.
674                    if left_post_commit
675                        .post_yield_barrier(update_vnode_bitmap)
676                        .await?
677                        .unwrap_or(false)
678                    {
679                        self.watermark_buffers
680                            .values_mut()
681                            .for_each(|buffers| buffers.clear());
682                        self.inequality_watermarks.fill(None);
683                    }
684
685                    // Report metrics of cached join rows/entries
686                    for (join_cached_entry_count, ht) in [
687                        (&left_join_cached_entry_count, &self.side_l.ht),
688                        (&right_join_cached_entry_count, &self.side_r.ht),
689                    ] {
690                        join_cached_entry_count.set(ht.entry_count() as i64);
691                    }
692                }
693            }
694            start_time = Instant::now();
695        }
696    }
697
698    async fn flush_data(
699        &mut self,
700        epoch: EpochPair,
701    ) -> StreamExecutorResult<(
702        JoinHashMapPostCommit<'_, K, S, E>,
703        JoinHashMapPostCommit<'_, K, S, E>,
704    )> {
705        // All changes to the state has been buffered in the mem-table of the state table. Just
706        // `commit` them here.
707        let left = self.side_l.ht.flush(epoch).await?;
708        let right = self.side_r.ht.flush(epoch).await?;
709        Ok((left, right))
710    }
711
712    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
713        // All changes to the state has been buffered in the mem-table of the state table. Just
714        // `commit` them here.
715        self.side_l.ht.try_flush().await?;
716        self.side_r.ht.try_flush().await?;
717        Ok(())
718    }
719
720    // We need to manually evict the cache.
721    fn evict_cache(
722        side_update: &mut JoinSide<K, S, E>,
723        side_match: &mut JoinSide<K, S, E>,
724        cnt_rows_received: &mut u32,
725    ) {
726        *cnt_rows_received += 1;
727        if *cnt_rows_received == EVICT_EVERY_N_ROWS {
728            side_update.ht.evict();
729            side_match.ht.evict();
730            *cnt_rows_received = 0;
731        }
732    }
733
    /// Handles a watermark that arrived on input `side` and returns the watermarks to emit.
    ///
    /// The watermark is used in three ways:
    /// 1. State cleaning: if it is on the first join key column of the receiving side, its
    ///    value is passed to the opposite side's hash table via `update_watermark`.
    /// 2. Join key watermarks: for every join key position whose column equals the watermark
    ///    column, the watermark is fed to that position's buffer. When the buffer returns a
    ///    selected watermark, one copy is emitted for each output column that
    ///    `i2o_mapping_indexed` maps from that join key column, on either side.
    /// 3. Inequality watermarks: for every entry of `input2inequality_index` registered for the
    ///    watermark column, the watermark (optionally rewritten by the pair's delta expression)
    ///    is fed to the pair's buffer. A selected watermark is emitted on the pair's output
    ///    indices and stored in `inequality_watermarks`.
    async fn handle_watermark(
        &mut self,
        side: SideTypePrimitive,
        watermark: Watermark,
    ) -> StreamExecutorResult<Vec<Watermark>> {
        // `side_update` is the side that received the watermark, `side_match` the other one.
        let (side_update, side_match) = if side == SideType::Left {
            (&mut self.side_l, &mut self.side_r)
        } else {
            (&mut self.side_r, &mut self.side_l)
        };

        // State cleaning
        // Only a watermark on the first join key column is forwarded to the opposite hash table.
        if side_update.join_key_indices[0] == watermark.col_idx {
            side_match.ht.update_watermark(watermark.val.clone());
        }

        // Select watermarks to yield.
        // Positions (within the join key list) of every join key that is the watermark column.
        let wm_in_jk = side_update
            .join_key_indices
            .iter()
            .positions(|idx| *idx == watermark.col_idx);
        let mut watermarks_to_emit = vec![];
        for idx in wm_in_jk {
            // Join key buffers are keyed by the join key position.
            let buffers = self
                .watermark_buffers
                .entry(idx)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
                // Fallback for join key columns that have no output mapping.
                let empty_indices = vec![];
                // Output columns of the `idx`-th join key on the update side, followed by those
                // of the `idx`-th join key on the match side.
                let output_indices = side_update
                    .i2o_mapping_indexed
                    .get_vec(&side_update.join_key_indices[idx])
                    .unwrap_or(&empty_indices)
                    .iter()
                    .chain(
                        side_match
                            .i2o_mapping_indexed
                            .get_vec(&side_match.join_key_indices[idx])
                            .unwrap_or(&empty_indices),
                    );
                for output_idx in output_indices {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
            };
        }
        for (inequality_index, need_offset) in
            &side_update.input2inequality_index[watermark.col_idx]
        {
            // Inequality buffers share the map with the join key buffers; their keys are
            // offset by the number of join keys so the two key ranges do not overlap.
            let buffers = self
                .watermark_buffers
                .entry(side_update.join_key_indices.len() + inequality_index)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            let mut input_watermark = watermark.clone();
            if *need_offset
                && let Some(delta_expression) = self.inequality_pairs[*inequality_index].1.as_ref()
            {
                // `input_watermark.val` is moved into the single-column row below; it is either
                // reassigned in the `Ok` arm or never read again because of the `continue`.
                // allow since we will handle error manually.
                #[allow(clippy::disallowed_methods)]
                let eval_result = delta_expression
                    .inner()
                    .eval_row(&OwnedRow::new(vec![Some(input_watermark.val)]))
                    .await;
                match eval_result {
                    // NOTE(review): a `None` result from the delta expression would panic here.
                    Ok(value) => input_watermark.val = value.unwrap(),
                    Err(err) => {
                        // `NumericOutOfRange` is dropped silently; every other error is
                        // reported. In both cases this watermark is skipped for the pair.
                        if !matches!(err, ExprError::NumericOutOfRange) {
                            self.ctx.on_compute_error(err, &self.info.identity);
                        }
                        continue;
                    }
                }
            };
            if let Some(selected_watermark) = buffers.handle_watermark(side, input_watermark) {
                for output_idx in &self.inequality_pairs[*inequality_index].0 {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
                // Remember the latest selected watermark of this inequality pair.
                self.inequality_watermarks[*inequality_index] = Some(selected_watermark);
            }
        }
        Ok(watermarks_to_emit)
    }
815
816    fn row_concat(
817        row_update: impl Row,
818        update_start_pos: usize,
819        row_matched: impl Row,
820        matched_start_pos: usize,
821    ) -> OwnedRow {
822        let mut new_row = vec![None; row_update.len() + row_matched.len()];
823
824        for (i, datum_ref) in row_update.iter().enumerate() {
825            new_row[i + update_start_pos] = datum_ref.to_owned_datum();
826        }
827        for (i, datum_ref) in row_matched.iter().enumerate() {
828            new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
829        }
830        OwnedRow::new(new_row)
831    }
832
833    /// Used to forward `eq_join_oneside` to show join side in stack.
834    fn eq_join_left(
835        args: EqJoinArgs<'_, K, S, E>,
836    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
837        Self::eq_join_oneside::<{ SideType::Left }>(args)
838    }
839
840    /// Used to forward `eq_join_oneside` to show join side in stack.
841    fn eq_join_right(
842        args: EqJoinArgs<'_, K, S, E>,
843    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
844        Self::eq_join_oneside::<{ SideType::Right }>(args)
845    }
846
    /// Joins the rows of `args.chunk`, treated as updates on side `SIDE`, against the opposite
    /// side and yields the resulting output chunks.
    ///
    /// For each row (positions where `rows_with_holes` yields `None` are skipped):
    /// 1. the cache-eviction counter is advanced via `evict_cache`;
    /// 2. the opposite side's state for the row's join key is taken with `take_state_opt`, or
    ///    the row is classified as `CacheResult::NeverMatch` when the non-null requirements are
    ///    not met;
    /// 3. the row is dispatched to `handle_match_rows` as an insert (`Insert`/`UpdateInsert`)
    ///    or a delete (`Delete`/`UpdateDelete`), and every chunk it produces is yielded;
    /// 4. the total cardinality of the chunks yielded for the row is observed in the
    ///    `join_matched_join_keys` metric, and a warning is logged when it exceeds
    ///    `high_join_amplification_threshold`.
    ///
    /// After the last row, whatever is still buffered in the chunk builder is yielded.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S, E>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            actual_output_data_types,
            cond,
            inequality_watermarks,
            chunk,
            append_only_optimize,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
            entry_state_max_rows,
            ..
        } = args;

        // `side_update` is the side the chunk arrived on, `side_match` is the side probed.
        let (side_update, side_match) = if SIDE == SideType::Left {
            (side_l, side_r)
        } else {
            (side_r, side_l)
        };

        // Keep only the state-clean columns of the match side whose inequality pair already
        // has a watermark, paired with that watermark.
        let useful_state_clean_columns = side_match
            .state_clean_columns
            .iter()
            .filter_map(|(column_idx, inequality_index)| {
                inequality_watermarks[*inequality_index]
                    .as_ref()
                    .map(|watermark| (*column_idx, watermark))
            })
            .collect_vec();

        let mut hashjoin_chunk_builder =
            JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        // Metric labelled by actor id, fragment id and the update side's table id.
        let join_matched_join_keys = ctx
            .streaming_metrics
            .join_matched_join_keys
            .with_guarded_label_values(&[
                &ctx.id.to_string(),
                &ctx.fragment_id.to_string(),
                &side_update.ht.table_id().to_string(),
            ]);

        // Build the join keys of the whole chunk once, then walk rows and keys in lockstep.
        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(side_update, side_match, cnt_rows_received);

            let cache_lookup_result = {
                // Every column listed in `non_null_fields` must be non-NULL in this row.
                // NOTE(review): the unchecked access assumes `non_null_fields` only holds valid
                // column indices of `row` — confirm against where the field is populated.
                let probe_non_null_requirement_satisfied = side_update
                    .non_null_fields
                    .iter()
                    .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
                // The key's null bitmap must be a subset of the match side's `null_matched`.
                let build_non_null_requirement_satisfied =
                    key.null_bitmap().is_subset(side_match.ht.null_matched());
                if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
                    side_match.ht.take_state_opt(key)
                } else {
                    CacheResult::NeverMatch
                }
            };
            let mut total_matches = 0;

            // Expands to a `handle_match_rows` call for the given `JoinOp` variant; all other
            // arguments are captured from the surrounding scope.
            macro_rules! match_rows {
                ($op:ident) => {
                    Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
                        cache_lookup_result,
                        row,
                        key,
                        &mut hashjoin_chunk_builder,
                        side_match,
                        side_update,
                        &useful_state_clean_columns,
                        cond,
                        append_only_optimize,
                        entry_state_max_rows,
                    )
                };
            }

            match op {
                Op::Insert | Op::UpdateInsert =>
                {
                    #[for_await]
                    for chunk in match_rows!(Insert) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
                Op::Delete | Op::UpdateDelete =>
                {
                    #[for_await]
                    for chunk in match_rows!(Delete) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
            };

            join_matched_join_keys.observe(total_matches as _);
            if total_matches > high_join_amplification_threshold {
                // The key is only deserialized on this (warning) path.
                let join_key_data_types = side_update.ht.join_key_data_types();
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = total_matches,
                    update_table_id = %side_update.ht.table_id(),
                    match_table_id = %side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = %ctx.id,
                    fragment_id = %ctx.fragment_id,
                    "large rows matched for join key"
                );
            }
        }
        // NOTE(kwannoel): We don't track metrics for this last chunk.
        if let Some(chunk) = hashjoin_chunk_builder.take() {
            yield chunk;
        }
    }
978
    /// For the probe-side row, we need to check if it has values in cache, if not, we need to
    /// fetch the matched rows from the state table.
    ///
    /// Every matched build-side row being processed needs to go through the following phases:
    /// 1. Handle join condition evaluation.
    /// 2. Always do cache refill, if the state count is good.
    /// 3. Handle state cleaning.
    /// 4. Handle degree table update.
    ///
    /// Behavior per `cached_lookup_result`:
    /// - `NeverMatch`: the row is forwarded as unmatched and the function returns immediately,
    ///   without touching either side's state.
    /// - `Hit`: the cached rows are matched in place and later handed back to the cache.
    /// - `Miss`: the matched rows are fetched through the hash table; while iterating, a fresh
    ///   entry state is filled so the cache can be refilled afterwards.
    ///
    /// After matching, the probe row is forwarded according to its final degree, the cache is
    /// refilled, rows collected for watermark state cleaning are deleted from the match side,
    /// and finally the probe row is inserted into / deleted from the update side (unless the
    /// append-only optimization removed a matched row instead).
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn handle_match_rows<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
    >(
        cached_lookup_result: CacheResult<E>,
        row: RowRef<'a>,
        key: &'a K,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        side_match: &'a mut JoinSide<K, S, E>,
        side_update: &'a mut JoinSide<K, S, E>,
        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
        cond: &'a mut Option<NonStrictExpression>,
        append_only_optimize: bool,
        entry_state_max_rows: usize,
    ) {
        // Recorded up front because `cached_lookup_result` is consumed by the `match` below.
        let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
        // Scratch entry state that is only filled on a cache miss.
        let mut entry_state: JoinEntryState<E> = JoinEntryState::default();
        let mut entry_state_count = 0;

        // Number of build-side rows that satisfied the join condition for this probe row.
        let mut degree = 0;
        let mut append_only_matched_row = None;
        let mut matched_rows_to_clean = vec![];

        // Expands to a `handle_match_row` call; only the per-row arguments are passed
        // explicitly, the rest is captured from the surrounding scope.
        macro_rules! match_row {
            (
                $match_order_key_indices:expr,
                $degree_table:expr,
                $matched_row:expr,
                $matched_row_ref:expr,
                $from_cache:literal,
                $map_output:expr,
            ) => {
                Self::handle_match_row::<_, _, SIDE, { JOIN_OP }, { $from_cache }>(
                    row,
                    $matched_row,
                    $matched_row_ref,
                    hashjoin_chunk_builder,
                    $match_order_key_indices,
                    $degree_table,
                    side_update.start_pos,
                    side_match.start_pos,
                    cond,
                    &mut degree,
                    useful_state_clean_columns,
                    append_only_optimize,
                    &mut append_only_matched_row,
                    &mut matched_rows_to_clean,
                    $map_output,
                )
            };
        }

        // Shadows the scratch `entry_state` with the state that goes back into the cache.
        let entry_state = match cached_lookup_result {
            CacheResult::NeverMatch => {
                let op = match JOIN_OP {
                    JoinOp::Insert => Op::Insert,
                    JoinOp::Delete => Op::Delete,
                };
                if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                    yield chunk;
                }
                // Early return: no cache refill and no state table update for this row.
                return Ok(());
            }
            CacheResult::Hit(mut cached_rows) => {
                let (match_order_key_indices, match_degree_state) =
                    side_match.ht.get_degree_state_mut_ref();
                // Handle cached rows which match the probe-side row.
                for (matched_row_ref, matched_row) in
                    cached_rows.values_mut(&side_match.all_data_types)
                {
                    let matched_row = matched_row?;
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        match_degree_state,
                        matched_row,
                        Some(matched_row_ref),
                        true,
                        Either::Left,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }

                cached_rows
            }
            CacheResult::Miss => {
                // Handle rows which are not in cache.
                let (matched_rows, match_order_key_indices, degree_table) = side_match
                    .ht
                    .fetch_matched_rows_and_get_degree_table_ref(key)
                    .await?;

                #[for_await]
                for matched_row in matched_rows {
                    let (encoded_pk, matched_row) = matched_row?;

                    let mut matched_row_ref = None;

                    // cache refill
                    // Rows keep being cached until the counter exceeds `entry_state_max_rows`.
                    // A counter that ends above the limit is what suppresses `update_state`
                    // further down.
                    if entry_state_count <= entry_state_max_rows {
                        let row_ref = entry_state
                            .insert(encoded_pk, E::encode(&matched_row), None) // TODO(kwannoel): handle ineq key for asof join.
                            .with_context(|| format!("row: {}", row.display(),))?;
                        matched_row_ref = Some(row_ref);
                        entry_state_count += 1;
                    }
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        degree_table,
                        matched_row,
                        matched_row_ref,
                        false,
                        Either::Right,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }
                Box::new(entry_state)
            }
        };

        // forward rows depending on join types
        let op = match JOIN_OP {
            JoinOp::Insert => Op::Insert,
            JoinOp::Delete => Op::Delete,
        };
        if degree == 0 {
            if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                yield chunk;
            }
        } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
        {
            yield chunk;
        }

        // cache refill
        // A cache hit always puts its entry back; a miss only does so when the number of
        // fetched rows stayed within `entry_state_max_rows`.
        if cache_hit || entry_state_count <= entry_state_max_rows {
            side_match.ht.update_state(key, entry_state);
        }

        // watermark state cleaning
        for matched_row in matched_rows_to_clean {
            side_match.ht.delete_handle_degree(key, matched_row)?;
        }

        // apply append_only optimization to clean matched_rows which have been persisted
        if append_only_optimize && let Some(row) = append_only_matched_row {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            side_match.ht.delete_handle_degree(key, row)?;
            // The probe row itself is not stored in this case.
            return Ok(());
        }

        // no append_only optimization, update state table(s).
        match JOIN_OP {
            JoinOp::Insert => {
                side_update
                    .ht
                    .insert_handle_degree(key, JoinRow::new(row, degree))?;
            }
            JoinOp::Delete => {
                side_update
                    .ht
                    .delete_handle_degree(key, JoinRow::new(row, degree))?;
            }
        }
    }
1160
    /// Processes a single build-side `matched_row` against the probe-side `update_row`.
    ///
    /// The join condition is evaluated first.
    /// - If it holds: `update_row_degree` is incremented, the matched pair is appended to the
    ///   chunk builder (unless `forward_exactly_once(T, SIDE)` is true), and, when a degree
    ///   table is present, the degree of the matched row is updated both in the table and, if a
    ///   cache reference is available, in the cached row. For inserts the pair is appended
    ///   before the degree update, for deletes after it.
    /// - If it does not hold: the matched row is checked against
    ///   `useful_state_clean_columns`; it is marked for cleaning as soon as one listed column
    ///   is non-NULL and below the corresponding watermark value.
    ///
    /// Finally the matched row is handed over (converted with `map_output`) either to
    /// `append_only_matched_row` when `append_only_optimize` is set, or to
    /// `matched_rows_to_clean` when it was marked for cleaning.
    ///
    /// Returns the chunk produced by the chunk builder, if appending the pair completed one.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    async fn handle_match_row<
        'a,
        R: Row,  // input row type
        RO: Row, // output row type
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
        const MATCHED_ROWS_FROM_CACHE: bool,
    >(
        update_row: RowRef<'a>,
        mut matched_row: JoinRow<R>,
        mut matched_row_cache_ref: Option<&mut E::EncodedRow>,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        match_order_key_indices: &[usize],
        match_degree_table: &mut Option<TableInner<S>>,
        side_update_start_pos: usize,
        side_match_start_pos: usize,
        cond: &Option<NonStrictExpression>,
        update_row_degree: &mut u64,
        useful_state_clean_columns: &[(usize, &'a Watermark)],
        append_only_optimize: bool,
        append_only_matched_row: &mut Option<JoinRow<RO>>,
        matched_rows_to_clean: &mut Vec<JoinRow<RO>>,
        map_output: impl Fn(R) -> RO,
    ) -> Option<StreamChunk> {
        let mut need_state_clean = false;
        let mut chunk_opt = None;

        // TODO(kwannoel): Instead of evaluating this every loop,
        // we can call this only if there's a non-equi expression.
        // check join cond
        let join_condition_satisfied = Self::check_join_condition(
            update_row,
            side_update_start_pos,
            &matched_row.row,
            side_match_start_pos,
            cond,
        )
        .await;

        if join_condition_satisfied {
            // update degree
            *update_row_degree += 1;
            // send matched row downstream

            // Inserts must happen before degrees are updated,
            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
            if matches!(JOIN_OP, JoinOp::Insert)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
            // TODO(kwannoel): We can actually statically decide this, using join side + join type.
            if let Some(degree_table) = match_degree_table {
                update_degree::<S, { JOIN_OP }>(
                    match_order_key_indices,
                    degree_table,
                    &mut matched_row,
                );
                // The `unwrap`s below require a cache reference; rows coming from the cache are
                // expected to always carry one, rows from the state table only when they were
                // inserted into the refill entry.
                if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
                    // update matched row in cache
                    match JOIN_OP {
                        JoinOp::Insert => matched_row_cache_ref.as_mut().unwrap().increase_degree(),
                        JoinOp::Delete => matched_row_cache_ref.as_mut().unwrap().decrease_degree(),
                    }
                }
            }

            // Deletes must happen after degree table is updated.
            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
            if matches!(JOIN_OP, JoinOp::Delete)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
        } else {
            // check if need state cleaning
            // One non-NULL column strictly below its watermark is enough; NULL columns never
            // trigger cleaning.
            for (column_idx, watermark) in useful_state_clean_columns {
                if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
                    scalar
                        .default_cmp(&watermark.val.as_scalar_ref_impl())
                        .is_lt()
                }) {
                    need_state_clean = true;
                    break;
                }
            }
        }
        // If the stream is append-only and the join key covers pk in both side,
        // then we can remove matched rows since pk is unique and will not be
        // inserted again
        if append_only_optimize {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            // Since join key contains pk and pk is unique, there should be only
            // one row if matched.
            assert!(append_only_matched_row.is_none());
            *append_only_matched_row = Some(matched_row.map(map_output));
        } else if need_state_clean {
            // `append_only_optimize` and `need_state_clean` won't both be true.
            // 'else' here is only to suppress compiler error, otherwise
            // `matched_row` will be moved twice.
            matched_rows_to_clean.push(matched_row.map(map_output));
        }

        chunk_opt
    }
1272
1273    // TODO(yuhao-su): We should find a better way to eval the expression
1274    // without concat two rows.
1275    // if there are non-equi expressions
1276    // NOTE(kwannoel): We can probably let `eval` use `impl Row` instead of `OwnedRow`.
1277    #[inline]
1278    async fn check_join_condition(
1279        row: impl Row,
1280        side_update_start_pos: usize,
1281        matched_row: impl Row,
1282        side_match_start_pos: usize,
1283        join_condition: &Option<NonStrictExpression>,
1284    ) -> bool {
1285        if let Some(join_condition) = join_condition {
1286            let new_row = Self::row_concat(
1287                row,
1288                side_update_start_pos,
1289                matched_row,
1290                side_match_start_pos,
1291            );
1292            join_condition
1293                .eval_row_infallible(&new_row)
1294                .await
1295                .map(|s| *s.as_bool())
1296                .unwrap_or(false)
1297        } else {
1298            true
1299        }
1300    }
1301}
1302
1303#[cfg(test)]
1304mod tests {
1305    use std::sync::atomic::AtomicU64;
1306
1307    use pretty_assertions::assert_eq;
1308    use risingwave_common::array::*;
1309    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1310    use risingwave_common::hash::{Key64, Key128};
1311    use risingwave_common::util::epoch::test_epoch;
1312    use risingwave_common::util::sort_util::OrderType;
1313    use risingwave_storage::memory::MemoryStateStore;
1314
1315    use super::*;
1316    use crate::common::table::test_utils::gen_pbtable;
1317    use crate::executor::MemoryEncoding;
1318    use crate::executor::test_utils::expr::build_from_pretty;
1319    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1320
1321    async fn create_in_memory_state_table(
1322        mem_state: MemoryStateStore,
1323        data_types: &[DataType],
1324        order_types: &[OrderType],
1325        pk_indices: &[usize],
1326        table_id: u32,
1327    ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1328        let column_descs = data_types
1329            .iter()
1330            .enumerate()
1331            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1332            .collect_vec();
1333        let state_table = StateTable::from_table_catalog(
1334            &gen_pbtable(
1335                TableId::new(table_id),
1336                column_descs,
1337                order_types.to_vec(),
1338                pk_indices.to_vec(),
1339                0,
1340            ),
1341            mem_state.clone(),
1342            None,
1343        )
1344        .await;
1345
1346        // Create degree table
1347        let mut degree_table_column_descs = vec![];
1348        pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1349            degree_table_column_descs.push(ColumnDesc::unnamed(
1350                ColumnId::new(pk_id as i32),
1351                data_types[*idx].clone(),
1352            ))
1353        });
1354        degree_table_column_descs.push(ColumnDesc::unnamed(
1355            ColumnId::new(pk_indices.len() as i32),
1356            DataType::Int64,
1357        ));
1358        let degree_state_table = StateTable::from_table_catalog(
1359            &gen_pbtable(
1360                TableId::new(table_id + 1),
1361                degree_table_column_descs,
1362                order_types.to_vec(),
1363                pk_indices.to_vec(),
1364                0,
1365            ),
1366            mem_state,
1367            None,
1368        )
1369        .await;
1370        (state_table, degree_state_table)
1371    }
1372
1373    fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1374        build_from_pretty(
1375            condition_text
1376                .as_deref()
1377                .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1378        )
1379    }
1380
1381    async fn create_executor<const T: JoinTypePrimitive>(
1382        with_condition: bool,
1383        null_safe: bool,
1384        condition_text: Option<String>,
1385        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
1386    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1387        let schema = Schema {
1388            fields: vec![
1389                Field::unnamed(DataType::Int64), // join key
1390                Field::unnamed(DataType::Int64),
1391            ],
1392        };
1393        let (tx_l, source_l) = MockSource::channel();
1394        let source_l = source_l.into_executor(schema.clone(), vec![1]);
1395        let (tx_r, source_r) = MockSource::channel();
1396        let source_r = source_r.into_executor(schema, vec![1]);
1397        let params_l = JoinParams::new(vec![0], vec![1]);
1398        let params_r = JoinParams::new(vec![0], vec![1]);
1399        let cond = with_condition.then(|| create_cond(condition_text));
1400
1401        let mem_state = MemoryStateStore::new();
1402
1403        let (state_l, degree_state_l) = create_in_memory_state_table(
1404            mem_state.clone(),
1405            &[DataType::Int64, DataType::Int64],
1406            &[OrderType::ascending(), OrderType::ascending()],
1407            &[0, 1],
1408            0,
1409        )
1410        .await;
1411
1412        let (state_r, degree_state_r) = create_in_memory_state_table(
1413            mem_state,
1414            &[DataType::Int64, DataType::Int64],
1415            &[OrderType::ascending(), OrderType::ascending()],
1416            &[0, 1],
1417            2,
1418        )
1419        .await;
1420
1421        let schema = match T {
1422            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1423            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1424            _ => [source_l.schema().fields(), source_r.schema().fields()]
1425                .concat()
1426                .into_iter()
1427                .collect(),
1428        };
1429        let schema_len = schema.len();
1430        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1431
1432        let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
1433            ActorContext::for_test(123),
1434            info,
1435            source_l,
1436            source_r,
1437            params_l,
1438            params_r,
1439            vec![null_safe],
1440            (0..schema_len).collect_vec(),
1441            cond,
1442            inequality_pairs,
1443            state_l,
1444            degree_state_l,
1445            state_r,
1446            degree_state_r,
1447            Arc::new(AtomicU64::new(0)),
1448            false,
1449            Arc::new(StreamingMetrics::unused()),
1450            1024,
1451            2048,
1452        );
1453        (tx_l, tx_r, executor.boxed().execute())
1454    }
1455
1456    async fn create_classical_executor<const T: JoinTypePrimitive>(
1457        with_condition: bool,
1458        null_safe: bool,
1459        condition_text: Option<String>,
1460    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1461        create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
1462    }
1463
1464    async fn create_append_only_executor<const T: JoinTypePrimitive>(
1465        with_condition: bool,
1466    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1467        let schema = Schema {
1468            fields: vec![
1469                Field::unnamed(DataType::Int64),
1470                Field::unnamed(DataType::Int64),
1471                Field::unnamed(DataType::Int64),
1472            ],
1473        };
1474        let (tx_l, source_l) = MockSource::channel();
1475        let source_l = source_l.into_executor(schema.clone(), vec![0]);
1476        let (tx_r, source_r) = MockSource::channel();
1477        let source_r = source_r.into_executor(schema, vec![0]);
1478        let params_l = JoinParams::new(vec![0, 1], vec![]);
1479        let params_r = JoinParams::new(vec![0, 1], vec![]);
1480        let cond = with_condition.then(|| create_cond(None));
1481
1482        let mem_state = MemoryStateStore::new();
1483
1484        let (state_l, degree_state_l) = create_in_memory_state_table(
1485            mem_state.clone(),
1486            &[DataType::Int64, DataType::Int64, DataType::Int64],
1487            &[
1488                OrderType::ascending(),
1489                OrderType::ascending(),
1490                OrderType::ascending(),
1491            ],
1492            &[0, 1, 0],
1493            0,
1494        )
1495        .await;
1496
1497        let (state_r, degree_state_r) = create_in_memory_state_table(
1498            mem_state,
1499            &[DataType::Int64, DataType::Int64, DataType::Int64],
1500            &[
1501                OrderType::ascending(),
1502                OrderType::ascending(),
1503                OrderType::ascending(),
1504            ],
1505            &[0, 1, 1],
1506            1,
1507        )
1508        .await;
1509
1510        let schema = match T {
1511            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1512            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1513            _ => [source_l.schema().fields(), source_r.schema().fields()]
1514                .concat()
1515                .into_iter()
1516                .collect(),
1517        };
1518        let schema_len = schema.len();
1519        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1520
1521        let executor = HashJoinExecutor::<Key128, MemoryStateStore, T, MemoryEncoding>::new(
1522            ActorContext::for_test(123),
1523            info,
1524            source_l,
1525            source_r,
1526            params_l,
1527            params_r,
1528            vec![false],
1529            (0..schema_len).collect_vec(),
1530            cond,
1531            vec![],
1532            state_l,
1533            degree_state_l,
1534            state_r,
1535            degree_state_r,
1536            Arc::new(AtomicU64::new(0)),
1537            true,
1538            Arc::new(StreamingMetrics::unused()),
1539            1024,
1540            2048,
1541        );
1542        (tx_l, tx_r, executor.boxed().execute())
1543    }
1544
1545    #[tokio::test]
1546    async fn test_interval_join() -> StreamExecutorResult<()> {
1547        let chunk_l1 = StreamChunk::from_pretty(
1548            "  I I
1549             + 1 4
1550             + 2 3
1551             + 2 5
1552             + 3 6",
1553        );
1554        let chunk_l2 = StreamChunk::from_pretty(
1555            "  I I
1556             + 3 8
1557             - 3 8",
1558        );
1559        let chunk_r1 = StreamChunk::from_pretty(
1560            "  I I
1561             + 2 6
1562             + 4 8
1563             + 6 9",
1564        );
1565        let chunk_r2 = StreamChunk::from_pretty(
1566            "  I  I
1567             + 2 3
1568             + 6 11",
1569        );
1570        let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
1571            true,
1572            false,
1573            Some(String::from("(and:boolean (greater_than:boolean $1:int8 (subtract:int8 $3:int8 2:int8)) (greater_than:boolean $3:int8 (subtract:int8 $1:int8 2:int8)))")),
1574            vec![(1, 3, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)"))), (3, 1, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)")))],
1575        )
1576        .await;
1577
1578        // push the init barrier for left and right
1579        tx_l.push_barrier(test_epoch(1), false);
1580        tx_r.push_barrier(test_epoch(1), false);
1581        hash_join.next_unwrap_ready_barrier()?;
1582
1583        // push the 1st left chunk
1584        tx_l.push_chunk(chunk_l1);
1585        hash_join.next_unwrap_pending();
1586
1587        // push the init barrier for left and right
1588        tx_l.push_barrier(test_epoch(2), false);
1589        tx_r.push_barrier(test_epoch(2), false);
1590        hash_join.next_unwrap_ready_barrier()?;
1591
1592        // push the 2nd left chunk
1593        tx_l.push_chunk(chunk_l2);
1594        hash_join.next_unwrap_pending();
1595
1596        tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
1597        hash_join.next_unwrap_pending();
1598
1599        tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
1600        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1601        assert_eq!(
1602            output_watermark,
1603            Watermark::new(1, DataType::Int64, ScalarImpl::Int64(4))
1604        );
1605        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1606        assert_eq!(
1607            output_watermark,
1608            Watermark::new(3, DataType::Int64, ScalarImpl::Int64(6))
1609        );
1610
1611        // push the 1st right chunk
1612        tx_r.push_chunk(chunk_r1);
1613        let chunk = hash_join.next_unwrap_ready_chunk()?;
1614        // data "2 3" should have been cleaned
1615        assert_eq!(
1616            chunk,
1617            StreamChunk::from_pretty(
1618                " I I I I
1619                + 2 5 2 6"
1620            )
1621        );
1622
1623        // push the 2nd right chunk
1624        tx_r.push_chunk(chunk_r2);
1625        // pending means that state clean is successful, or the executor will yield a chunk "+ 2 3
1626        // 2 3" here.
1627        hash_join.next_unwrap_pending();
1628
1629        Ok(())
1630    }
1631
1632    #[tokio::test]
1633    async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
1634        let chunk_l1 = StreamChunk::from_pretty(
1635            "  I I
1636             + 1 4
1637             + 2 5
1638             + 3 6",
1639        );
1640        let chunk_l2 = StreamChunk::from_pretty(
1641            "  I I
1642             + 3 8
1643             - 3 8",
1644        );
1645        let chunk_r1 = StreamChunk::from_pretty(
1646            "  I I
1647             + 2 7
1648             + 4 8
1649             + 6 9",
1650        );
1651        let chunk_r2 = StreamChunk::from_pretty(
1652            "  I  I
1653             + 3 10
1654             + 6 11",
1655        );
1656        let (mut tx_l, mut tx_r, mut hash_join) =
1657            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
1658
1659        // push the init barrier for left and right
1660        tx_l.push_barrier(test_epoch(1), false);
1661        tx_r.push_barrier(test_epoch(1), false);
1662        hash_join.next_unwrap_ready_barrier()?;
1663
1664        // push the 1st left chunk
1665        tx_l.push_chunk(chunk_l1);
1666        hash_join.next_unwrap_pending();
1667
1668        // push the init barrier for left and right
1669        tx_l.push_barrier(test_epoch(2), false);
1670        tx_r.push_barrier(test_epoch(2), false);
1671        hash_join.next_unwrap_ready_barrier()?;
1672
1673        // push the 2nd left chunk
1674        tx_l.push_chunk(chunk_l2);
1675        hash_join.next_unwrap_pending();
1676
1677        // push the 1st right chunk
1678        tx_r.push_chunk(chunk_r1);
1679        let chunk = hash_join.next_unwrap_ready_chunk()?;
1680        assert_eq!(
1681            chunk,
1682            StreamChunk::from_pretty(
1683                " I I I I
1684                + 2 5 2 7"
1685            )
1686        );
1687
1688        // push the 2nd right chunk
1689        tx_r.push_chunk(chunk_r2);
1690        let chunk = hash_join.next_unwrap_ready_chunk()?;
1691        assert_eq!(
1692            chunk,
1693            StreamChunk::from_pretty(
1694                " I I I I
1695                + 3 6 3 10"
1696            )
1697        );
1698
1699        Ok(())
1700    }
1701
1702    #[tokio::test]
1703    async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
1704        let chunk_l1 = StreamChunk::from_pretty(
1705            "  I I
1706             + 1 4
1707             + 2 5
1708             + . 6",
1709        );
1710        let chunk_l2 = StreamChunk::from_pretty(
1711            "  I I
1712             + . 8
1713             - . 8",
1714        );
1715        let chunk_r1 = StreamChunk::from_pretty(
1716            "  I I
1717             + 2 7
1718             + 4 8
1719             + 6 9",
1720        );
1721        let chunk_r2 = StreamChunk::from_pretty(
1722            "  I  I
1723             + . 10
1724             + 6 11",
1725        );
1726        let (mut tx_l, mut tx_r, mut hash_join) =
1727            create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;
1728
1729        // push the init barrier for left and right
1730        tx_l.push_barrier(test_epoch(1), false);
1731        tx_r.push_barrier(test_epoch(1), false);
1732        hash_join.next_unwrap_ready_barrier()?;
1733
1734        // push the 1st left chunk
1735        tx_l.push_chunk(chunk_l1);
1736        hash_join.next_unwrap_pending();
1737
1738        // push the init barrier for left and right
1739        tx_l.push_barrier(test_epoch(2), false);
1740        tx_r.push_barrier(test_epoch(2), false);
1741        hash_join.next_unwrap_ready_barrier()?;
1742
1743        // push the 2nd left chunk
1744        tx_l.push_chunk(chunk_l2);
1745        hash_join.next_unwrap_pending();
1746
1747        // push the 1st right chunk
1748        tx_r.push_chunk(chunk_r1);
1749        let chunk = hash_join.next_unwrap_ready_chunk()?;
1750        assert_eq!(
1751            chunk,
1752            StreamChunk::from_pretty(
1753                " I I I I
1754                + 2 5 2 7"
1755            )
1756        );
1757
1758        // push the 2nd right chunk
1759        tx_r.push_chunk(chunk_r2);
1760        let chunk = hash_join.next_unwrap_ready_chunk()?;
1761        assert_eq!(
1762            chunk,
1763            StreamChunk::from_pretty(
1764                " I I I I
1765                + . 6 . 10"
1766            )
1767        );
1768
1769        Ok(())
1770    }
1771
1772    #[tokio::test]
1773    async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
1774        let chunk_l1 = StreamChunk::from_pretty(
1775            "  I I
1776             + 1 4
1777             + 2 5
1778             + 3 6",
1779        );
1780        let chunk_l2 = StreamChunk::from_pretty(
1781            "  I I
1782             + 3 8
1783             - 3 8",
1784        );
1785        let chunk_r1 = StreamChunk::from_pretty(
1786            "  I I
1787             + 2 7
1788             + 4 8
1789             + 6 9",
1790        );
1791        let chunk_r2 = StreamChunk::from_pretty(
1792            "  I  I
1793             + 3 10
1794             + 6 11",
1795        );
1796        let chunk_l3 = StreamChunk::from_pretty(
1797            "  I I
1798             + 6 10",
1799        );
1800        let chunk_r3 = StreamChunk::from_pretty(
1801            "  I  I
1802             - 6 11",
1803        );
1804        let chunk_r4 = StreamChunk::from_pretty(
1805            "  I  I
1806             - 6 9",
1807        );
1808        let (mut tx_l, mut tx_r, mut hash_join) =
1809            create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;
1810
1811        // push the init barrier for left and right
1812        tx_l.push_barrier(test_epoch(1), false);
1813        tx_r.push_barrier(test_epoch(1), false);
1814        hash_join.next_unwrap_ready_barrier()?;
1815
1816        // push the 1st left chunk
1817        tx_l.push_chunk(chunk_l1);
1818        hash_join.next_unwrap_pending();
1819
1820        // push the init barrier for left and right
1821        tx_l.push_barrier(test_epoch(2), false);
1822        tx_r.push_barrier(test_epoch(2), false);
1823        hash_join.next_unwrap_ready_barrier()?;
1824
1825        // push the 2nd left chunk
1826        tx_l.push_chunk(chunk_l2);
1827        hash_join.next_unwrap_pending();
1828
1829        // push the 1st right chunk
1830        tx_r.push_chunk(chunk_r1);
1831        let chunk = hash_join.next_unwrap_ready_chunk()?;
1832        assert_eq!(
1833            chunk,
1834            StreamChunk::from_pretty(
1835                " I I
1836                + 2 5"
1837            )
1838        );
1839
1840        // push the 2nd right chunk
1841        tx_r.push_chunk(chunk_r2);
1842        let chunk = hash_join.next_unwrap_ready_chunk()?;
1843        assert_eq!(
1844            chunk,
1845            StreamChunk::from_pretty(
1846                " I I
1847                + 3 6"
1848            )
1849        );
1850
1851        // push the 3rd left chunk (tests forward_exactly_once)
1852        tx_l.push_chunk(chunk_l3);
1853        let chunk = hash_join.next_unwrap_ready_chunk()?;
1854        assert_eq!(
1855            chunk,
1856            StreamChunk::from_pretty(
1857                " I I
1858                + 6 10"
1859            )
1860        );
1861
1862        // push the 3rd right chunk
1863        // (tests that no change if there are still matches)
1864        tx_r.push_chunk(chunk_r3);
1865        hash_join.next_unwrap_pending();
1866
1867        // push the 3rd left chunk
1868        // (tests that deletion occurs when there are no more matches)
1869        tx_r.push_chunk(chunk_r4);
1870        let chunk = hash_join.next_unwrap_ready_chunk()?;
1871        assert_eq!(
1872            chunk,
1873            StreamChunk::from_pretty(
1874                " I I
1875                - 6 10"
1876            )
1877        );
1878
1879        Ok(())
1880    }
1881
1882    #[tokio::test]
1883    async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
1884        let chunk_l1 = StreamChunk::from_pretty(
1885            "  I I
1886             + 1 4
1887             + 2 5
1888             + . 6",
1889        );
1890        let chunk_l2 = StreamChunk::from_pretty(
1891            "  I I
1892             + . 8
1893             - . 8",
1894        );
1895        let chunk_r1 = StreamChunk::from_pretty(
1896            "  I I
1897             + 2 7
1898             + 4 8
1899             + 6 9",
1900        );
1901        let chunk_r2 = StreamChunk::from_pretty(
1902            "  I  I
1903             + . 10
1904             + 6 11",
1905        );
1906        let chunk_l3 = StreamChunk::from_pretty(
1907            "  I I
1908             + 6 10",
1909        );
1910        let chunk_r3 = StreamChunk::from_pretty(
1911            "  I  I
1912             - 6 11",
1913        );
1914        let chunk_r4 = StreamChunk::from_pretty(
1915            "  I  I
1916             - 6 9",
1917        );
1918        let (mut tx_l, mut tx_r, mut hash_join) =
1919            create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;
1920
1921        // push the init barrier for left and right
1922        tx_l.push_barrier(test_epoch(1), false);
1923        tx_r.push_barrier(test_epoch(1), false);
1924        hash_join.next_unwrap_ready_barrier()?;
1925
1926        // push the 1st left chunk
1927        tx_l.push_chunk(chunk_l1);
1928        hash_join.next_unwrap_pending();
1929
1930        // push the init barrier for left and right
1931        tx_l.push_barrier(test_epoch(2), false);
1932        tx_r.push_barrier(test_epoch(2), false);
1933        hash_join.next_unwrap_ready_barrier()?;
1934
1935        // push the 2nd left chunk
1936        tx_l.push_chunk(chunk_l2);
1937        hash_join.next_unwrap_pending();
1938
1939        // push the 1st right chunk
1940        tx_r.push_chunk(chunk_r1);
1941        let chunk = hash_join.next_unwrap_ready_chunk()?;
1942        assert_eq!(
1943            chunk,
1944            StreamChunk::from_pretty(
1945                " I I
1946                + 2 5"
1947            )
1948        );
1949
1950        // push the 2nd right chunk
1951        tx_r.push_chunk(chunk_r2);
1952        let chunk = hash_join.next_unwrap_ready_chunk()?;
1953        assert_eq!(
1954            chunk,
1955            StreamChunk::from_pretty(
1956                " I I
1957                + . 6"
1958            )
1959        );
1960
1961        // push the 3rd left chunk (tests forward_exactly_once)
1962        tx_l.push_chunk(chunk_l3);
1963        let chunk = hash_join.next_unwrap_ready_chunk()?;
1964        assert_eq!(
1965            chunk,
1966            StreamChunk::from_pretty(
1967                " I I
1968                + 6 10"
1969            )
1970        );
1971
1972        // push the 3rd right chunk
1973        // (tests that no change if there are still matches)
1974        tx_r.push_chunk(chunk_r3);
1975        hash_join.next_unwrap_pending();
1976
1977        // push the 3rd left chunk
1978        // (tests that deletion occurs when there are no more matches)
1979        tx_r.push_chunk(chunk_r4);
1980        let chunk = hash_join.next_unwrap_ready_chunk()?;
1981        assert_eq!(
1982            chunk,
1983            StreamChunk::from_pretty(
1984                " I I
1985                - 6 10"
1986            )
1987        );
1988
1989        Ok(())
1990    }
1991
1992    #[tokio::test]
1993    async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
1994        let chunk_l1 = StreamChunk::from_pretty(
1995            "  I I I
1996             + 1 4 1
1997             + 2 5 2
1998             + 3 6 3",
1999        );
2000        let chunk_l2 = StreamChunk::from_pretty(
2001            "  I I I
2002             + 4 9 4
2003             + 5 10 5",
2004        );
2005        let chunk_r1 = StreamChunk::from_pretty(
2006            "  I I I
2007             + 2 5 1
2008             + 4 9 2
2009             + 6 9 3",
2010        );
2011        let chunk_r2 = StreamChunk::from_pretty(
2012            "  I I I
2013             + 1 4 4
2014             + 3 6 5",
2015        );
2016
2017        let (mut tx_l, mut tx_r, mut hash_join) =
2018            create_append_only_executor::<{ JoinType::Inner }>(false).await;
2019
2020        // push the init barrier for left and right
2021        tx_l.push_barrier(test_epoch(1), false);
2022        tx_r.push_barrier(test_epoch(1), false);
2023        hash_join.next_unwrap_ready_barrier()?;
2024
2025        // push the 1st left chunk
2026        tx_l.push_chunk(chunk_l1);
2027        hash_join.next_unwrap_pending();
2028
2029        // push the init barrier for left and right
2030        tx_l.push_barrier(test_epoch(2), false);
2031        tx_r.push_barrier(test_epoch(2), false);
2032        hash_join.next_unwrap_ready_barrier()?;
2033
2034        // push the 2nd left chunk
2035        tx_l.push_chunk(chunk_l2);
2036        hash_join.next_unwrap_pending();
2037
2038        // push the 1st right chunk
2039        tx_r.push_chunk(chunk_r1);
2040        let chunk = hash_join.next_unwrap_ready_chunk()?;
2041        assert_eq!(
2042            chunk,
2043            StreamChunk::from_pretty(
2044                " I I I I I I
2045                + 2 5 2 2 5 1
2046                + 4 9 4 4 9 2"
2047            )
2048        );
2049
2050        // push the 2nd right chunk
2051        tx_r.push_chunk(chunk_r2);
2052        let chunk = hash_join.next_unwrap_ready_chunk()?;
2053        assert_eq!(
2054            chunk,
2055            StreamChunk::from_pretty(
2056                " I I I I I I
2057                + 1 4 1 1 4 4
2058                + 3 6 3 3 6 5"
2059            )
2060        );
2061
2062        Ok(())
2063    }
2064
2065    #[tokio::test]
2066    async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
2067        let chunk_l1 = StreamChunk::from_pretty(
2068            "  I I I
2069             + 1 4 1
2070             + 2 5 2
2071             + 3 6 3",
2072        );
2073        let chunk_l2 = StreamChunk::from_pretty(
2074            "  I I I
2075             + 4 9 4
2076             + 5 10 5",
2077        );
2078        let chunk_r1 = StreamChunk::from_pretty(
2079            "  I I I
2080             + 2 5 1
2081             + 4 9 2
2082             + 6 9 3",
2083        );
2084        let chunk_r2 = StreamChunk::from_pretty(
2085            "  I I I
2086             + 1 4 4
2087             + 3 6 5",
2088        );
2089
2090        let (mut tx_l, mut tx_r, mut hash_join) =
2091            create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;
2092
2093        // push the init barrier for left and right
2094        tx_l.push_barrier(test_epoch(1), false);
2095        tx_r.push_barrier(test_epoch(1), false);
2096        hash_join.next_unwrap_ready_barrier()?;
2097
2098        // push the 1st left chunk
2099        tx_l.push_chunk(chunk_l1);
2100        hash_join.next_unwrap_pending();
2101
2102        // push the init barrier for left and right
2103        tx_l.push_barrier(test_epoch(2), false);
2104        tx_r.push_barrier(test_epoch(2), false);
2105        hash_join.next_unwrap_ready_barrier()?;
2106
2107        // push the 2nd left chunk
2108        tx_l.push_chunk(chunk_l2);
2109        hash_join.next_unwrap_pending();
2110
2111        // push the 1st right chunk
2112        tx_r.push_chunk(chunk_r1);
2113        let chunk = hash_join.next_unwrap_ready_chunk()?;
2114        assert_eq!(
2115            chunk,
2116            StreamChunk::from_pretty(
2117                " I I I
2118                + 2 5 2
2119                + 4 9 4"
2120            )
2121        );
2122
2123        // push the 2nd right chunk
2124        tx_r.push_chunk(chunk_r2);
2125        let chunk = hash_join.next_unwrap_ready_chunk()?;
2126        assert_eq!(
2127            chunk,
2128            StreamChunk::from_pretty(
2129                " I I I
2130                + 1 4 1
2131                + 3 6 3"
2132            )
2133        );
2134
2135        Ok(())
2136    }
2137
2138    #[tokio::test]
2139    async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
2140        let chunk_l1 = StreamChunk::from_pretty(
2141            "  I I I
2142             + 1 4 1
2143             + 2 5 2
2144             + 3 6 3",
2145        );
2146        let chunk_l2 = StreamChunk::from_pretty(
2147            "  I I I
2148             + 4 9 4
2149             + 5 10 5",
2150        );
2151        let chunk_r1 = StreamChunk::from_pretty(
2152            "  I I I
2153             + 2 5 1
2154             + 4 9 2
2155             + 6 9 3",
2156        );
2157        let chunk_r2 = StreamChunk::from_pretty(
2158            "  I I I
2159             + 1 4 4
2160             + 3 6 5",
2161        );
2162
2163        let (mut tx_l, mut tx_r, mut hash_join) =
2164            create_append_only_executor::<{ JoinType::RightSemi }>(false).await;
2165
2166        // push the init barrier for left and right
2167        tx_l.push_barrier(test_epoch(1), false);
2168        tx_r.push_barrier(test_epoch(1), false);
2169        hash_join.next_unwrap_ready_barrier()?;
2170
2171        // push the 1st left chunk
2172        tx_l.push_chunk(chunk_l1);
2173        hash_join.next_unwrap_pending();
2174
2175        // push the init barrier for left and right
2176        tx_l.push_barrier(test_epoch(2), false);
2177        tx_r.push_barrier(test_epoch(2), false);
2178        hash_join.next_unwrap_ready_barrier()?;
2179
2180        // push the 2nd left chunk
2181        tx_l.push_chunk(chunk_l2);
2182        hash_join.next_unwrap_pending();
2183
2184        // push the 1st right chunk
2185        tx_r.push_chunk(chunk_r1);
2186        let chunk = hash_join.next_unwrap_ready_chunk()?;
2187        assert_eq!(
2188            chunk,
2189            StreamChunk::from_pretty(
2190                " I I I
2191                + 2 5 1
2192                + 4 9 2"
2193            )
2194        );
2195
2196        // push the 2nd right chunk
2197        tx_r.push_chunk(chunk_r2);
2198        let chunk = hash_join.next_unwrap_ready_chunk()?;
2199        assert_eq!(
2200            chunk,
2201            StreamChunk::from_pretty(
2202                " I I I
2203                + 1 4 4
2204                + 3 6 5"
2205            )
2206        );
2207
2208        Ok(())
2209    }
2210
2211    #[tokio::test]
2212    async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
2213        let chunk_r1 = StreamChunk::from_pretty(
2214            "  I I
2215             + 1 4
2216             + 2 5
2217             + 3 6",
2218        );
2219        let chunk_r2 = StreamChunk::from_pretty(
2220            "  I I
2221             + 3 8
2222             - 3 8",
2223        );
2224        let chunk_l1 = StreamChunk::from_pretty(
2225            "  I I
2226             + 2 7
2227             + 4 8
2228             + 6 9",
2229        );
2230        let chunk_l2 = StreamChunk::from_pretty(
2231            "  I  I
2232             + 3 10
2233             + 6 11",
2234        );
2235        let chunk_r3 = StreamChunk::from_pretty(
2236            "  I I
2237             + 6 10",
2238        );
2239        let chunk_l3 = StreamChunk::from_pretty(
2240            "  I  I
2241             - 6 11",
2242        );
2243        let chunk_l4 = StreamChunk::from_pretty(
2244            "  I  I
2245             - 6 9",
2246        );
2247        let (mut tx_l, mut tx_r, mut hash_join) =
2248            create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;
2249
2250        // push the init barrier for left and right
2251        tx_l.push_barrier(test_epoch(1), false);
2252        tx_r.push_barrier(test_epoch(1), false);
2253        hash_join.next_unwrap_ready_barrier()?;
2254
2255        // push the 1st right chunk
2256        tx_r.push_chunk(chunk_r1);
2257        hash_join.next_unwrap_pending();
2258
2259        // push the init barrier for left and right
2260        tx_l.push_barrier(test_epoch(2), false);
2261        tx_r.push_barrier(test_epoch(2), false);
2262        hash_join.next_unwrap_ready_barrier()?;
2263
2264        // push the 2nd right chunk
2265        tx_r.push_chunk(chunk_r2);
2266        hash_join.next_unwrap_pending();
2267
2268        // push the 1st left chunk
2269        tx_l.push_chunk(chunk_l1);
2270        let chunk = hash_join.next_unwrap_ready_chunk()?;
2271        assert_eq!(
2272            chunk,
2273            StreamChunk::from_pretty(
2274                " I I
2275                + 2 5"
2276            )
2277        );
2278
2279        // push the 2nd left chunk
2280        tx_l.push_chunk(chunk_l2);
2281        let chunk = hash_join.next_unwrap_ready_chunk()?;
2282        assert_eq!(
2283            chunk,
2284            StreamChunk::from_pretty(
2285                " I I
2286                + 3 6"
2287            )
2288        );
2289
2290        // push the 3rd right chunk (tests forward_exactly_once)
2291        tx_r.push_chunk(chunk_r3);
2292        let chunk = hash_join.next_unwrap_ready_chunk()?;
2293        assert_eq!(
2294            chunk,
2295            StreamChunk::from_pretty(
2296                " I I
2297                + 6 10"
2298            )
2299        );
2300
2301        // push the 3rd left chunk
2302        // (tests that no change if there are still matches)
2303        tx_l.push_chunk(chunk_l3);
2304        hash_join.next_unwrap_pending();
2305
2306        // push the 3rd right chunk
2307        // (tests that deletion occurs when there are no more matches)
2308        tx_l.push_chunk(chunk_l4);
2309        let chunk = hash_join.next_unwrap_ready_chunk()?;
2310        assert_eq!(
2311            chunk,
2312            StreamChunk::from_pretty(
2313                " I I
2314                - 6 10"
2315            )
2316        );
2317
2318        Ok(())
2319    }
2320
2321    #[tokio::test]
2322    async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
2323        let chunk_l1 = StreamChunk::from_pretty(
2324            "  I I
2325             + 1 4
2326             + 2 5
2327             + 3 6",
2328        );
2329        let chunk_l2 = StreamChunk::from_pretty(
2330            "  I I
2331             + 3 8
2332             - 3 8",
2333        );
2334        let chunk_r1 = StreamChunk::from_pretty(
2335            "  I I
2336             + 2 7
2337             + 4 8
2338             + 6 9",
2339        );
2340        let chunk_r2 = StreamChunk::from_pretty(
2341            "  I  I
2342             + 3 10
2343             + 6 11
2344             + 1 2
2345             + 1 3",
2346        );
2347        let chunk_l3 = StreamChunk::from_pretty(
2348            "  I I
2349             + 9 10",
2350        );
2351        let chunk_r3 = StreamChunk::from_pretty(
2352            "  I I
2353             - 1 2",
2354        );
2355        let chunk_r4 = StreamChunk::from_pretty(
2356            "  I I
2357             - 1 3",
2358        );
2359        let (mut tx_l, mut tx_r, mut hash_join) =
2360            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2361
2362        // push the init barrier for left and right
2363        tx_l.push_barrier(test_epoch(1), false);
2364        tx_r.push_barrier(test_epoch(1), false);
2365        hash_join.next_unwrap_ready_barrier()?;
2366
2367        // push the 1st left chunk
2368        tx_l.push_chunk(chunk_l1);
2369        let chunk = hash_join.next_unwrap_ready_chunk()?;
2370        assert_eq!(
2371            chunk,
2372            StreamChunk::from_pretty(
2373                " I I
2374                + 1 4
2375                + 2 5
2376                + 3 6",
2377            )
2378        );
2379
2380        // push the init barrier for left and right
2381        tx_l.push_barrier(test_epoch(2), false);
2382        tx_r.push_barrier(test_epoch(2), false);
2383        hash_join.next_unwrap_ready_barrier()?;
2384
2385        // push the 2nd left chunk
2386        tx_l.push_chunk(chunk_l2);
2387        let chunk = hash_join.next_unwrap_ready_chunk()?;
2388        assert_eq!(
2389            chunk,
2390            StreamChunk::from_pretty(
2391                "  I I
2392                 + 3 8 D
2393                 - 3 8 D",
2394            )
2395        );
2396
2397        // push the 1st right chunk
2398        tx_r.push_chunk(chunk_r1);
2399        let chunk = hash_join.next_unwrap_ready_chunk()?;
2400        assert_eq!(
2401            chunk,
2402            StreamChunk::from_pretty(
2403                " I I
2404                - 2 5"
2405            )
2406        );
2407
2408        // push the 2nd right chunk
2409        tx_r.push_chunk(chunk_r2);
2410        let chunk = hash_join.next_unwrap_ready_chunk()?;
2411        assert_eq!(
2412            chunk,
2413            StreamChunk::from_pretty(
2414                " I I
2415                - 3 6
2416                - 1 4"
2417            )
2418        );
2419
2420        // push the 3rd left chunk (tests forward_exactly_once)
2421        tx_l.push_chunk(chunk_l3);
2422        let chunk = hash_join.next_unwrap_ready_chunk()?;
2423        assert_eq!(
2424            chunk,
2425            StreamChunk::from_pretty(
2426                " I I
2427                + 9 10"
2428            )
2429        );
2430
2431        // push the 3rd right chunk
2432        // (tests that no change if there are still matches)
2433        tx_r.push_chunk(chunk_r3);
2434        hash_join.next_unwrap_pending();
2435
2436        // push the 4th right chunk
2437        // (tests that insertion occurs when there are no more matches)
2438        tx_r.push_chunk(chunk_r4);
2439        let chunk = hash_join.next_unwrap_ready_chunk()?;
2440        assert_eq!(
2441            chunk,
2442            StreamChunk::from_pretty(
2443                " I I
2444                + 1 4"
2445            )
2446        );
2447
2448        Ok(())
2449    }
2450
2451    #[tokio::test]
2452    async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
2453        let chunk_r1 = StreamChunk::from_pretty(
2454            "  I I
2455             + 1 4
2456             + 2 5
2457             + 3 6",
2458        );
2459        let chunk_r2 = StreamChunk::from_pretty(
2460            "  I I
2461             + 3 8
2462             - 3 8",
2463        );
2464        let chunk_l1 = StreamChunk::from_pretty(
2465            "  I I
2466             + 2 7
2467             + 4 8
2468             + 6 9",
2469        );
2470        let chunk_l2 = StreamChunk::from_pretty(
2471            "  I  I
2472             + 3 10
2473             + 6 11
2474             + 1 2
2475             + 1 3",
2476        );
2477        let chunk_r3 = StreamChunk::from_pretty(
2478            "  I I
2479             + 9 10",
2480        );
2481        let chunk_l3 = StreamChunk::from_pretty(
2482            "  I I
2483             - 1 2",
2484        );
2485        let chunk_l4 = StreamChunk::from_pretty(
2486            "  I I
2487             - 1 3",
2488        );
2489        let (mut tx_r, mut tx_l, mut hash_join) =
2490            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2491
2492        // push the init barrier for left and right
2493        tx_r.push_barrier(test_epoch(1), false);
2494        tx_l.push_barrier(test_epoch(1), false);
2495        hash_join.next_unwrap_ready_barrier()?;
2496
2497        // push the 1st right chunk
2498        tx_r.push_chunk(chunk_r1);
2499        let chunk = hash_join.next_unwrap_ready_chunk()?;
2500        assert_eq!(
2501            chunk,
2502            StreamChunk::from_pretty(
2503                " I I
2504                + 1 4
2505                + 2 5
2506                + 3 6",
2507            )
2508        );
2509
2510        // push the init barrier for left and right
2511        tx_r.push_barrier(test_epoch(2), false);
2512        tx_l.push_barrier(test_epoch(2), false);
2513        hash_join.next_unwrap_ready_barrier()?;
2514
2515        // push the 2nd right chunk
2516        tx_r.push_chunk(chunk_r2);
2517        let chunk = hash_join.next_unwrap_ready_chunk()?;
2518        assert_eq!(
2519            chunk,
2520            StreamChunk::from_pretty(
2521                "  I I
2522                 + 3 8 D
2523                 - 3 8 D",
2524            )
2525        );
2526
2527        // push the 1st left chunk
2528        tx_l.push_chunk(chunk_l1);
2529        let chunk = hash_join.next_unwrap_ready_chunk()?;
2530        assert_eq!(
2531            chunk,
2532            StreamChunk::from_pretty(
2533                " I I
2534                - 2 5"
2535            )
2536        );
2537
2538        // push the 2nd left chunk
2539        tx_l.push_chunk(chunk_l2);
2540        let chunk = hash_join.next_unwrap_ready_chunk()?;
2541        assert_eq!(
2542            chunk,
2543            StreamChunk::from_pretty(
2544                " I I
2545                - 3 6
2546                - 1 4"
2547            )
2548        );
2549
2550        // push the 3rd right chunk (tests forward_exactly_once)
2551        tx_r.push_chunk(chunk_r3);
2552        let chunk = hash_join.next_unwrap_ready_chunk()?;
2553        assert_eq!(
2554            chunk,
2555            StreamChunk::from_pretty(
2556                " I I
2557                + 9 10"
2558            )
2559        );
2560
2561        // push the 3rd left chunk
2562        // (tests that no change if there are still matches)
2563        tx_l.push_chunk(chunk_l3);
2564        hash_join.next_unwrap_pending();
2565
2566        // push the 4th left chunk
2567        // (tests that insertion occurs when there are no more matches)
2568        tx_l.push_chunk(chunk_l4);
2569        let chunk = hash_join.next_unwrap_ready_chunk()?;
2570        assert_eq!(
2571            chunk,
2572            StreamChunk::from_pretty(
2573                " I I
2574                + 1 4"
2575            )
2576        );
2577
2578        Ok(())
2579    }
2580
2581    #[tokio::test]
2582    async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
2583        let chunk_l1 = StreamChunk::from_pretty(
2584            "  I I
2585             + 1 4
2586             + 2 5
2587             + 3 6",
2588        );
2589        let chunk_l2 = StreamChunk::from_pretty(
2590            "  I I
2591             + 6 8
2592             + 3 8",
2593        );
2594        let chunk_r1 = StreamChunk::from_pretty(
2595            "  I I
2596             + 2 7
2597             + 4 8
2598             + 6 9",
2599        );
2600        let chunk_r2 = StreamChunk::from_pretty(
2601            "  I  I
2602             + 3 10
2603             + 6 11",
2604        );
2605        let (mut tx_l, mut tx_r, mut hash_join) =
2606            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2607
2608        // push the init barrier for left and right
2609        tx_l.push_barrier(test_epoch(1), false);
2610        tx_r.push_barrier(test_epoch(1), false);
2611        hash_join.next_unwrap_ready_barrier()?;
2612
2613        // push the 1st left chunk
2614        tx_l.push_chunk(chunk_l1);
2615        hash_join.next_unwrap_pending();
2616
2617        // push a barrier to left side
2618        tx_l.push_barrier(test_epoch(2), false);
2619
2620        // push the 2nd left chunk
2621        tx_l.push_chunk(chunk_l2);
2622
2623        // join the first right chunk
2624        tx_r.push_chunk(chunk_r1);
2625
2626        // Consume stream chunk
2627        let chunk = hash_join.next_unwrap_ready_chunk()?;
2628        assert_eq!(
2629            chunk,
2630            StreamChunk::from_pretty(
2631                " I I I I
2632                + 2 5 2 7"
2633            )
2634        );
2635
2636        // push a barrier to right side
2637        tx_r.push_barrier(test_epoch(2), false);
2638
2639        // get the aligned barrier here
2640        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2641        assert!(matches!(
2642            hash_join.next_unwrap_ready_barrier()?,
2643            Barrier {
2644                epoch,
2645                mutation: None,
2646                ..
2647            } if epoch == expected_epoch
2648        ));
2649
2650        // join the 2nd left chunk
2651        let chunk = hash_join.next_unwrap_ready_chunk()?;
2652        assert_eq!(
2653            chunk,
2654            StreamChunk::from_pretty(
2655                " I I I I
2656                + 6 8 6 9"
2657            )
2658        );
2659
2660        // push the 2nd right chunk
2661        tx_r.push_chunk(chunk_r2);
2662        let chunk = hash_join.next_unwrap_ready_chunk()?;
2663        assert_eq!(
2664            chunk,
2665            StreamChunk::from_pretty(
2666                " I I I I
2667                + 3 6 3 10
2668                + 3 8 3 10
2669                + 6 8 6 11"
2670            )
2671        );
2672
2673        Ok(())
2674    }
2675
2676    #[tokio::test]
2677    async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
2678        let chunk_l1 = StreamChunk::from_pretty(
2679            "  I I
2680             + 1 4
2681             + 2 .
2682             + 3 .",
2683        );
2684        let chunk_l2 = StreamChunk::from_pretty(
2685            "  I I
2686             + 6 .
2687             + 3 8",
2688        );
2689        let chunk_r1 = StreamChunk::from_pretty(
2690            "  I I
2691             + 2 7
2692             + 4 8
2693             + 6 9",
2694        );
2695        let chunk_r2 = StreamChunk::from_pretty(
2696            "  I  I
2697             + 3 10
2698             + 6 11",
2699        );
2700        let (mut tx_l, mut tx_r, mut hash_join) =
2701            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2702
2703        // push the init barrier for left and right
2704        tx_l.push_barrier(test_epoch(1), false);
2705        tx_r.push_barrier(test_epoch(1), false);
2706        hash_join.next_unwrap_ready_barrier()?;
2707
2708        // push the 1st left chunk
2709        tx_l.push_chunk(chunk_l1);
2710        hash_join.next_unwrap_pending();
2711
2712        // push a barrier to left side
2713        tx_l.push_barrier(test_epoch(2), false);
2714
2715        // push the 2nd left chunk
2716        tx_l.push_chunk(chunk_l2);
2717
2718        // join the first right chunk
2719        tx_r.push_chunk(chunk_r1);
2720
2721        // Consume stream chunk
2722        let chunk = hash_join.next_unwrap_ready_chunk()?;
2723        assert_eq!(
2724            chunk,
2725            StreamChunk::from_pretty(
2726                " I I I I
2727                + 2 . 2 7"
2728            )
2729        );
2730
2731        // push a barrier to right side
2732        tx_r.push_barrier(test_epoch(2), false);
2733
2734        // get the aligned barrier here
2735        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2736        assert!(matches!(
2737            hash_join.next_unwrap_ready_barrier()?,
2738            Barrier {
2739                epoch,
2740                mutation: None,
2741                ..
2742            } if epoch == expected_epoch
2743        ));
2744
2745        // join the 2nd left chunk
2746        let chunk = hash_join.next_unwrap_ready_chunk()?;
2747        assert_eq!(
2748            chunk,
2749            StreamChunk::from_pretty(
2750                " I I I I
2751                + 6 . 6 9"
2752            )
2753        );
2754
2755        // push the 2nd right chunk
2756        tx_r.push_chunk(chunk_r2);
2757        let chunk = hash_join.next_unwrap_ready_chunk()?;
2758        assert_eq!(
2759            chunk,
2760            StreamChunk::from_pretty(
2761                " I I I I
2762                + 3 8 3 10
2763                + 3 . 3 10
2764                + 6 . 6 11"
2765            )
2766        );
2767
2768        Ok(())
2769    }
2770
2771    #[tokio::test]
2772    async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
2773        let chunk_l1 = StreamChunk::from_pretty(
2774            "  I I
2775             + 1 4
2776             + 2 5
2777             + 3 6",
2778        );
2779        let chunk_l2 = StreamChunk::from_pretty(
2780            "  I I
2781             + 3 8
2782             - 3 8",
2783        );
2784        let chunk_r1 = StreamChunk::from_pretty(
2785            "  I I
2786             + 2 7
2787             + 4 8
2788             + 6 9",
2789        );
2790        let chunk_r2 = StreamChunk::from_pretty(
2791            "  I  I
2792             + 3 10
2793             + 6 11",
2794        );
2795        let (mut tx_l, mut tx_r, mut hash_join) =
2796            create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;
2797
2798        // push the init barrier for left and right
2799        tx_l.push_barrier(test_epoch(1), false);
2800        tx_r.push_barrier(test_epoch(1), false);
2801        hash_join.next_unwrap_ready_barrier()?;
2802
2803        // push the 1st left chunk
2804        tx_l.push_chunk(chunk_l1);
2805        let chunk = hash_join.next_unwrap_ready_chunk()?;
2806        assert_eq!(
2807            chunk,
2808            StreamChunk::from_pretty(
2809                " I I I I
2810                + 1 4 . .
2811                + 2 5 . .
2812                + 3 6 . ."
2813            )
2814        );
2815
2816        // push the 2nd left chunk
2817        tx_l.push_chunk(chunk_l2);
2818        let chunk = hash_join.next_unwrap_ready_chunk()?;
2819        assert_eq!(
2820            chunk,
2821            StreamChunk::from_pretty(
2822                " I I I I
2823                + 3 8 . . D
2824                - 3 8 . . D"
2825            )
2826        );
2827
2828        // push the 1st right chunk
2829        tx_r.push_chunk(chunk_r1);
2830        let chunk = hash_join.next_unwrap_ready_chunk()?;
2831        assert_eq!(
2832            chunk,
2833            StreamChunk::from_pretty(
2834                " I I I I
2835                - 2 5 . .
2836                + 2 5 2 7"
2837            )
2838        );
2839
2840        // push the 2nd right chunk
2841        tx_r.push_chunk(chunk_r2);
2842        let chunk = hash_join.next_unwrap_ready_chunk()?;
2843        assert_eq!(
2844            chunk,
2845            StreamChunk::from_pretty(
2846                " I I I I
2847                - 3 6 . .
2848                + 3 6 3 10"
2849            )
2850        );
2851
2852        Ok(())
2853    }
2854
2855    #[tokio::test]
2856    async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
2857        let chunk_l1 = StreamChunk::from_pretty(
2858            "  I I
2859             + 1 4
2860             + 2 5
2861             + . 6",
2862        );
2863        let chunk_l2 = StreamChunk::from_pretty(
2864            "  I I
2865             + . 8
2866             - . 8",
2867        );
2868        let chunk_r1 = StreamChunk::from_pretty(
2869            "  I I
2870             + 2 7
2871             + 4 8
2872             + 6 9",
2873        );
2874        let chunk_r2 = StreamChunk::from_pretty(
2875            "  I  I
2876             + . 10
2877             + 6 11",
2878        );
2879        let (mut tx_l, mut tx_r, mut hash_join) =
2880            create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;
2881
2882        // push the init barrier for left and right
2883        tx_l.push_barrier(test_epoch(1), false);
2884        tx_r.push_barrier(test_epoch(1), false);
2885        hash_join.next_unwrap_ready_barrier()?;
2886
2887        // push the 1st left chunk
2888        tx_l.push_chunk(chunk_l1);
2889        let chunk = hash_join.next_unwrap_ready_chunk()?;
2890        assert_eq!(
2891            chunk,
2892            StreamChunk::from_pretty(
2893                " I I I I
2894                + 1 4 . .
2895                + 2 5 . .
2896                + . 6 . ."
2897            )
2898        );
2899
2900        // push the 2nd left chunk
2901        tx_l.push_chunk(chunk_l2);
2902        let chunk = hash_join.next_unwrap_ready_chunk()?;
2903        assert_eq!(
2904            chunk,
2905            StreamChunk::from_pretty(
2906                " I I I I
2907                + . 8 . . D
2908                - . 8 . . D"
2909            )
2910        );
2911
2912        // push the 1st right chunk
2913        tx_r.push_chunk(chunk_r1);
2914        let chunk = hash_join.next_unwrap_ready_chunk()?;
2915        assert_eq!(
2916            chunk,
2917            StreamChunk::from_pretty(
2918                " I I I I
2919                - 2 5 . .
2920                + 2 5 2 7"
2921            )
2922        );
2923
2924        // push the 2nd right chunk
2925        tx_r.push_chunk(chunk_r2);
2926        let chunk = hash_join.next_unwrap_ready_chunk()?;
2927        assert_eq!(
2928            chunk,
2929            StreamChunk::from_pretty(
2930                " I I I I
2931                - . 6 . .
2932                + . 6 . 10"
2933            )
2934        );
2935
2936        Ok(())
2937    }
2938
2939    #[tokio::test]
2940    async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
2941        let chunk_l1 = StreamChunk::from_pretty(
2942            "  I I
2943             + 1 4
2944             + 2 5
2945             + 3 6",
2946        );
2947        let chunk_l2 = StreamChunk::from_pretty(
2948            "  I I
2949             + 3 8
2950             - 3 8",
2951        );
2952        let chunk_r1 = StreamChunk::from_pretty(
2953            "  I I
2954             + 2 7
2955             + 4 8
2956             + 6 9",
2957        );
2958        let chunk_r2 = StreamChunk::from_pretty(
2959            "  I  I
2960             + 5 10
2961             - 5 10",
2962        );
2963        let (mut tx_l, mut tx_r, mut hash_join) =
2964            create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;
2965
2966        // push the init barrier for left and right
2967        tx_l.push_barrier(test_epoch(1), false);
2968        tx_r.push_barrier(test_epoch(1), false);
2969        hash_join.next_unwrap_ready_barrier()?;
2970
2971        // push the 1st left chunk
2972        tx_l.push_chunk(chunk_l1);
2973        hash_join.next_unwrap_pending();
2974
2975        // push the 2nd left chunk
2976        tx_l.push_chunk(chunk_l2);
2977        hash_join.next_unwrap_pending();
2978
2979        // push the 1st right chunk
2980        tx_r.push_chunk(chunk_r1);
2981        let chunk = hash_join.next_unwrap_ready_chunk()?;
2982        assert_eq!(
2983            chunk,
2984            StreamChunk::from_pretty(
2985                " I I I I
2986                + 2 5 2 7
2987                + . . 4 8
2988                + . . 6 9"
2989            )
2990        );
2991
2992        // push the 2nd right chunk
2993        tx_r.push_chunk(chunk_r2);
2994        let chunk = hash_join.next_unwrap_ready_chunk()?;
2995        assert_eq!(
2996            chunk,
2997            StreamChunk::from_pretty(
2998                " I I I I
2999                + . . 5 10 D
3000                - . . 5 10 D"
3001            )
3002        );
3003
3004        Ok(())
3005    }
3006
3007    #[tokio::test]
3008    async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
3009        let chunk_l1 = StreamChunk::from_pretty(
3010            "  I I I
3011             + 1 4 1
3012             + 2 5 2
3013             + 3 6 3",
3014        );
3015        let chunk_l2 = StreamChunk::from_pretty(
3016            "  I I I
3017             + 4 9 4
3018             + 5 10 5",
3019        );
3020        let chunk_r1 = StreamChunk::from_pretty(
3021            "  I I I
3022             + 2 5 1
3023             + 4 9 2
3024             + 6 9 3",
3025        );
3026        let chunk_r2 = StreamChunk::from_pretty(
3027            "  I I I
3028             + 1 4 4
3029             + 3 6 5",
3030        );
3031
3032        let (mut tx_l, mut tx_r, mut hash_join) =
3033            create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;
3034
3035        // push the init barrier for left and right
3036        tx_l.push_barrier(test_epoch(1), false);
3037        tx_r.push_barrier(test_epoch(1), false);
3038        hash_join.next_unwrap_ready_barrier()?;
3039
3040        // push the 1st left chunk
3041        tx_l.push_chunk(chunk_l1);
3042        let chunk = hash_join.next_unwrap_ready_chunk()?;
3043        assert_eq!(
3044            chunk,
3045            StreamChunk::from_pretty(
3046                " I I I I I I
3047                + 1 4 1 . . .
3048                + 2 5 2 . . .
3049                + 3 6 3 . . ."
3050            )
3051        );
3052
3053        // push the 2nd left chunk
3054        tx_l.push_chunk(chunk_l2);
3055        let chunk = hash_join.next_unwrap_ready_chunk()?;
3056        assert_eq!(
3057            chunk,
3058            StreamChunk::from_pretty(
3059                " I I I I I I
3060                + 4 9 4 . . .
3061                + 5 10 5 . . ."
3062            )
3063        );
3064
3065        // push the 1st right chunk
3066        tx_r.push_chunk(chunk_r1);
3067        let chunk = hash_join.next_unwrap_ready_chunk()?;
3068        assert_eq!(
3069            chunk,
3070            StreamChunk::from_pretty(
3071                " I I I I I I
3072                - 2 5 2 . . .
3073                + 2 5 2 2 5 1
3074                - 4 9 4 . . .
3075                + 4 9 4 4 9 2"
3076            )
3077        );
3078
3079        // push the 2nd right chunk
3080        tx_r.push_chunk(chunk_r2);
3081        let chunk = hash_join.next_unwrap_ready_chunk()?;
3082        assert_eq!(
3083            chunk,
3084            StreamChunk::from_pretty(
3085                " I I I I I I
3086                - 1 4 1 . . .
3087                + 1 4 1 1 4 4
3088                - 3 6 3 . . .
3089                + 3 6 3 3 6 5"
3090            )
3091        );
3092
3093        Ok(())
3094    }
3095
3096    #[tokio::test]
3097    async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
3098        let chunk_l1 = StreamChunk::from_pretty(
3099            "  I I I
3100             + 1 4 1
3101             + 2 5 2
3102             + 3 6 3",
3103        );
3104        let chunk_l2 = StreamChunk::from_pretty(
3105            "  I I I
3106             + 4 9 4
3107             + 5 10 5",
3108        );
3109        let chunk_r1 = StreamChunk::from_pretty(
3110            "  I I I
3111             + 2 5 1
3112             + 4 9 2
3113             + 6 9 3",
3114        );
3115        let chunk_r2 = StreamChunk::from_pretty(
3116            "  I I I
3117             + 1 4 4
3118             + 3 6 5
3119             + 7 7 6",
3120        );
3121
3122        let (mut tx_l, mut tx_r, mut hash_join) =
3123            create_append_only_executor::<{ JoinType::RightOuter }>(false).await;
3124
3125        // push the init barrier for left and right
3126        tx_l.push_barrier(test_epoch(1), false);
3127        tx_r.push_barrier(test_epoch(1), false);
3128        hash_join.next_unwrap_ready_barrier()?;
3129
3130        // push the 1st left chunk
3131        tx_l.push_chunk(chunk_l1);
3132        hash_join.next_unwrap_pending();
3133
3134        // push the 2nd left chunk
3135        tx_l.push_chunk(chunk_l2);
3136        hash_join.next_unwrap_pending();
3137
3138        // push the 1st right chunk
3139        tx_r.push_chunk(chunk_r1);
3140        let chunk = hash_join.next_unwrap_ready_chunk()?;
3141        assert_eq!(
3142            chunk,
3143            StreamChunk::from_pretty(
3144                "  I I I I I I
3145                + 2 5 2 2 5 1
3146                + 4 9 4 4 9 2
3147                + . . . 6 9 3"
3148            )
3149        );
3150
3151        // push the 2nd right chunk
3152        tx_r.push_chunk(chunk_r2);
3153        let chunk = hash_join.next_unwrap_ready_chunk()?;
3154        assert_eq!(
3155            chunk,
3156            StreamChunk::from_pretty(
3157                "  I I I I I I
3158                + 1 4 1 1 4 4
3159                + 3 6 3 3 6 5
3160                + . . . 7 7 6"
3161            )
3162        );
3163
3164        Ok(())
3165    }
3166
3167    #[tokio::test]
3168    async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
3169        let chunk_l1 = StreamChunk::from_pretty(
3170            "  I I
3171             + 1 4
3172             + 2 5
3173             + 3 6",
3174        );
3175        let chunk_l2 = StreamChunk::from_pretty(
3176            "  I I
3177             + 3 8
3178             - 3 8",
3179        );
3180        let chunk_r1 = StreamChunk::from_pretty(
3181            "  I I
3182             + 2 7
3183             + 4 8
3184             + 6 9",
3185        );
3186        let chunk_r2 = StreamChunk::from_pretty(
3187            "  I  I
3188             + 5 10
3189             - 5 10",
3190        );
3191        let (mut tx_l, mut tx_r, mut hash_join) =
3192            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3193
3194        // push the init barrier for left and right
3195        tx_l.push_barrier(test_epoch(1), false);
3196        tx_r.push_barrier(test_epoch(1), false);
3197        hash_join.next_unwrap_ready_barrier()?;
3198
3199        // push the 1st left chunk
3200        tx_l.push_chunk(chunk_l1);
3201        let chunk = hash_join.next_unwrap_ready_chunk()?;
3202        assert_eq!(
3203            chunk,
3204            StreamChunk::from_pretty(
3205                " I I I I
3206                + 1 4 . .
3207                + 2 5 . .
3208                + 3 6 . ."
3209            )
3210        );
3211
3212        // push the 2nd left chunk
3213        tx_l.push_chunk(chunk_l2);
3214        let chunk = hash_join.next_unwrap_ready_chunk()?;
3215        assert_eq!(
3216            chunk,
3217            StreamChunk::from_pretty(
3218                " I I I I
3219                + 3 8 . . D
3220                - 3 8 . . D"
3221            )
3222        );
3223
3224        // push the 1st right chunk
3225        tx_r.push_chunk(chunk_r1);
3226        let chunk = hash_join.next_unwrap_ready_chunk()?;
3227        assert_eq!(
3228            chunk,
3229            StreamChunk::from_pretty(
3230                " I I I I
3231                - 2 5 . .
3232                + 2 5 2 7
3233                + . . 4 8
3234                + . . 6 9"
3235            )
3236        );
3237
3238        // push the 2nd right chunk
3239        tx_r.push_chunk(chunk_r2);
3240        let chunk = hash_join.next_unwrap_ready_chunk()?;
3241        assert_eq!(
3242            chunk,
3243            StreamChunk::from_pretty(
3244                " I I I I
3245                + . . 5 10 D
3246                - . . 5 10 D"
3247            )
3248        );
3249
3250        Ok(())
3251    }
3252
3253    #[tokio::test]
3254    async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
3255        let (mut tx_l, mut tx_r, mut hash_join) =
3256            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3257
3258        // push the init barrier for left and right
3259        tx_l.push_barrier(test_epoch(1), false);
3260        tx_r.push_barrier(test_epoch(1), false);
3261        hash_join.next_unwrap_ready_barrier()?;
3262
3263        tx_l.push_chunk(StreamChunk::from_pretty(
3264            "  I I
3265             + 1 1
3266            ",
3267        ));
3268        let chunk = hash_join.next_unwrap_ready_chunk()?;
3269        assert_eq!(
3270            chunk,
3271            StreamChunk::from_pretty(
3272                " I I I I
3273                + 1 1 . ."
3274            )
3275        );
3276
3277        tx_r.push_chunk(StreamChunk::from_pretty(
3278            "  I I
3279             + 1 1
3280            ",
3281        ));
3282        let chunk = hash_join.next_unwrap_ready_chunk()?;
3283
3284        assert_eq!(
3285            chunk,
3286            StreamChunk::from_pretty(
3287                " I I I I
3288                - 1 1 . .
3289                + 1 1 1 1"
3290            )
3291        );
3292
3293        tx_l.push_chunk(StreamChunk::from_pretty(
3294            "   I I
3295              - 1 1
3296              + 1 2
3297            ",
3298        ));
3299        let chunk = hash_join.next_unwrap_ready_chunk()?;
3300        let chunk = chunk.compact_vis();
3301        assert_eq!(
3302            chunk,
3303            StreamChunk::from_pretty(
3304                " I I I I
3305                - 1 1 1 1
3306                + 1 2 1 1
3307                "
3308            )
3309        );
3310
3311        Ok(())
3312    }
3313
3314    #[tokio::test]
3315    async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
3316    {
3317        let chunk_l1 = StreamChunk::from_pretty(
3318            "  I I
3319             + 1 4
3320             + 2 5
3321             + 3 6
3322             + 3 7",
3323        );
3324        let chunk_l2 = StreamChunk::from_pretty(
3325            "  I I
3326             + 3 8
3327             - 3 8
3328             - 1 4", // delete row to cause an empty JoinHashEntry
3329        );
3330        let chunk_r1 = StreamChunk::from_pretty(
3331            "  I I
3332             + 2 6
3333             + 4 8
3334             + 3 4",
3335        );
3336        let chunk_r2 = StreamChunk::from_pretty(
3337            "  I  I
3338             + 5 10
3339             - 5 10
3340             + 1 2",
3341        );
3342        let (mut tx_l, mut tx_r, mut hash_join) =
3343            create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;
3344
3345        // push the init barrier for left and right
3346        tx_l.push_barrier(test_epoch(1), false);
3347        tx_r.push_barrier(test_epoch(1), false);
3348        hash_join.next_unwrap_ready_barrier()?;
3349
3350        // push the 1st left chunk
3351        tx_l.push_chunk(chunk_l1);
3352        let chunk = hash_join.next_unwrap_ready_chunk()?;
3353        assert_eq!(
3354            chunk,
3355            StreamChunk::from_pretty(
3356                " I I I I
3357                + 1 4 . .
3358                + 2 5 . .
3359                + 3 6 . .
3360                + 3 7 . ."
3361            )
3362        );
3363
3364        // push the 2nd left chunk
3365        tx_l.push_chunk(chunk_l2);
3366        let chunk = hash_join.next_unwrap_ready_chunk()?;
3367        assert_eq!(
3368            chunk,
3369            StreamChunk::from_pretty(
3370                " I I I I
3371                + 3 8 . . D
3372                - 3 8 . . D
3373                - 1 4 . ."
3374            )
3375        );
3376
3377        // push the 1st right chunk
3378        tx_r.push_chunk(chunk_r1);
3379        let chunk = hash_join.next_unwrap_ready_chunk()?;
3380        assert_eq!(
3381            chunk,
3382            StreamChunk::from_pretty(
3383                " I I I I
3384                - 2 5 . .
3385                + 2 5 2 6
3386                + . . 4 8
3387                + . . 3 4" /* regression test (#2420): 3 4 should be forwarded only once
3388                            * despite matching on eq join on 2
3389                            * entries */
3390            )
3391        );
3392
3393        // push the 2nd right chunk
3394        tx_r.push_chunk(chunk_r2);
3395        let chunk = hash_join.next_unwrap_ready_chunk()?;
3396        assert_eq!(
3397            chunk,
3398            StreamChunk::from_pretty(
3399                " I I I I
3400                + . . 5 10 D
3401                - . . 5 10 D
3402                + . . 1 2" /* regression test (#2420): 1 2 forwarded even if matches on an empty
3403                            * join entry */
3404            )
3405        );
3406
3407        Ok(())
3408    }
3409
3410    #[tokio::test]
3411    async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
3412        let chunk_l1 = StreamChunk::from_pretty(
3413            "  I I
3414             + 1 4
3415             + 2 10
3416             + 3 6",
3417        );
3418        let chunk_l2 = StreamChunk::from_pretty(
3419            "  I I
3420             + 3 8
3421             - 3 8",
3422        );
3423        let chunk_r1 = StreamChunk::from_pretty(
3424            "  I I
3425             + 2 7
3426             + 4 8
3427             + 6 9",
3428        );
3429        let chunk_r2 = StreamChunk::from_pretty(
3430            "  I  I
3431             + 3 10
3432             + 6 11",
3433        );
3434        let (mut tx_l, mut tx_r, mut hash_join) =
3435            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3436
3437        // push the init barrier for left and right
3438        tx_l.push_barrier(test_epoch(1), false);
3439        tx_r.push_barrier(test_epoch(1), false);
3440        hash_join.next_unwrap_ready_barrier()?;
3441
3442        // push the 1st left chunk
3443        tx_l.push_chunk(chunk_l1);
3444        hash_join.next_unwrap_pending();
3445
3446        // push the 2nd left chunk
3447        tx_l.push_chunk(chunk_l2);
3448        hash_join.next_unwrap_pending();
3449
3450        // push the 1st right chunk
3451        tx_r.push_chunk(chunk_r1);
3452        hash_join.next_unwrap_pending();
3453
3454        // push the 2nd right chunk
3455        tx_r.push_chunk(chunk_r2);
3456        let chunk = hash_join.next_unwrap_ready_chunk()?;
3457        assert_eq!(
3458            chunk,
3459            StreamChunk::from_pretty(
3460                " I I I I
3461                + 3 6 3 10"
3462            )
3463        );
3464
3465        Ok(())
3466    }
3467
3468    #[tokio::test]
3469    async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3470        let (mut tx_l, mut tx_r, mut hash_join) =
3471            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3472
3473        // push the init barrier for left and right
3474        tx_l.push_barrier(test_epoch(1), false);
3475        tx_r.push_barrier(test_epoch(1), false);
3476        hash_join.next_unwrap_ready_barrier()?;
3477
3478        tx_l.push_int64_watermark(0, 100);
3479
3480        tx_l.push_int64_watermark(0, 200);
3481
3482        tx_l.push_barrier(test_epoch(2), false);
3483        tx_r.push_barrier(test_epoch(2), false);
3484        hash_join.next_unwrap_ready_barrier()?;
3485
3486        tx_r.push_int64_watermark(0, 50);
3487
3488        let w1 = hash_join.next().await.unwrap().unwrap();
3489        let w1 = w1.as_watermark().unwrap();
3490
3491        let w2 = hash_join.next().await.unwrap().unwrap();
3492        let w2 = w2.as_watermark().unwrap();
3493
3494        tx_r.push_int64_watermark(0, 100);
3495
3496        let w3 = hash_join.next().await.unwrap().unwrap();
3497        let w3 = w3.as_watermark().unwrap();
3498
3499        let w4 = hash_join.next().await.unwrap().unwrap();
3500        let w4 = w4.as_watermark().unwrap();
3501
3502        assert_eq!(
3503            w1,
3504            &Watermark {
3505                col_idx: 2,
3506                data_type: DataType::Int64,
3507                val: ScalarImpl::Int64(50)
3508            }
3509        );
3510
3511        assert_eq!(
3512            w2,
3513            &Watermark {
3514                col_idx: 0,
3515                data_type: DataType::Int64,
3516                val: ScalarImpl::Int64(50)
3517            }
3518        );
3519
3520        assert_eq!(
3521            w3,
3522            &Watermark {
3523                col_idx: 2,
3524                data_type: DataType::Int64,
3525                val: ScalarImpl::Int64(100)
3526            }
3527        );
3528
3529        assert_eq!(
3530            w4,
3531            &Watermark {
3532                col_idx: 0,
3533                data_type: DataType::Int64,
3534                val: ScalarImpl::Int64(100)
3535            }
3536        );
3537
3538        Ok(())
3539    }
3540}