risingwave_stream/executor/
hash_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use std::assert_matches::assert_matches;
15use std::collections::{BTreeMap, HashSet};
16use std::marker::PhantomData;
17use std::time::Duration;
18
19use anyhow::Context;
20use either::Either;
21use itertools::Itertools;
22use multimap::MultiMap;
23use risingwave_common::array::Op;
24use risingwave_common::hash::{HashKey, NullBitmap};
25use risingwave_common::row::RowExt;
26use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
27use risingwave_common::util::epoch::EpochPair;
28use risingwave_common::util::iter_util::ZipEqDebug;
29use risingwave_expr::ExprError;
30use risingwave_expr::expr::NonStrictExpression;
31use tokio::time::Instant;
32
33use self::builder::JoinChunkBuilder;
34use super::barrier_align::*;
35use super::join::hash_join::*;
36use super::join::row::{JoinEncoding, JoinRow};
37use super::join::*;
38use super::watermark::*;
39use crate::executor::CachedJoinRow;
40use crate::executor::join::builder::JoinStreamChunkBuilder;
41use crate::executor::join::hash_join::CacheResult;
42use crate::executor::prelude::*;
43
/// Cache eviction cadence: `evict_cache` bumps a row counter on every call and, once the
/// counter equals this value, evicts the caches of both join sides and resets the counter.
const EVICT_EVERY_N_ROWS: u32 = 16;
46
/// Returns `true` if every element of `vec1` also occurs in `vec2`.
///
/// Duplicates in either vector are irrelevant, and an empty `vec1` is a subset of anything
/// (including an empty `vec2`).
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    // Only the candidate superset needs to be hashed; the candidate subset is merely probed.
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.iter().all(|element| superset.contains(element))
}
50
/// Per-side parameters handed to the hash join executor constructors.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JoinParams {
    /// Indices of the join keys
    pub join_key_indices: Vec<usize>,
    /// Indices of the input pk after dedup
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Bundles the join key indices and the deduplicated pk indices of one join side.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        Self {
            join_key_indices,
            deduped_pk_indices,
        }
    }
}
66
67struct JoinSide<K: HashKey, S: StateStore, E: JoinEncoding> {
68    /// Store all data from a one side stream
69    ht: JoinHashMap<K, S, E>,
70    /// Indices of the join key columns
71    join_key_indices: Vec<usize>,
72    /// The data type of all columns without degree.
73    all_data_types: Vec<DataType>,
74    /// The start position for the side in output new columns
75    start_pos: usize,
76    /// The mapping from input indices of a side to output columns.
77    i2o_mapping: Vec<(usize, usize)>,
78    i2o_mapping_indexed: MultiMap<usize, usize>,
79    /// The first field of the ith element indicates that when a watermark at the ith column of
80    /// this side comes, what band join conditions should be updated in order to possibly
81    /// generate a new watermark at that column or the corresponding column in the counterpart
82    /// join side.
83    ///
84    /// The second field indicates that whether the column is required less than the
85    /// the corresponding column in the counterpart join side in the band join condition.
86    input2inequality_index: Vec<Vec<(usize, bool)>>,
87    /// Some fields which are required non null to match due to inequalities.
88    non_null_fields: Vec<usize>,
89    /// (i, j) in this `Vec` means that state data in this join side can be cleaned if the value of
90    /// its ith column is less than the synthetic watermark of the jth band join condition.
91    state_clean_columns: Vec<(usize, usize)>,
92    /// Whether degree table is needed for this side.
93    need_degree_table: bool,
94    _marker: std::marker::PhantomData<E>,
95}
96
97impl<K: HashKey, S: StateStore, E: JoinEncoding> std::fmt::Debug for JoinSide<K, S, E> {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        f.debug_struct("JoinSide")
100            .field("join_key_indices", &self.join_key_indices)
101            .field("col_types", &self.all_data_types)
102            .field("start_pos", &self.start_pos)
103            .field("i2o_mapping", &self.i2o_mapping)
104            .field("need_degree_table", &self.need_degree_table)
105            .finish()
106    }
107}
108
109impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinSide<K, S, E> {
110    // WARNING: Please do not call this until we implement it.
111    fn is_dirty(&self) -> bool {
112        unimplemented!()
113    }
114
115    #[expect(dead_code)]
116    fn clear_cache(&mut self) {
117        assert!(
118            !self.is_dirty(),
119            "cannot clear cache while states of hash join are dirty"
120        );
121
122        // TODO: not working with rearranged chain
123        // self.ht.clear();
124    }
125
126    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
127        self.ht.init(epoch).await
128    }
129}
130
131/// `HashJoinExecutor` takes two input streams and runs equal hash join on them.
132/// The output columns are the concatenation of left and right columns.
133pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
134{
135    ctx: ActorContextRef,
136    info: ExecutorInfo,
137
138    /// Left input executor
139    input_l: Option<Executor>,
140    /// Right input executor
141    input_r: Option<Executor>,
142    /// The data types of the formed new columns
143    actual_output_data_types: Vec<DataType>,
144    /// The parameters of the left join executor
145    side_l: JoinSide<K, S, E>,
146    /// The parameters of the right join executor
147    side_r: JoinSide<K, S, E>,
148    /// Optional non-equi join conditions
149    cond: Option<NonStrictExpression>,
150    /// Column indices of watermark output and offset expression of each inequality, respectively.
151    inequality_pairs: Vec<(Vec<usize>, Option<NonStrictExpression>)>,
152    /// The output watermark of each inequality condition and its value is the minimum of the
153    /// calculation result of both side. It will be used to generate watermark into downstream
154    /// and do state cleaning if `clean_state` field of that inequality is `true`.
155    inequality_watermarks: Vec<Option<Watermark>>,
156
157    /// Whether the logic can be optimized for append-only stream
158    append_only_optimize: bool,
159
160    metrics: Arc<StreamingMetrics>,
161    /// The maximum size of the chunk produced by executor at a time
162    chunk_size: usize,
163    /// Count the messages received, clear to 0 when counted to `EVICT_EVERY_N_MESSAGES`
164    cnt_rows_received: u32,
165
166    /// watermark column index -> `BufferedWatermarks`
167    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,
168
169    /// When to alert high join amplification
170    high_join_amplification_threshold: usize,
171
172    /// Max number of rows that will be cached in the entry state.
173    entry_state_max_rows: usize,
174}
175
176impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> std::fmt::Debug
177    for HashJoinExecutor<K, S, T, E>
178{
179    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180        f.debug_struct("HashJoinExecutor")
181            .field("join_type", &T)
182            .field("input_left", &self.input_l.as_ref().unwrap().identity())
183            .field("input_right", &self.input_r.as_ref().unwrap().identity())
184            .field("side_l", &self.side_l)
185            .field("side_r", &self.side_r)
186            .field("pk_indices", &self.info.pk_indices)
187            .field("schema", &self.info.schema)
188            .field("actual_output_data_types", &self.actual_output_data_types)
189            .finish()
190    }
191}
192
193impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> Execute
194    for HashJoinExecutor<K, S, T, E>
195{
196    fn execute(self: Box<Self>) -> BoxedMessageStream {
197        self.into_stream().boxed()
198    }
199}
200
201struct EqJoinArgs<'a, K: HashKey, S: StateStore, E: JoinEncoding> {
202    ctx: &'a ActorContextRef,
203    side_l: &'a mut JoinSide<K, S, E>,
204    side_r: &'a mut JoinSide<K, S, E>,
205    actual_output_data_types: &'a [DataType],
206    cond: &'a mut Option<NonStrictExpression>,
207    inequality_watermarks: &'a [Option<Watermark>],
208    chunk: StreamChunk,
209    append_only_optimize: bool,
210    chunk_size: usize,
211    cnt_rows_received: &'a mut u32,
212    high_join_amplification_threshold: usize,
213    entry_state_max_rows: usize,
214}
215
216impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
217    HashJoinExecutor<K, S, T, E>
218{
219    #[allow(clippy::too_many_arguments)]
220    pub fn new(
221        ctx: ActorContextRef,
222        info: ExecutorInfo,
223        input_l: Executor,
224        input_r: Executor,
225        params_l: JoinParams,
226        params_r: JoinParams,
227        null_safe: Vec<bool>,
228        output_indices: Vec<usize>,
229        cond: Option<NonStrictExpression>,
230        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
231        state_table_l: StateTable<S>,
232        degree_state_table_l: StateTable<S>,
233        state_table_r: StateTable<S>,
234        degree_state_table_r: StateTable<S>,
235        watermark_epoch: AtomicU64Ref,
236        is_append_only: bool,
237        metrics: Arc<StreamingMetrics>,
238        chunk_size: usize,
239        high_join_amplification_threshold: usize,
240    ) -> Self {
241        Self::new_with_cache_size(
242            ctx,
243            info,
244            input_l,
245            input_r,
246            params_l,
247            params_r,
248            null_safe,
249            output_indices,
250            cond,
251            inequality_pairs,
252            state_table_l,
253            degree_state_table_l,
254            state_table_r,
255            degree_state_table_r,
256            watermark_epoch,
257            is_append_only,
258            metrics,
259            chunk_size,
260            high_join_amplification_threshold,
261            None,
262        )
263    }
264
265    #[allow(clippy::too_many_arguments)]
266    pub fn new_with_cache_size(
267        ctx: ActorContextRef,
268        info: ExecutorInfo,
269        input_l: Executor,
270        input_r: Executor,
271        params_l: JoinParams,
272        params_r: JoinParams,
273        null_safe: Vec<bool>,
274        output_indices: Vec<usize>,
275        cond: Option<NonStrictExpression>,
276        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
277        state_table_l: StateTable<S>,
278        degree_state_table_l: StateTable<S>,
279        state_table_r: StateTable<S>,
280        degree_state_table_r: StateTable<S>,
281        watermark_epoch: AtomicU64Ref,
282        is_append_only: bool,
283        metrics: Arc<StreamingMetrics>,
284        chunk_size: usize,
285        high_join_amplification_threshold: usize,
286        entry_state_max_rows: Option<usize>,
287    ) -> Self {
288        let entry_state_max_rows = match entry_state_max_rows {
289            None => {
290                ctx.streaming_config
291                    .developer
292                    .hash_join_entry_state_max_rows
293            }
294            Some(entry_state_max_rows) => entry_state_max_rows,
295        };
296        let side_l_column_n = input_l.schema().len();
297
298        let schema_fields = match T {
299            JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
300            JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
301            _ => [
302                input_l.schema().fields.clone(),
303                input_r.schema().fields.clone(),
304            ]
305            .concat(),
306        };
307
308        let original_output_data_types = schema_fields
309            .iter()
310            .map(|field| field.data_type())
311            .collect_vec();
312        let actual_output_data_types = output_indices
313            .iter()
314            .map(|&idx| original_output_data_types[idx].clone())
315            .collect_vec();
316
317        // Data types of of hash join state.
318        let state_all_data_types_l = input_l.schema().data_types();
319        let state_all_data_types_r = input_r.schema().data_types();
320
321        let state_pk_indices_l = input_l.pk_indices().to_vec();
322        let state_pk_indices_r = input_r.pk_indices().to_vec();
323
324        let state_join_key_indices_l = params_l.join_key_indices;
325        let state_join_key_indices_r = params_r.join_key_indices;
326
327        let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
328        let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();
329
330        let degree_pk_indices_l = (state_join_key_indices_l.len()
331            ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
332            .collect_vec();
333        let degree_pk_indices_r = (state_join_key_indices_r.len()
334            ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
335            .collect_vec();
336
337        // If pk is contained in join key.
338        let pk_contained_in_jk_l =
339            is_subset(state_pk_indices_l.clone(), state_join_key_indices_l.clone());
340        let pk_contained_in_jk_r =
341            is_subset(state_pk_indices_r.clone(), state_join_key_indices_r.clone());
342
343        // check whether join key contains pk in both side
344        let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;
345
346        let join_key_data_types_l = state_join_key_indices_l
347            .iter()
348            .map(|idx| state_all_data_types_l[*idx].clone())
349            .collect_vec();
350
351        let join_key_data_types_r = state_join_key_indices_r
352            .iter()
353            .map(|idx| state_all_data_types_r[*idx].clone())
354            .collect_vec();
355
356        assert_eq!(join_key_data_types_l, join_key_data_types_r);
357
358        let null_matched = K::Bitmap::from_bool_vec(null_safe);
359
360        let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
361        let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;
362
363        let degree_state_l = need_degree_table_l.then(|| {
364            TableInner::new(
365                degree_pk_indices_l,
366                degree_join_key_indices_l,
367                degree_state_table_l,
368            )
369        });
370        let degree_state_r = need_degree_table_r.then(|| {
371            TableInner::new(
372                degree_pk_indices_r,
373                degree_join_key_indices_r,
374                degree_state_table_r,
375            )
376        });
377
378        let (left_to_output, right_to_output) = {
379            let (left_len, right_len) = if is_left_semi_or_anti(T) {
380                (state_all_data_types_l.len(), 0usize)
381            } else if is_right_semi_or_anti(T) {
382                (0usize, state_all_data_types_r.len())
383            } else {
384                (state_all_data_types_l.len(), state_all_data_types_r.len())
385            };
386            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
387        };
388
389        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
390        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());
391
392        let left_input_len = input_l.schema().len();
393        let right_input_len = input_r.schema().len();
394        let mut l2inequality_index = vec![vec![]; left_input_len];
395        let mut r2inequality_index = vec![vec![]; right_input_len];
396        let mut l_state_clean_columns = vec![];
397        let mut r_state_clean_columns = vec![];
398        let inequality_pairs = inequality_pairs
399            .into_iter()
400            .enumerate()
401            .map(
402                |(
403                    index,
404                    (key_required_larger, key_required_smaller, clean_state, delta_expression),
405                )| {
406                    let output_indices = if key_required_larger < key_required_smaller {
407                        if clean_state {
408                            l_state_clean_columns.push((key_required_larger, index));
409                        }
410                        l2inequality_index[key_required_larger].push((index, false));
411                        r2inequality_index[key_required_smaller - left_input_len]
412                            .push((index, true));
413                        l2o_indexed
414                            .get_vec(&key_required_larger)
415                            .cloned()
416                            .unwrap_or_default()
417                    } else {
418                        if clean_state {
419                            r_state_clean_columns
420                                .push((key_required_larger - left_input_len, index));
421                        }
422                        l2inequality_index[key_required_smaller].push((index, true));
423                        r2inequality_index[key_required_larger - left_input_len]
424                            .push((index, false));
425                        r2o_indexed
426                            .get_vec(&(key_required_larger - left_input_len))
427                            .cloned()
428                            .unwrap_or_default()
429                    };
430                    (output_indices, delta_expression)
431                },
432            )
433            .collect_vec();
434
435        let mut l_non_null_fields = l2inequality_index
436            .iter()
437            .positions(|inequalities| !inequalities.is_empty())
438            .collect_vec();
439        let mut r_non_null_fields = r2inequality_index
440            .iter()
441            .positions(|inequalities| !inequalities.is_empty())
442            .collect_vec();
443
444        if append_only_optimize {
445            l_state_clean_columns.clear();
446            r_state_clean_columns.clear();
447            l_non_null_fields.clear();
448            r_non_null_fields.clear();
449        }
450
451        let inequality_watermarks = vec![None; inequality_pairs.len()];
452        let watermark_buffers = BTreeMap::new();
453
454        Self {
455            ctx: ctx.clone(),
456            info,
457            input_l: Some(input_l),
458            input_r: Some(input_r),
459            actual_output_data_types,
460            side_l: JoinSide {
461                ht: JoinHashMap::new(
462                    watermark_epoch.clone(),
463                    join_key_data_types_l,
464                    state_join_key_indices_l.clone(),
465                    state_all_data_types_l.clone(),
466                    state_table_l,
467                    params_l.deduped_pk_indices,
468                    degree_state_l,
469                    null_matched.clone(),
470                    pk_contained_in_jk_l,
471                    None,
472                    metrics.clone(),
473                    ctx.id,
474                    ctx.fragment_id,
475                    "left",
476                ),
477                join_key_indices: state_join_key_indices_l,
478                all_data_types: state_all_data_types_l,
479                i2o_mapping: left_to_output,
480                i2o_mapping_indexed: l2o_indexed,
481                input2inequality_index: l2inequality_index,
482                non_null_fields: l_non_null_fields,
483                state_clean_columns: l_state_clean_columns,
484                start_pos: 0,
485                need_degree_table: need_degree_table_l,
486                _marker: PhantomData,
487            },
488            side_r: JoinSide {
489                ht: JoinHashMap::new(
490                    watermark_epoch,
491                    join_key_data_types_r,
492                    state_join_key_indices_r.clone(),
493                    state_all_data_types_r.clone(),
494                    state_table_r,
495                    params_r.deduped_pk_indices,
496                    degree_state_r,
497                    null_matched,
498                    pk_contained_in_jk_r,
499                    None,
500                    metrics.clone(),
501                    ctx.id,
502                    ctx.fragment_id,
503                    "right",
504                ),
505                join_key_indices: state_join_key_indices_r,
506                all_data_types: state_all_data_types_r,
507                start_pos: side_l_column_n,
508                i2o_mapping: right_to_output,
509                i2o_mapping_indexed: r2o_indexed,
510                input2inequality_index: r2inequality_index,
511                non_null_fields: r_non_null_fields,
512                state_clean_columns: r_state_clean_columns,
513                need_degree_table: need_degree_table_r,
514                _marker: PhantomData,
515            },
516            cond,
517            inequality_pairs,
518            inequality_watermarks,
519            append_only_optimize,
520            metrics,
521            chunk_size,
522            cnt_rows_received: 0,
523            watermark_buffers,
524            high_join_amplification_threshold,
525            entry_state_max_rows,
526        }
527    }
528
529    #[try_stream(ok = Message, error = StreamExecutorError)]
530    async fn into_stream(mut self) {
531        let input_l = self.input_l.take().unwrap();
532        let input_r = self.input_r.take().unwrap();
533        let aligned_stream = barrier_align(
534            input_l.execute(),
535            input_r.execute(),
536            self.ctx.id,
537            self.ctx.fragment_id,
538            self.metrics.clone(),
539            "Join",
540        );
541        pin_mut!(aligned_stream);
542
543        let actor_id = self.ctx.id;
544
545        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
546        let first_epoch = barrier.epoch;
547        // The first barrier message should be propagated.
548        yield Message::Barrier(barrier);
549        self.side_l.init(first_epoch).await?;
550        self.side_r.init(first_epoch).await?;
551
552        let actor_id_str = self.ctx.id.to_string();
553        let fragment_id_str = self.ctx.fragment_id.to_string();
554
555        // initialized some metrics
556        let join_actor_input_waiting_duration_ns = self
557            .metrics
558            .join_actor_input_waiting_duration_ns
559            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
560        let left_join_match_duration_ns = self
561            .metrics
562            .join_match_duration_ns
563            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
564        let right_join_match_duration_ns = self
565            .metrics
566            .join_match_duration_ns
567            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);
568
569        let barrier_join_match_duration_ns = self
570            .metrics
571            .join_match_duration_ns
572            .with_guarded_label_values(&[
573                actor_id_str.as_str(),
574                fragment_id_str.as_str(),
575                "barrier",
576            ]);
577
578        let left_join_cached_entry_count = self
579            .metrics
580            .join_cached_entry_count
581            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
582
583        let right_join_cached_entry_count = self
584            .metrics
585            .join_cached_entry_count
586            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);
587
588        let mut start_time = Instant::now();
589
590        while let Some(msg) = aligned_stream
591            .next()
592            .instrument_await("hash_join_barrier_align")
593            .await
594        {
595            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
596            match msg? {
597                AlignedMessage::WatermarkLeft(watermark) => {
598                    for watermark_to_emit in
599                        self.handle_watermark(SideType::Left, watermark).await?
600                    {
601                        yield Message::Watermark(watermark_to_emit);
602                    }
603                }
604                AlignedMessage::WatermarkRight(watermark) => {
605                    for watermark_to_emit in
606                        self.handle_watermark(SideType::Right, watermark).await?
607                    {
608                        yield Message::Watermark(watermark_to_emit);
609                    }
610                }
611                AlignedMessage::Left(chunk) => {
612                    let mut left_time = Duration::from_nanos(0);
613                    let mut left_start_time = Instant::now();
614                    #[for_await]
615                    for chunk in Self::eq_join_left(EqJoinArgs {
616                        ctx: &self.ctx,
617                        side_l: &mut self.side_l,
618                        side_r: &mut self.side_r,
619                        actual_output_data_types: &self.actual_output_data_types,
620                        cond: &mut self.cond,
621                        inequality_watermarks: &self.inequality_watermarks,
622                        chunk,
623                        append_only_optimize: self.append_only_optimize,
624                        chunk_size: self.chunk_size,
625                        cnt_rows_received: &mut self.cnt_rows_received,
626                        high_join_amplification_threshold: self.high_join_amplification_threshold,
627                        entry_state_max_rows: self.entry_state_max_rows,
628                    }) {
629                        left_time += left_start_time.elapsed();
630                        yield Message::Chunk(chunk?);
631                        left_start_time = Instant::now();
632                    }
633                    left_time += left_start_time.elapsed();
634                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
635                    self.try_flush_data().await?;
636                }
637                AlignedMessage::Right(chunk) => {
638                    let mut right_time = Duration::from_nanos(0);
639                    let mut right_start_time = Instant::now();
640                    #[for_await]
641                    for chunk in Self::eq_join_right(EqJoinArgs {
642                        ctx: &self.ctx,
643                        side_l: &mut self.side_l,
644                        side_r: &mut self.side_r,
645                        actual_output_data_types: &self.actual_output_data_types,
646                        cond: &mut self.cond,
647                        inequality_watermarks: &self.inequality_watermarks,
648                        chunk,
649                        append_only_optimize: self.append_only_optimize,
650                        chunk_size: self.chunk_size,
651                        cnt_rows_received: &mut self.cnt_rows_received,
652                        high_join_amplification_threshold: self.high_join_amplification_threshold,
653                        entry_state_max_rows: self.entry_state_max_rows,
654                    }) {
655                        right_time += right_start_time.elapsed();
656                        yield Message::Chunk(chunk?);
657                        right_start_time = Instant::now();
658                    }
659                    right_time += right_start_time.elapsed();
660                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
661                    self.try_flush_data().await?;
662                }
663                AlignedMessage::Barrier(barrier) => {
664                    let barrier_start_time = Instant::now();
665                    let (left_post_commit, right_post_commit) =
666                        self.flush_data(barrier.epoch).await?;
667
668                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
669
670                    // We don't include the time post yielding barrier because the vnode update
671                    // is a one-off and rare operation.
672                    barrier_join_match_duration_ns
673                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
674                    yield Message::Barrier(barrier);
675
676                    // Update the vnode bitmap for state tables of both sides if asked.
677                    right_post_commit
678                        .post_yield_barrier(update_vnode_bitmap.clone())
679                        .await?;
680                    if left_post_commit
681                        .post_yield_barrier(update_vnode_bitmap)
682                        .await?
683                        .unwrap_or(false)
684                    {
685                        self.watermark_buffers
686                            .values_mut()
687                            .for_each(|buffers| buffers.clear());
688                        self.inequality_watermarks.fill(None);
689                    }
690
691                    // Report metrics of cached join rows/entries
692                    for (join_cached_entry_count, ht) in [
693                        (&left_join_cached_entry_count, &self.side_l.ht),
694                        (&right_join_cached_entry_count, &self.side_r.ht),
695                    ] {
696                        join_cached_entry_count.set(ht.entry_count() as i64);
697                    }
698                }
699            }
700            start_time = Instant::now();
701        }
702    }
703
704    async fn flush_data(
705        &mut self,
706        epoch: EpochPair,
707    ) -> StreamExecutorResult<(
708        JoinHashMapPostCommit<'_, K, S, E>,
709        JoinHashMapPostCommit<'_, K, S, E>,
710    )> {
711        // All changes to the state has been buffered in the mem-table of the state table. Just
712        // `commit` them here.
713        let left = self.side_l.ht.flush(epoch).await?;
714        let right = self.side_r.ht.flush(epoch).await?;
715        Ok((left, right))
716    }
717
718    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
719        // All changes to the state has been buffered in the mem-table of the state table. Just
720        // `commit` them here.
721        self.side_l.ht.try_flush().await?;
722        self.side_r.ht.try_flush().await?;
723        Ok(())
724    }
725
726    // We need to manually evict the cache.
727    fn evict_cache(
728        side_update: &mut JoinSide<K, S, E>,
729        side_match: &mut JoinSide<K, S, E>,
730        cnt_rows_received: &mut u32,
731    ) {
732        *cnt_rows_received += 1;
733        if *cnt_rows_received == EVICT_EVERY_N_ROWS {
734            side_update.ht.evict();
735            side_match.ht.evict();
736            *cnt_rows_received = 0;
737        }
738    }
739
740    async fn handle_watermark(
741        &mut self,
742        side: SideTypePrimitive,
743        watermark: Watermark,
744    ) -> StreamExecutorResult<Vec<Watermark>> {
745        let (side_update, side_match) = if side == SideType::Left {
746            (&mut self.side_l, &mut self.side_r)
747        } else {
748            (&mut self.side_r, &mut self.side_l)
749        };
750
751        // State cleaning
752        if side_update.join_key_indices[0] == watermark.col_idx {
753            side_match.ht.update_watermark(watermark.val.clone());
754        }
755
756        // Select watermarks to yield.
757        let wm_in_jk = side_update
758            .join_key_indices
759            .iter()
760            .positions(|idx| *idx == watermark.col_idx);
761        let mut watermarks_to_emit = vec![];
762        for idx in wm_in_jk {
763            let buffers = self
764                .watermark_buffers
765                .entry(idx)
766                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
767            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
768                let empty_indices = vec![];
769                let output_indices = side_update
770                    .i2o_mapping_indexed
771                    .get_vec(&side_update.join_key_indices[idx])
772                    .unwrap_or(&empty_indices)
773                    .iter()
774                    .chain(
775                        side_match
776                            .i2o_mapping_indexed
777                            .get_vec(&side_match.join_key_indices[idx])
778                            .unwrap_or(&empty_indices),
779                    );
780                for output_idx in output_indices {
781                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
782                }
783            };
784        }
785        for (inequality_index, need_offset) in
786            &side_update.input2inequality_index[watermark.col_idx]
787        {
788            let buffers = self
789                .watermark_buffers
790                .entry(side_update.join_key_indices.len() + inequality_index)
791                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
792            let mut input_watermark = watermark.clone();
793            if *need_offset
794                && let Some(delta_expression) = self.inequality_pairs[*inequality_index].1.as_ref()
795            {
796                // allow since we will handle error manually.
797                #[allow(clippy::disallowed_methods)]
798                let eval_result = delta_expression
799                    .inner()
800                    .eval_row(&OwnedRow::new(vec![Some(input_watermark.val)]))
801                    .await;
802                match eval_result {
803                    Ok(value) => input_watermark.val = value.unwrap(),
804                    Err(err) => {
805                        if !matches!(err, ExprError::NumericOutOfRange) {
806                            self.ctx.on_compute_error(err, &self.info.identity);
807                        }
808                        continue;
809                    }
810                }
811            };
812            if let Some(selected_watermark) = buffers.handle_watermark(side, input_watermark) {
813                for output_idx in &self.inequality_pairs[*inequality_index].0 {
814                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
815                }
816                self.inequality_watermarks[*inequality_index] = Some(selected_watermark);
817            }
818        }
819        Ok(watermarks_to_emit)
820    }
821
822    fn row_concat(
823        row_update: impl Row,
824        update_start_pos: usize,
825        row_matched: impl Row,
826        matched_start_pos: usize,
827    ) -> OwnedRow {
828        let mut new_row = vec![None; row_update.len() + row_matched.len()];
829
830        for (i, datum_ref) in row_update.iter().enumerate() {
831            new_row[i + update_start_pos] = datum_ref.to_owned_datum();
832        }
833        for (i, datum_ref) in row_matched.iter().enumerate() {
834            new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
835        }
836        OwnedRow::new(new_row)
837    }
838
    /// Runs `eq_join_oneside` with `SIDE` fixed to `SideType::Left`, i.e. with the left input as
    /// the update side.
    ///
    /// This forwarder exists only so that the join side shows up in the stack.
    fn eq_join_left(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Left }>(args)
    }
845
    /// Runs `eq_join_oneside` with `SIDE` fixed to `SideType::Right`, i.e. with the right input
    /// as the update side.
    ///
    /// This forwarder exists only so that the join side shows up in the stack.
    fn eq_join_right(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Right }>(args)
    }
852
    /// Joins every row of `args.chunk`, which arrived on side `SIDE`, against the opposite side
    /// and yields the produced output chunks.
    ///
    /// For each row (holes reported by `rows_with_holes` are skipped):
    /// 1. `evict_cache` is called to advance the eviction counter.
    /// 2. The entry for the row's join key is taken from the opposite side's hash table, unless
    ///    the non-null requirements are not met, in which case the lookup result is
    ///    `CacheResult::NeverMatch`.
    /// 3. `handle_match_rows` processes the matches as `JoinOp::Insert` for
    ///    `Insert`/`UpdateInsert` rows and as `JoinOp::Delete` for `Delete`/`UpdateDelete` rows.
    /// 4. The summed cardinality of the chunks yielded for the row is observed in the
    ///    `join_matched_join_keys` metric, and a warning is logged when it exceeds
    ///    `high_join_amplification_threshold`.
    ///
    /// Whatever is left in the chunk builder after the last row is yielded at the end.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S, E>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            actual_output_data_types,
            cond,
            inequality_watermarks,
            chunk,
            append_only_optimize,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
            entry_state_max_rows,
            ..
        } = args;

        // The side the chunk arrived on is the "update" side; the other one is the "match" side.
        let (side_update, side_match) = if SIDE == SideType::Left {
            (side_l, side_r)
        } else {
            (side_r, side_l)
        };

        // Keep only the state-clean columns of the match side whose inequality already has a
        // watermark, paired with that watermark.
        let useful_state_clean_columns = side_match
            .state_clean_columns
            .iter()
            .filter_map(|(column_idx, inequality_index)| {
                inequality_watermarks[*inequality_index]
                    .as_ref()
                    .map(|watermark| (*column_idx, watermark))
            })
            .collect_vec();

        let mut hashjoin_chunk_builder =
            JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        // Metric handle labelled with actor id, fragment id and the update side's table id.
        let join_matched_join_keys = ctx
            .streaming_metrics
            .join_matched_join_keys
            .with_guarded_label_values(&[
                &ctx.id.to_string(),
                &ctx.fragment_id.to_string(),
                &side_update.ht.table_id().to_string(),
            ]);

        // Build the join keys for the whole chunk up front, then walk rows and keys in lockstep.
        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(side_update, side_match, cnt_rows_received);

            let cache_lookup_result = {
                // NOTE(review): `datum_at_unchecked` is unsafe; this presumably relies on every
                // index in `non_null_fields` being a valid column of the update-side row —
                // confirm.
                let probe_non_null_requirement_satisfied = side_update
                    .non_null_fields
                    .iter()
                    .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
                // The key's null bitmap must be a subset of the match side's `null_matched`.
                let build_non_null_requirement_satisfied =
                    key.null_bitmap().is_subset(side_match.ht.null_matched());
                if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
                    side_match.ht.take_state_opt(key)
                } else {
                    CacheResult::NeverMatch
                }
            };
            let mut total_matches = 0;

            // Expands to the `handle_match_rows` call for the given join op; only one of the two
            // expansions below runs per row, so `cache_lookup_result` is moved exactly once.
            macro_rules! match_rows {
                ($op:ident) => {
                    Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
                        cache_lookup_result,
                        row,
                        key,
                        &mut hashjoin_chunk_builder,
                        side_match,
                        side_update,
                        &useful_state_clean_columns,
                        cond,
                        append_only_optimize,
                        entry_state_max_rows,
                    )
                };
            }

            match op {
                Op::Insert | Op::UpdateInsert =>
                {
                    #[for_await]
                    for chunk in match_rows!(Insert) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
                Op::Delete | Op::UpdateDelete =>
                {
                    #[for_await]
                    for chunk in match_rows!(Delete) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
            };

            join_matched_join_keys.observe(total_matches as _);
            if total_matches > high_join_amplification_threshold {
                // Deserialize the key only on this path, so it can be included in the warning.
                let join_key_data_types = side_update.ht.join_key_data_types();
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = total_matches,
                    update_table_id = side_update.ht.table_id(),
                    match_table_id = side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = ctx.id,
                    fragment_id = ctx.fragment_id,
                    "large rows matched for join key"
                );
            }
        }
        // NOTE(kwannoel): We don't track metrics for this last chunk.
        if let Some(chunk) = hashjoin_chunk_builder.take() {
            yield chunk;
        }
    }
984
    /// For the probe-side row, we need to check if it has values in cache, if not, we need to
    /// fetch the matched rows from the state table.
    ///
    /// Every matched build-side row being processed needs to go through the following phases:
    /// 1. Handle join condition evaluation.
    /// 2. Always do cache refill, if the state count is good.
    /// 3. Handle state cleaning.
    /// 4. Handle degree table update.
    ///
    /// Depending on `cached_lookup_result`:
    /// - `NeverMatch`: the row is only passed to `forward_if_not_matched` and the function
    ///   returns without touching any state.
    /// - `Hit`: the cached rows are matched one by one and the entry is put back afterwards.
    /// - `Miss`: the matched rows are fetched from the match side and, while the number of
    ///   collected rows does not exceed `entry_state_max_rows`, gathered into a fresh entry that
    ///   is written back to the cache.
    ///
    /// After matching, the probe-side row itself is forwarded, rows collected for state cleaning
    /// are deleted from the match side, and the probe-side row is inserted into / deleted from
    /// the update side (unless the append-only optimization applied, in which case the matched
    /// row is deleted from the match side instead and the function returns).
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn handle_match_rows<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
    >(
        cached_lookup_result: CacheResult<E>,
        row: RowRef<'a>,
        key: &'a K,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        side_match: &'a mut JoinSide<K, S, E>,
        side_update: &'a mut JoinSide<K, S, E>,
        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
        cond: &'a mut Option<NonStrictExpression>,
        append_only_optimize: bool,
        entry_state_max_rows: usize,
    ) {
        // Remember whether this was a cache hit; the lookup result is consumed by the `match`
        // below.
        let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
        // Fresh entry and row counter, only used on the `Miss` path.
        let mut entry_state: JoinEntryState<E> = JoinEntryState::default();
        let mut entry_state_count = 0;

        // Number of matched rows that satisfied the join condition for the probe-side row.
        let mut degree = 0;
        let mut append_only_matched_row = None;
        let mut matched_rows_to_clean = vec![];

        // Expands to the `handle_match_row` call shared by the `Hit` and `Miss` paths; the
        // callers supply the degree state, the matched row, its optional cache reference, whether
        // it came from the cache, and how to map the row into the output row type.
        macro_rules! match_row {
            (
                $match_order_key_indices:expr,
                $degree_table:expr,
                $matched_row:expr,
                $matched_row_ref:expr,
                $from_cache:literal,
                $map_output:expr,
            ) => {
                Self::handle_match_row::<_, _, SIDE, { JOIN_OP }, { $from_cache }>(
                    row,
                    $matched_row,
                    $matched_row_ref,
                    hashjoin_chunk_builder,
                    $match_order_key_indices,
                    $degree_table,
                    side_update.start_pos,
                    side_match.start_pos,
                    cond,
                    &mut degree,
                    useful_state_clean_columns,
                    append_only_optimize,
                    &mut append_only_matched_row,
                    &mut matched_rows_to_clean,
                    $map_output,
                )
            };
        }

        let entry_state = match cached_lookup_result {
            CacheResult::NeverMatch => {
                // Nothing can match: only forward the row via `forward_if_not_matched` and
                // return before any cache or state table is touched.
                let op = match JOIN_OP {
                    JoinOp::Insert => Op::Insert,
                    JoinOp::Delete => Op::Delete,
                };
                if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                    yield chunk;
                }
                return Ok(());
            }
            CacheResult::Hit(mut cached_rows) => {
                let (match_order_key_indices, match_degree_state) =
                    side_match.ht.get_degree_state_mut_ref();
                // Handle cached rows which match the probe-side row.
                for (matched_row_ref, matched_row) in
                    cached_rows.values_mut(&side_match.all_data_types)
                {
                    let matched_row = matched_row?;
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        match_degree_state,
                        matched_row,
                        Some(matched_row_ref),
                        true,
                        Either::Left,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }

                cached_rows
            }
            CacheResult::Miss => {
                // Handle rows which are not in cache.
                let (matched_rows, match_order_key_indices, degree_table) = side_match
                    .ht
                    .fetch_matched_rows_and_get_degree_table_ref(key)
                    .await?;

                #[for_await]
                for matched_row in matched_rows {
                    let (encoded_pk, matched_row) = matched_row?;

                    let mut matched_row_ref = None;

                    // cache refill
                    // Rows are collected only while the count has not exceeded
                    // `entry_state_max_rows`; once it has, later rows are processed without a
                    // cache reference.
                    if entry_state_count <= entry_state_max_rows {
                        let row_ref = entry_state
                            .insert(encoded_pk, E::encode(&matched_row), None) // TODO(kwannoel): handle ineq key for asof join.
                            .with_context(|| format!("row: {}", row.display(),))?;
                        matched_row_ref = Some(row_ref);
                        entry_state_count += 1;
                    }
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        degree_table,
                        matched_row,
                        matched_row_ref,
                        false,
                        Either::Right,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }
                Box::new(entry_state)
            }
        };

        // forward rows depending on join types
        let op = match JOIN_OP {
            JoinOp::Insert => Op::Insert,
            JoinOp::Delete => Op::Delete,
        };
        if degree == 0 {
            if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                yield chunk;
            }
        } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
        {
            yield chunk;
        }

        // cache refill
        // A cache hit always puts its entry back; a miss only stores the rebuilt entry when the
        // row count stayed within `entry_state_max_rows`.
        if cache_hit || entry_state_count <= entry_state_max_rows {
            side_match.ht.update_state(key, entry_state);
        }

        // watermark state cleaning
        for matched_row in matched_rows_to_clean {
            side_match.ht.delete_handle_degree(key, matched_row)?;
        }

        // apply append_only optimization to clean matched_rows which have been persisted
        // In that case the probe-side row is not written to the update side at all.
        if append_only_optimize && let Some(row) = append_only_matched_row {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            side_match.ht.delete_handle_degree(key, row)?;
            return Ok(());
        }

        // no append_only optimization, update state table(s).
        // The probe-side row is stored together with the degree computed above.
        match JOIN_OP {
            JoinOp::Insert => {
                side_update
                    .ht
                    .insert_handle_degree(key, JoinRow::new(row, degree))?;
            }
            JoinOp::Delete => {
                side_update
                    .ht
                    .delete_handle_degree(key, JoinRow::new(row, degree))?;
            }
        }
    }
1166
    /// Processes one build-side `matched_row` against the probe-side `update_row` and returns a
    /// chunk if the chunk builder produced one.
    ///
    /// If the join condition is satisfied:
    /// - `update_row_degree` is incremented;
    /// - unless `forward_exactly_once(T, SIDE)` holds, the pair is handed to the chunk builder
    ///   via `with_match` — before the degree update for inserts, after it for deletes;
    /// - if the match side has a degree table, `update_degree` is called for the matched row and
    ///   the degree of its cached copy (when a cache reference is available) is increased or
    ///   decreased according to `JOIN_OP`.
    ///
    /// If the join condition is not satisfied, the matched row is marked for state cleaning when
    /// any column in `useful_state_clean_columns` holds a non-null value strictly below the
    /// corresponding watermark.
    ///
    /// Finally, with `append_only_optimize` the matched row is stored in
    /// `append_only_matched_row` (regardless of the join condition result); otherwise a row
    /// marked for state cleaning is pushed to `matched_rows_to_clean`. `map_output` converts the
    /// row from the input row type `R` to the output row type `RO` in both cases.
    ///
    /// # Panics
    ///
    /// Panics if `append_only_optimize` is set and `JOIN_OP` is not `JoinOp::Insert`, or if
    /// `append_only_matched_row` is already occupied. Also panics if `MATCHED_ROWS_FROM_CACHE`
    /// is `true`, a degree table is present, the condition holds, and `matched_row_cache_ref` is
    /// `None`.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    async fn handle_match_row<
        'a,
        R: Row,  // input row type
        RO: Row, // output row type
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
        const MATCHED_ROWS_FROM_CACHE: bool,
    >(
        update_row: RowRef<'a>,
        mut matched_row: JoinRow<R>,
        mut matched_row_cache_ref: Option<&mut E::EncodedRow>,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        match_order_key_indices: &[usize],
        match_degree_table: &mut Option<TableInner<S>>,
        side_update_start_pos: usize,
        side_match_start_pos: usize,
        cond: &Option<NonStrictExpression>,
        update_row_degree: &mut u64,
        useful_state_clean_columns: &[(usize, &'a Watermark)],
        append_only_optimize: bool,
        append_only_matched_row: &mut Option<JoinRow<RO>>,
        matched_rows_to_clean: &mut Vec<JoinRow<RO>>,
        map_output: impl Fn(R) -> RO,
    ) -> Option<StreamChunk> {
        let mut need_state_clean = false;
        let mut chunk_opt = None;

        // TODO(kwannoel): Instead of evaluating this every loop,
        // we can call this only if there's a non-equi expression.
        // check join cond
        let join_condition_satisfied = Self::check_join_condition(
            update_row,
            side_update_start_pos,
            &matched_row.row,
            side_match_start_pos,
            cond,
        )
        .await;

        if join_condition_satisfied {
            // update degree
            *update_row_degree += 1;
            // send matched row downstream

            // Inserts must happen before degrees are updated,
            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
            if matches!(JOIN_OP, JoinOp::Insert)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
            // TODO(kwannoel): We can actually statically decide this, using join side + join type.
            if let Some(degree_table) = match_degree_table {
                update_degree::<S, { JOIN_OP }>(
                    match_order_key_indices,
                    degree_table,
                    &mut matched_row,
                );
                // Rows that came from the cache are expected to always carry a cache reference
                // (the `unwrap` below enforces it); fetched rows carry one only if they were
                // collected for the cache refill.
                if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
                    // update matched row in cache
                    match JOIN_OP {
                        JoinOp::Insert => matched_row_cache_ref.as_mut().unwrap().increase_degree(),
                        JoinOp::Delete => matched_row_cache_ref.as_mut().unwrap().decrease_degree(),
                    }
                }
            }

            // Deletes must happen after degree table is updated.
            // FIXME(kwannoel): We should let deletes and inserts happen BEFORE degree updates.
            if matches!(JOIN_OP, JoinOp::Delete)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
        } else {
            // check if need state cleaning
            // A null datum never triggers cleaning; a non-null one does when it compares
            // strictly less than the watermark value.
            for (column_idx, watermark) in useful_state_clean_columns {
                if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
                    scalar
                        .default_cmp(&watermark.val.as_scalar_ref_impl())
                        .is_lt()
                }) {
                    need_state_clean = true;
                    break;
                }
            }
        }
        // If the stream is append-only and the join key covers pk in both side,
        // then we can remove matched rows since pk is unique and will not be
        // inserted again
        if append_only_optimize {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            // Since join key contains pk and pk is unique, there should be only
            // one row if matched.
            assert!(append_only_matched_row.is_none());
            *append_only_matched_row = Some(matched_row.map(map_output));
        } else if need_state_clean {
            // `append_only_optimize` and `need_state_clean` won't both be true.
            // 'else' here is only to suppress compiler error, otherwise
            // `matched_row` will be moved twice.
            matched_rows_to_clean.push(matched_row.map(map_output));
        }

        chunk_opt
    }
1278
1279    // TODO(yuhao-su): We should find a better way to eval the expression
1280    // without concat two rows.
1281    // if there are non-equi expressions
1282    // NOTE(kwannoel): We can probably let `eval` use `impl Row` instead of `OwnedRow`.
1283    #[inline]
1284    async fn check_join_condition(
1285        row: impl Row,
1286        side_update_start_pos: usize,
1287        matched_row: impl Row,
1288        side_match_start_pos: usize,
1289        join_condition: &Option<NonStrictExpression>,
1290    ) -> bool {
1291        if let Some(join_condition) = join_condition {
1292            let new_row = Self::row_concat(
1293                row,
1294                side_update_start_pos,
1295                matched_row,
1296                side_match_start_pos,
1297            );
1298            join_condition
1299                .eval_row_infallible(&new_row)
1300                .await
1301                .map(|s| *s.as_bool())
1302                .unwrap_or(false)
1303        } else {
1304            true
1305        }
1306    }
1307}
1308
1309#[cfg(test)]
1310mod tests {
1311    use std::sync::atomic::AtomicU64;
1312
1313    use pretty_assertions::assert_eq;
1314    use risingwave_common::array::*;
1315    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1316    use risingwave_common::hash::{Key64, Key128};
1317    use risingwave_common::util::epoch::test_epoch;
1318    use risingwave_common::util::sort_util::OrderType;
1319    use risingwave_storage::memory::MemoryStateStore;
1320
1321    use super::*;
1322    use crate::common::table::test_utils::gen_pbtable;
1323    use crate::executor::MemoryEncoding;
1324    use crate::executor::test_utils::expr::build_from_pretty;
1325    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1326
1327    async fn create_in_memory_state_table(
1328        mem_state: MemoryStateStore,
1329        data_types: &[DataType],
1330        order_types: &[OrderType],
1331        pk_indices: &[usize],
1332        table_id: u32,
1333    ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1334        let column_descs = data_types
1335            .iter()
1336            .enumerate()
1337            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1338            .collect_vec();
1339        let state_table = StateTable::from_table_catalog(
1340            &gen_pbtable(
1341                TableId::new(table_id),
1342                column_descs,
1343                order_types.to_vec(),
1344                pk_indices.to_vec(),
1345                0,
1346            ),
1347            mem_state.clone(),
1348            None,
1349        )
1350        .await;
1351
1352        // Create degree table
1353        let mut degree_table_column_descs = vec![];
1354        pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1355            degree_table_column_descs.push(ColumnDesc::unnamed(
1356                ColumnId::new(pk_id as i32),
1357                data_types[*idx].clone(),
1358            ))
1359        });
1360        degree_table_column_descs.push(ColumnDesc::unnamed(
1361            ColumnId::new(pk_indices.len() as i32),
1362            DataType::Int64,
1363        ));
1364        let degree_state_table = StateTable::from_table_catalog(
1365            &gen_pbtable(
1366                TableId::new(table_id + 1),
1367                degree_table_column_descs,
1368                order_types.to_vec(),
1369                pk_indices.to_vec(),
1370                0,
1371            ),
1372            mem_state,
1373            None,
1374        )
1375        .await;
1376        (state_table, degree_state_table)
1377    }
1378
1379    fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1380        build_from_pretty(
1381            condition_text
1382                .as_deref()
1383                .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1384        )
1385    }
1386
1387    async fn create_executor<const T: JoinTypePrimitive>(
1388        with_condition: bool,
1389        null_safe: bool,
1390        condition_text: Option<String>,
1391        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
1392    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1393        let schema = Schema {
1394            fields: vec![
1395                Field::unnamed(DataType::Int64), // join key
1396                Field::unnamed(DataType::Int64),
1397            ],
1398        };
1399        let (tx_l, source_l) = MockSource::channel();
1400        let source_l = source_l.into_executor(schema.clone(), vec![1]);
1401        let (tx_r, source_r) = MockSource::channel();
1402        let source_r = source_r.into_executor(schema, vec![1]);
1403        let params_l = JoinParams::new(vec![0], vec![1]);
1404        let params_r = JoinParams::new(vec![0], vec![1]);
1405        let cond = with_condition.then(|| create_cond(condition_text));
1406
1407        let mem_state = MemoryStateStore::new();
1408
1409        let (state_l, degree_state_l) = create_in_memory_state_table(
1410            mem_state.clone(),
1411            &[DataType::Int64, DataType::Int64],
1412            &[OrderType::ascending(), OrderType::ascending()],
1413            &[0, 1],
1414            0,
1415        )
1416        .await;
1417
1418        let (state_r, degree_state_r) = create_in_memory_state_table(
1419            mem_state,
1420            &[DataType::Int64, DataType::Int64],
1421            &[OrderType::ascending(), OrderType::ascending()],
1422            &[0, 1],
1423            2,
1424        )
1425        .await;
1426
1427        let schema = match T {
1428            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1429            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1430            _ => [source_l.schema().fields(), source_r.schema().fields()]
1431                .concat()
1432                .into_iter()
1433                .collect(),
1434        };
1435        let schema_len = schema.len();
1436        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1437
1438        let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
1439            ActorContext::for_test(123),
1440            info,
1441            source_l,
1442            source_r,
1443            params_l,
1444            params_r,
1445            vec![null_safe],
1446            (0..schema_len).collect_vec(),
1447            cond,
1448            inequality_pairs,
1449            state_l,
1450            degree_state_l,
1451            state_r,
1452            degree_state_r,
1453            Arc::new(AtomicU64::new(0)),
1454            false,
1455            Arc::new(StreamingMetrics::unused()),
1456            1024,
1457            2048,
1458        );
1459        (tx_l, tx_r, executor.boxed().execute())
1460    }
1461
1462    async fn create_classical_executor<const T: JoinTypePrimitive>(
1463        with_condition: bool,
1464        null_safe: bool,
1465        condition_text: Option<String>,
1466    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1467        create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
1468    }
1469
1470    async fn create_append_only_executor<const T: JoinTypePrimitive>(
1471        with_condition: bool,
1472    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1473        let schema = Schema {
1474            fields: vec![
1475                Field::unnamed(DataType::Int64),
1476                Field::unnamed(DataType::Int64),
1477                Field::unnamed(DataType::Int64),
1478            ],
1479        };
1480        let (tx_l, source_l) = MockSource::channel();
1481        let source_l = source_l.into_executor(schema.clone(), vec![0]);
1482        let (tx_r, source_r) = MockSource::channel();
1483        let source_r = source_r.into_executor(schema, vec![0]);
1484        let params_l = JoinParams::new(vec![0, 1], vec![]);
1485        let params_r = JoinParams::new(vec![0, 1], vec![]);
1486        let cond = with_condition.then(|| create_cond(None));
1487
1488        let mem_state = MemoryStateStore::new();
1489
1490        let (state_l, degree_state_l) = create_in_memory_state_table(
1491            mem_state.clone(),
1492            &[DataType::Int64, DataType::Int64, DataType::Int64],
1493            &[
1494                OrderType::ascending(),
1495                OrderType::ascending(),
1496                OrderType::ascending(),
1497            ],
1498            &[0, 1, 0],
1499            0,
1500        )
1501        .await;
1502
1503        let (state_r, degree_state_r) = create_in_memory_state_table(
1504            mem_state,
1505            &[DataType::Int64, DataType::Int64, DataType::Int64],
1506            &[
1507                OrderType::ascending(),
1508                OrderType::ascending(),
1509                OrderType::ascending(),
1510            ],
1511            &[0, 1, 1],
1512            1,
1513        )
1514        .await;
1515
1516        let schema = match T {
1517            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1518            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1519            _ => [source_l.schema().fields(), source_r.schema().fields()]
1520                .concat()
1521                .into_iter()
1522                .collect(),
1523        };
1524        let schema_len = schema.len();
1525        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1526
1527        let executor = HashJoinExecutor::<Key128, MemoryStateStore, T, MemoryEncoding>::new(
1528            ActorContext::for_test(123),
1529            info,
1530            source_l,
1531            source_r,
1532            params_l,
1533            params_r,
1534            vec![false],
1535            (0..schema_len).collect_vec(),
1536            cond,
1537            vec![],
1538            state_l,
1539            degree_state_l,
1540            state_r,
1541            degree_state_r,
1542            Arc::new(AtomicU64::new(0)),
1543            true,
1544            Arc::new(StreamingMetrics::unused()),
1545            1024,
1546            2048,
1547        );
1548        (tx_l, tx_r, executor.boxed().execute())
1549    }
1550
1551    #[tokio::test]
1552    async fn test_interval_join() -> StreamExecutorResult<()> {
1553        let chunk_l1 = StreamChunk::from_pretty(
1554            "  I I
1555             + 1 4
1556             + 2 3
1557             + 2 5
1558             + 3 6",
1559        );
1560        let chunk_l2 = StreamChunk::from_pretty(
1561            "  I I
1562             + 3 8
1563             - 3 8",
1564        );
1565        let chunk_r1 = StreamChunk::from_pretty(
1566            "  I I
1567             + 2 6
1568             + 4 8
1569             + 6 9",
1570        );
1571        let chunk_r2 = StreamChunk::from_pretty(
1572            "  I  I
1573             + 2 3
1574             + 6 11",
1575        );
1576        let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
1577            true,
1578            false,
1579            Some(String::from("(and:boolean (greater_than:boolean $1:int8 (subtract:int8 $3:int8 2:int8)) (greater_than:boolean $3:int8 (subtract:int8 $1:int8 2:int8)))")),
1580            vec![(1, 3, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)"))), (3, 1, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)")))],
1581        )
1582        .await;
1583
1584        // push the init barrier for left and right
1585        tx_l.push_barrier(test_epoch(1), false);
1586        tx_r.push_barrier(test_epoch(1), false);
1587        hash_join.next_unwrap_ready_barrier()?;
1588
1589        // push the 1st left chunk
1590        tx_l.push_chunk(chunk_l1);
1591        hash_join.next_unwrap_pending();
1592
1593        // push the init barrier for left and right
1594        tx_l.push_barrier(test_epoch(2), false);
1595        tx_r.push_barrier(test_epoch(2), false);
1596        hash_join.next_unwrap_ready_barrier()?;
1597
1598        // push the 2nd left chunk
1599        tx_l.push_chunk(chunk_l2);
1600        hash_join.next_unwrap_pending();
1601
1602        tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
1603        hash_join.next_unwrap_pending();
1604
1605        tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
1606        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1607        assert_eq!(
1608            output_watermark,
1609            Watermark::new(1, DataType::Int64, ScalarImpl::Int64(4))
1610        );
1611        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1612        assert_eq!(
1613            output_watermark,
1614            Watermark::new(3, DataType::Int64, ScalarImpl::Int64(6))
1615        );
1616
1617        // push the 1st right chunk
1618        tx_r.push_chunk(chunk_r1);
1619        let chunk = hash_join.next_unwrap_ready_chunk()?;
1620        // data "2 3" should have been cleaned
1621        assert_eq!(
1622            chunk,
1623            StreamChunk::from_pretty(
1624                " I I I I
1625                + 2 5 2 6"
1626            )
1627        );
1628
1629        // push the 2nd right chunk
1630        tx_r.push_chunk(chunk_r2);
1631        // pending means that state clean is successful, or the executor will yield a chunk "+ 2 3
1632        // 2 3" here.
1633        hash_join.next_unwrap_pending();
1634
1635        Ok(())
1636    }
1637
1638    #[tokio::test]
1639    async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
1640        let chunk_l1 = StreamChunk::from_pretty(
1641            "  I I
1642             + 1 4
1643             + 2 5
1644             + 3 6",
1645        );
1646        let chunk_l2 = StreamChunk::from_pretty(
1647            "  I I
1648             + 3 8
1649             - 3 8",
1650        );
1651        let chunk_r1 = StreamChunk::from_pretty(
1652            "  I I
1653             + 2 7
1654             + 4 8
1655             + 6 9",
1656        );
1657        let chunk_r2 = StreamChunk::from_pretty(
1658            "  I  I
1659             + 3 10
1660             + 6 11",
1661        );
1662        let (mut tx_l, mut tx_r, mut hash_join) =
1663            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
1664
1665        // push the init barrier for left and right
1666        tx_l.push_barrier(test_epoch(1), false);
1667        tx_r.push_barrier(test_epoch(1), false);
1668        hash_join.next_unwrap_ready_barrier()?;
1669
1670        // push the 1st left chunk
1671        tx_l.push_chunk(chunk_l1);
1672        hash_join.next_unwrap_pending();
1673
1674        // push the init barrier for left and right
1675        tx_l.push_barrier(test_epoch(2), false);
1676        tx_r.push_barrier(test_epoch(2), false);
1677        hash_join.next_unwrap_ready_barrier()?;
1678
1679        // push the 2nd left chunk
1680        tx_l.push_chunk(chunk_l2);
1681        hash_join.next_unwrap_pending();
1682
1683        // push the 1st right chunk
1684        tx_r.push_chunk(chunk_r1);
1685        let chunk = hash_join.next_unwrap_ready_chunk()?;
1686        assert_eq!(
1687            chunk,
1688            StreamChunk::from_pretty(
1689                " I I I I
1690                + 2 5 2 7"
1691            )
1692        );
1693
1694        // push the 2nd right chunk
1695        tx_r.push_chunk(chunk_r2);
1696        let chunk = hash_join.next_unwrap_ready_chunk()?;
1697        assert_eq!(
1698            chunk,
1699            StreamChunk::from_pretty(
1700                " I I I I
1701                + 3 6 3 10"
1702            )
1703        );
1704
1705        Ok(())
1706    }
1707
1708    #[tokio::test]
1709    async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
1710        let chunk_l1 = StreamChunk::from_pretty(
1711            "  I I
1712             + 1 4
1713             + 2 5
1714             + . 6",
1715        );
1716        let chunk_l2 = StreamChunk::from_pretty(
1717            "  I I
1718             + . 8
1719             - . 8",
1720        );
1721        let chunk_r1 = StreamChunk::from_pretty(
1722            "  I I
1723             + 2 7
1724             + 4 8
1725             + 6 9",
1726        );
1727        let chunk_r2 = StreamChunk::from_pretty(
1728            "  I  I
1729             + . 10
1730             + 6 11",
1731        );
1732        let (mut tx_l, mut tx_r, mut hash_join) =
1733            create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;
1734
1735        // push the init barrier for left and right
1736        tx_l.push_barrier(test_epoch(1), false);
1737        tx_r.push_barrier(test_epoch(1), false);
1738        hash_join.next_unwrap_ready_barrier()?;
1739
1740        // push the 1st left chunk
1741        tx_l.push_chunk(chunk_l1);
1742        hash_join.next_unwrap_pending();
1743
1744        // push the init barrier for left and right
1745        tx_l.push_barrier(test_epoch(2), false);
1746        tx_r.push_barrier(test_epoch(2), false);
1747        hash_join.next_unwrap_ready_barrier()?;
1748
1749        // push the 2nd left chunk
1750        tx_l.push_chunk(chunk_l2);
1751        hash_join.next_unwrap_pending();
1752
1753        // push the 1st right chunk
1754        tx_r.push_chunk(chunk_r1);
1755        let chunk = hash_join.next_unwrap_ready_chunk()?;
1756        assert_eq!(
1757            chunk,
1758            StreamChunk::from_pretty(
1759                " I I I I
1760                + 2 5 2 7"
1761            )
1762        );
1763
1764        // push the 2nd right chunk
1765        tx_r.push_chunk(chunk_r2);
1766        let chunk = hash_join.next_unwrap_ready_chunk()?;
1767        assert_eq!(
1768            chunk,
1769            StreamChunk::from_pretty(
1770                " I I I I
1771                + . 6 . 10"
1772            )
1773        );
1774
1775        Ok(())
1776    }
1777
1778    #[tokio::test]
1779    async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
1780        let chunk_l1 = StreamChunk::from_pretty(
1781            "  I I
1782             + 1 4
1783             + 2 5
1784             + 3 6",
1785        );
1786        let chunk_l2 = StreamChunk::from_pretty(
1787            "  I I
1788             + 3 8
1789             - 3 8",
1790        );
1791        let chunk_r1 = StreamChunk::from_pretty(
1792            "  I I
1793             + 2 7
1794             + 4 8
1795             + 6 9",
1796        );
1797        let chunk_r2 = StreamChunk::from_pretty(
1798            "  I  I
1799             + 3 10
1800             + 6 11",
1801        );
1802        let chunk_l3 = StreamChunk::from_pretty(
1803            "  I I
1804             + 6 10",
1805        );
1806        let chunk_r3 = StreamChunk::from_pretty(
1807            "  I  I
1808             - 6 11",
1809        );
1810        let chunk_r4 = StreamChunk::from_pretty(
1811            "  I  I
1812             - 6 9",
1813        );
1814        let (mut tx_l, mut tx_r, mut hash_join) =
1815            create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;
1816
1817        // push the init barrier for left and right
1818        tx_l.push_barrier(test_epoch(1), false);
1819        tx_r.push_barrier(test_epoch(1), false);
1820        hash_join.next_unwrap_ready_barrier()?;
1821
1822        // push the 1st left chunk
1823        tx_l.push_chunk(chunk_l1);
1824        hash_join.next_unwrap_pending();
1825
1826        // push the init barrier for left and right
1827        tx_l.push_barrier(test_epoch(2), false);
1828        tx_r.push_barrier(test_epoch(2), false);
1829        hash_join.next_unwrap_ready_barrier()?;
1830
1831        // push the 2nd left chunk
1832        tx_l.push_chunk(chunk_l2);
1833        hash_join.next_unwrap_pending();
1834
1835        // push the 1st right chunk
1836        tx_r.push_chunk(chunk_r1);
1837        let chunk = hash_join.next_unwrap_ready_chunk()?;
1838        assert_eq!(
1839            chunk,
1840            StreamChunk::from_pretty(
1841                " I I
1842                + 2 5"
1843            )
1844        );
1845
1846        // push the 2nd right chunk
1847        tx_r.push_chunk(chunk_r2);
1848        let chunk = hash_join.next_unwrap_ready_chunk()?;
1849        assert_eq!(
1850            chunk,
1851            StreamChunk::from_pretty(
1852                " I I
1853                + 3 6"
1854            )
1855        );
1856
1857        // push the 3rd left chunk (tests forward_exactly_once)
1858        tx_l.push_chunk(chunk_l3);
1859        let chunk = hash_join.next_unwrap_ready_chunk()?;
1860        assert_eq!(
1861            chunk,
1862            StreamChunk::from_pretty(
1863                " I I
1864                + 6 10"
1865            )
1866        );
1867
1868        // push the 3rd right chunk
1869        // (tests that no change if there are still matches)
1870        tx_r.push_chunk(chunk_r3);
1871        hash_join.next_unwrap_pending();
1872
1873        // push the 3rd left chunk
1874        // (tests that deletion occurs when there are no more matches)
1875        tx_r.push_chunk(chunk_r4);
1876        let chunk = hash_join.next_unwrap_ready_chunk()?;
1877        assert_eq!(
1878            chunk,
1879            StreamChunk::from_pretty(
1880                " I I
1881                - 6 10"
1882            )
1883        );
1884
1885        Ok(())
1886    }
1887
1888    #[tokio::test]
1889    async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
1890        let chunk_l1 = StreamChunk::from_pretty(
1891            "  I I
1892             + 1 4
1893             + 2 5
1894             + . 6",
1895        );
1896        let chunk_l2 = StreamChunk::from_pretty(
1897            "  I I
1898             + . 8
1899             - . 8",
1900        );
1901        let chunk_r1 = StreamChunk::from_pretty(
1902            "  I I
1903             + 2 7
1904             + 4 8
1905             + 6 9",
1906        );
1907        let chunk_r2 = StreamChunk::from_pretty(
1908            "  I  I
1909             + . 10
1910             + 6 11",
1911        );
1912        let chunk_l3 = StreamChunk::from_pretty(
1913            "  I I
1914             + 6 10",
1915        );
1916        let chunk_r3 = StreamChunk::from_pretty(
1917            "  I  I
1918             - 6 11",
1919        );
1920        let chunk_r4 = StreamChunk::from_pretty(
1921            "  I  I
1922             - 6 9",
1923        );
1924        let (mut tx_l, mut tx_r, mut hash_join) =
1925            create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;
1926
1927        // push the init barrier for left and right
1928        tx_l.push_barrier(test_epoch(1), false);
1929        tx_r.push_barrier(test_epoch(1), false);
1930        hash_join.next_unwrap_ready_barrier()?;
1931
1932        // push the 1st left chunk
1933        tx_l.push_chunk(chunk_l1);
1934        hash_join.next_unwrap_pending();
1935
1936        // push the init barrier for left and right
1937        tx_l.push_barrier(test_epoch(2), false);
1938        tx_r.push_barrier(test_epoch(2), false);
1939        hash_join.next_unwrap_ready_barrier()?;
1940
1941        // push the 2nd left chunk
1942        tx_l.push_chunk(chunk_l2);
1943        hash_join.next_unwrap_pending();
1944
1945        // push the 1st right chunk
1946        tx_r.push_chunk(chunk_r1);
1947        let chunk = hash_join.next_unwrap_ready_chunk()?;
1948        assert_eq!(
1949            chunk,
1950            StreamChunk::from_pretty(
1951                " I I
1952                + 2 5"
1953            )
1954        );
1955
1956        // push the 2nd right chunk
1957        tx_r.push_chunk(chunk_r2);
1958        let chunk = hash_join.next_unwrap_ready_chunk()?;
1959        assert_eq!(
1960            chunk,
1961            StreamChunk::from_pretty(
1962                " I I
1963                + . 6"
1964            )
1965        );
1966
1967        // push the 3rd left chunk (tests forward_exactly_once)
1968        tx_l.push_chunk(chunk_l3);
1969        let chunk = hash_join.next_unwrap_ready_chunk()?;
1970        assert_eq!(
1971            chunk,
1972            StreamChunk::from_pretty(
1973                " I I
1974                + 6 10"
1975            )
1976        );
1977
1978        // push the 3rd right chunk
1979        // (tests that no change if there are still matches)
1980        tx_r.push_chunk(chunk_r3);
1981        hash_join.next_unwrap_pending();
1982
1983        // push the 3rd left chunk
1984        // (tests that deletion occurs when there are no more matches)
1985        tx_r.push_chunk(chunk_r4);
1986        let chunk = hash_join.next_unwrap_ready_chunk()?;
1987        assert_eq!(
1988            chunk,
1989            StreamChunk::from_pretty(
1990                " I I
1991                - 6 10"
1992            )
1993        );
1994
1995        Ok(())
1996    }
1997
1998    #[tokio::test]
1999    async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
2000        let chunk_l1 = StreamChunk::from_pretty(
2001            "  I I I
2002             + 1 4 1
2003             + 2 5 2
2004             + 3 6 3",
2005        );
2006        let chunk_l2 = StreamChunk::from_pretty(
2007            "  I I I
2008             + 4 9 4
2009             + 5 10 5",
2010        );
2011        let chunk_r1 = StreamChunk::from_pretty(
2012            "  I I I
2013             + 2 5 1
2014             + 4 9 2
2015             + 6 9 3",
2016        );
2017        let chunk_r2 = StreamChunk::from_pretty(
2018            "  I I I
2019             + 1 4 4
2020             + 3 6 5",
2021        );
2022
2023        let (mut tx_l, mut tx_r, mut hash_join) =
2024            create_append_only_executor::<{ JoinType::Inner }>(false).await;
2025
2026        // push the init barrier for left and right
2027        tx_l.push_barrier(test_epoch(1), false);
2028        tx_r.push_barrier(test_epoch(1), false);
2029        hash_join.next_unwrap_ready_barrier()?;
2030
2031        // push the 1st left chunk
2032        tx_l.push_chunk(chunk_l1);
2033        hash_join.next_unwrap_pending();
2034
2035        // push the init barrier for left and right
2036        tx_l.push_barrier(test_epoch(2), false);
2037        tx_r.push_barrier(test_epoch(2), false);
2038        hash_join.next_unwrap_ready_barrier()?;
2039
2040        // push the 2nd left chunk
2041        tx_l.push_chunk(chunk_l2);
2042        hash_join.next_unwrap_pending();
2043
2044        // push the 1st right chunk
2045        tx_r.push_chunk(chunk_r1);
2046        let chunk = hash_join.next_unwrap_ready_chunk()?;
2047        assert_eq!(
2048            chunk,
2049            StreamChunk::from_pretty(
2050                " I I I I I I
2051                + 2 5 2 2 5 1
2052                + 4 9 4 4 9 2"
2053            )
2054        );
2055
2056        // push the 2nd right chunk
2057        tx_r.push_chunk(chunk_r2);
2058        let chunk = hash_join.next_unwrap_ready_chunk()?;
2059        assert_eq!(
2060            chunk,
2061            StreamChunk::from_pretty(
2062                " I I I I I I
2063                + 1 4 1 1 4 4
2064                + 3 6 3 3 6 5"
2065            )
2066        );
2067
2068        Ok(())
2069    }
2070
2071    #[tokio::test]
2072    async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
2073        let chunk_l1 = StreamChunk::from_pretty(
2074            "  I I I
2075             + 1 4 1
2076             + 2 5 2
2077             + 3 6 3",
2078        );
2079        let chunk_l2 = StreamChunk::from_pretty(
2080            "  I I I
2081             + 4 9 4
2082             + 5 10 5",
2083        );
2084        let chunk_r1 = StreamChunk::from_pretty(
2085            "  I I I
2086             + 2 5 1
2087             + 4 9 2
2088             + 6 9 3",
2089        );
2090        let chunk_r2 = StreamChunk::from_pretty(
2091            "  I I I
2092             + 1 4 4
2093             + 3 6 5",
2094        );
2095
2096        let (mut tx_l, mut tx_r, mut hash_join) =
2097            create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;
2098
2099        // push the init barrier for left and right
2100        tx_l.push_barrier(test_epoch(1), false);
2101        tx_r.push_barrier(test_epoch(1), false);
2102        hash_join.next_unwrap_ready_barrier()?;
2103
2104        // push the 1st left chunk
2105        tx_l.push_chunk(chunk_l1);
2106        hash_join.next_unwrap_pending();
2107
2108        // push the init barrier for left and right
2109        tx_l.push_barrier(test_epoch(2), false);
2110        tx_r.push_barrier(test_epoch(2), false);
2111        hash_join.next_unwrap_ready_barrier()?;
2112
2113        // push the 2nd left chunk
2114        tx_l.push_chunk(chunk_l2);
2115        hash_join.next_unwrap_pending();
2116
2117        // push the 1st right chunk
2118        tx_r.push_chunk(chunk_r1);
2119        let chunk = hash_join.next_unwrap_ready_chunk()?;
2120        assert_eq!(
2121            chunk,
2122            StreamChunk::from_pretty(
2123                " I I I
2124                + 2 5 2
2125                + 4 9 4"
2126            )
2127        );
2128
2129        // push the 2nd right chunk
2130        tx_r.push_chunk(chunk_r2);
2131        let chunk = hash_join.next_unwrap_ready_chunk()?;
2132        assert_eq!(
2133            chunk,
2134            StreamChunk::from_pretty(
2135                " I I I
2136                + 1 4 1
2137                + 3 6 3"
2138            )
2139        );
2140
2141        Ok(())
2142    }
2143
2144    #[tokio::test]
2145    async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
2146        let chunk_l1 = StreamChunk::from_pretty(
2147            "  I I I
2148             + 1 4 1
2149             + 2 5 2
2150             + 3 6 3",
2151        );
2152        let chunk_l2 = StreamChunk::from_pretty(
2153            "  I I I
2154             + 4 9 4
2155             + 5 10 5",
2156        );
2157        let chunk_r1 = StreamChunk::from_pretty(
2158            "  I I I
2159             + 2 5 1
2160             + 4 9 2
2161             + 6 9 3",
2162        );
2163        let chunk_r2 = StreamChunk::from_pretty(
2164            "  I I I
2165             + 1 4 4
2166             + 3 6 5",
2167        );
2168
2169        let (mut tx_l, mut tx_r, mut hash_join) =
2170            create_append_only_executor::<{ JoinType::RightSemi }>(false).await;
2171
2172        // push the init barrier for left and right
2173        tx_l.push_barrier(test_epoch(1), false);
2174        tx_r.push_barrier(test_epoch(1), false);
2175        hash_join.next_unwrap_ready_barrier()?;
2176
2177        // push the 1st left chunk
2178        tx_l.push_chunk(chunk_l1);
2179        hash_join.next_unwrap_pending();
2180
2181        // push the init barrier for left and right
2182        tx_l.push_barrier(test_epoch(2), false);
2183        tx_r.push_barrier(test_epoch(2), false);
2184        hash_join.next_unwrap_ready_barrier()?;
2185
2186        // push the 2nd left chunk
2187        tx_l.push_chunk(chunk_l2);
2188        hash_join.next_unwrap_pending();
2189
2190        // push the 1st right chunk
2191        tx_r.push_chunk(chunk_r1);
2192        let chunk = hash_join.next_unwrap_ready_chunk()?;
2193        assert_eq!(
2194            chunk,
2195            StreamChunk::from_pretty(
2196                " I I I
2197                + 2 5 1
2198                + 4 9 2"
2199            )
2200        );
2201
2202        // push the 2nd right chunk
2203        tx_r.push_chunk(chunk_r2);
2204        let chunk = hash_join.next_unwrap_ready_chunk()?;
2205        assert_eq!(
2206            chunk,
2207            StreamChunk::from_pretty(
2208                " I I I
2209                + 1 4 4
2210                + 3 6 5"
2211            )
2212        );
2213
2214        Ok(())
2215    }
2216
2217    #[tokio::test]
2218    async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
2219        let chunk_r1 = StreamChunk::from_pretty(
2220            "  I I
2221             + 1 4
2222             + 2 5
2223             + 3 6",
2224        );
2225        let chunk_r2 = StreamChunk::from_pretty(
2226            "  I I
2227             + 3 8
2228             - 3 8",
2229        );
2230        let chunk_l1 = StreamChunk::from_pretty(
2231            "  I I
2232             + 2 7
2233             + 4 8
2234             + 6 9",
2235        );
2236        let chunk_l2 = StreamChunk::from_pretty(
2237            "  I  I
2238             + 3 10
2239             + 6 11",
2240        );
2241        let chunk_r3 = StreamChunk::from_pretty(
2242            "  I I
2243             + 6 10",
2244        );
2245        let chunk_l3 = StreamChunk::from_pretty(
2246            "  I  I
2247             - 6 11",
2248        );
2249        let chunk_l4 = StreamChunk::from_pretty(
2250            "  I  I
2251             - 6 9",
2252        );
2253        let (mut tx_l, mut tx_r, mut hash_join) =
2254            create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;
2255
2256        // push the init barrier for left and right
2257        tx_l.push_barrier(test_epoch(1), false);
2258        tx_r.push_barrier(test_epoch(1), false);
2259        hash_join.next_unwrap_ready_barrier()?;
2260
2261        // push the 1st right chunk
2262        tx_r.push_chunk(chunk_r1);
2263        hash_join.next_unwrap_pending();
2264
2265        // push the init barrier for left and right
2266        tx_l.push_barrier(test_epoch(2), false);
2267        tx_r.push_barrier(test_epoch(2), false);
2268        hash_join.next_unwrap_ready_barrier()?;
2269
2270        // push the 2nd right chunk
2271        tx_r.push_chunk(chunk_r2);
2272        hash_join.next_unwrap_pending();
2273
2274        // push the 1st left chunk
2275        tx_l.push_chunk(chunk_l1);
2276        let chunk = hash_join.next_unwrap_ready_chunk()?;
2277        assert_eq!(
2278            chunk,
2279            StreamChunk::from_pretty(
2280                " I I
2281                + 2 5"
2282            )
2283        );
2284
2285        // push the 2nd left chunk
2286        tx_l.push_chunk(chunk_l2);
2287        let chunk = hash_join.next_unwrap_ready_chunk()?;
2288        assert_eq!(
2289            chunk,
2290            StreamChunk::from_pretty(
2291                " I I
2292                + 3 6"
2293            )
2294        );
2295
2296        // push the 3rd right chunk (tests forward_exactly_once)
2297        tx_r.push_chunk(chunk_r3);
2298        let chunk = hash_join.next_unwrap_ready_chunk()?;
2299        assert_eq!(
2300            chunk,
2301            StreamChunk::from_pretty(
2302                " I I
2303                + 6 10"
2304            )
2305        );
2306
2307        // push the 3rd left chunk
2308        // (tests that no change if there are still matches)
2309        tx_l.push_chunk(chunk_l3);
2310        hash_join.next_unwrap_pending();
2311
2312        // push the 3rd right chunk
2313        // (tests that deletion occurs when there are no more matches)
2314        tx_l.push_chunk(chunk_l4);
2315        let chunk = hash_join.next_unwrap_ready_chunk()?;
2316        assert_eq!(
2317            chunk,
2318            StreamChunk::from_pretty(
2319                " I I
2320                - 6 10"
2321            )
2322        );
2323
2324        Ok(())
2325    }
2326
2327    #[tokio::test]
2328    async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
2329        let chunk_l1 = StreamChunk::from_pretty(
2330            "  I I
2331             + 1 4
2332             + 2 5
2333             + 3 6",
2334        );
2335        let chunk_l2 = StreamChunk::from_pretty(
2336            "  I I
2337             + 3 8
2338             - 3 8",
2339        );
2340        let chunk_r1 = StreamChunk::from_pretty(
2341            "  I I
2342             + 2 7
2343             + 4 8
2344             + 6 9",
2345        );
2346        let chunk_r2 = StreamChunk::from_pretty(
2347            "  I  I
2348             + 3 10
2349             + 6 11
2350             + 1 2
2351             + 1 3",
2352        );
2353        let chunk_l3 = StreamChunk::from_pretty(
2354            "  I I
2355             + 9 10",
2356        );
2357        let chunk_r3 = StreamChunk::from_pretty(
2358            "  I I
2359             - 1 2",
2360        );
2361        let chunk_r4 = StreamChunk::from_pretty(
2362            "  I I
2363             - 1 3",
2364        );
2365        let (mut tx_l, mut tx_r, mut hash_join) =
2366            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2367
2368        // push the init barrier for left and right
2369        tx_l.push_barrier(test_epoch(1), false);
2370        tx_r.push_barrier(test_epoch(1), false);
2371        hash_join.next_unwrap_ready_barrier()?;
2372
2373        // push the 1st left chunk
2374        tx_l.push_chunk(chunk_l1);
2375        let chunk = hash_join.next_unwrap_ready_chunk()?;
2376        assert_eq!(
2377            chunk,
2378            StreamChunk::from_pretty(
2379                " I I
2380                + 1 4
2381                + 2 5
2382                + 3 6",
2383            )
2384        );
2385
2386        // push the init barrier for left and right
2387        tx_l.push_barrier(test_epoch(2), false);
2388        tx_r.push_barrier(test_epoch(2), false);
2389        hash_join.next_unwrap_ready_barrier()?;
2390
2391        // push the 2nd left chunk
2392        tx_l.push_chunk(chunk_l2);
2393        let chunk = hash_join.next_unwrap_ready_chunk()?;
2394        assert_eq!(
2395            chunk,
2396            StreamChunk::from_pretty(
2397                "  I I
2398                 + 3 8 D
2399                 - 3 8 D",
2400            )
2401        );
2402
2403        // push the 1st right chunk
2404        tx_r.push_chunk(chunk_r1);
2405        let chunk = hash_join.next_unwrap_ready_chunk()?;
2406        assert_eq!(
2407            chunk,
2408            StreamChunk::from_pretty(
2409                " I I
2410                - 2 5"
2411            )
2412        );
2413
2414        // push the 2nd right chunk
2415        tx_r.push_chunk(chunk_r2);
2416        let chunk = hash_join.next_unwrap_ready_chunk()?;
2417        assert_eq!(
2418            chunk,
2419            StreamChunk::from_pretty(
2420                " I I
2421                - 3 6
2422                - 1 4"
2423            )
2424        );
2425
2426        // push the 3rd left chunk (tests forward_exactly_once)
2427        tx_l.push_chunk(chunk_l3);
2428        let chunk = hash_join.next_unwrap_ready_chunk()?;
2429        assert_eq!(
2430            chunk,
2431            StreamChunk::from_pretty(
2432                " I I
2433                + 9 10"
2434            )
2435        );
2436
2437        // push the 3rd right chunk
2438        // (tests that no change if there are still matches)
2439        tx_r.push_chunk(chunk_r3);
2440        hash_join.next_unwrap_pending();
2441
2442        // push the 4th right chunk
2443        // (tests that insertion occurs when there are no more matches)
2444        tx_r.push_chunk(chunk_r4);
2445        let chunk = hash_join.next_unwrap_ready_chunk()?;
2446        assert_eq!(
2447            chunk,
2448            StreamChunk::from_pretty(
2449                " I I
2450                + 1 4"
2451            )
2452        );
2453
2454        Ok(())
2455    }
2456
2457    #[tokio::test]
2458    async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
2459        let chunk_r1 = StreamChunk::from_pretty(
2460            "  I I
2461             + 1 4
2462             + 2 5
2463             + 3 6",
2464        );
2465        let chunk_r2 = StreamChunk::from_pretty(
2466            "  I I
2467             + 3 8
2468             - 3 8",
2469        );
2470        let chunk_l1 = StreamChunk::from_pretty(
2471            "  I I
2472             + 2 7
2473             + 4 8
2474             + 6 9",
2475        );
2476        let chunk_l2 = StreamChunk::from_pretty(
2477            "  I  I
2478             + 3 10
2479             + 6 11
2480             + 1 2
2481             + 1 3",
2482        );
2483        let chunk_r3 = StreamChunk::from_pretty(
2484            "  I I
2485             + 9 10",
2486        );
2487        let chunk_l3 = StreamChunk::from_pretty(
2488            "  I I
2489             - 1 2",
2490        );
2491        let chunk_l4 = StreamChunk::from_pretty(
2492            "  I I
2493             - 1 3",
2494        );
2495        let (mut tx_r, mut tx_l, mut hash_join) =
2496            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2497
2498        // push the init barrier for left and right
2499        tx_r.push_barrier(test_epoch(1), false);
2500        tx_l.push_barrier(test_epoch(1), false);
2501        hash_join.next_unwrap_ready_barrier()?;
2502
2503        // push the 1st right chunk
2504        tx_r.push_chunk(chunk_r1);
2505        let chunk = hash_join.next_unwrap_ready_chunk()?;
2506        assert_eq!(
2507            chunk,
2508            StreamChunk::from_pretty(
2509                " I I
2510                + 1 4
2511                + 2 5
2512                + 3 6",
2513            )
2514        );
2515
2516        // push the init barrier for left and right
2517        tx_r.push_barrier(test_epoch(2), false);
2518        tx_l.push_barrier(test_epoch(2), false);
2519        hash_join.next_unwrap_ready_barrier()?;
2520
2521        // push the 2nd right chunk
2522        tx_r.push_chunk(chunk_r2);
2523        let chunk = hash_join.next_unwrap_ready_chunk()?;
2524        assert_eq!(
2525            chunk,
2526            StreamChunk::from_pretty(
2527                "  I I
2528                 + 3 8 D
2529                 - 3 8 D",
2530            )
2531        );
2532
2533        // push the 1st left chunk
2534        tx_l.push_chunk(chunk_l1);
2535        let chunk = hash_join.next_unwrap_ready_chunk()?;
2536        assert_eq!(
2537            chunk,
2538            StreamChunk::from_pretty(
2539                " I I
2540                - 2 5"
2541            )
2542        );
2543
2544        // push the 2nd left chunk
2545        tx_l.push_chunk(chunk_l2);
2546        let chunk = hash_join.next_unwrap_ready_chunk()?;
2547        assert_eq!(
2548            chunk,
2549            StreamChunk::from_pretty(
2550                " I I
2551                - 3 6
2552                - 1 4"
2553            )
2554        );
2555
2556        // push the 3rd right chunk (tests forward_exactly_once)
2557        tx_r.push_chunk(chunk_r3);
2558        let chunk = hash_join.next_unwrap_ready_chunk()?;
2559        assert_eq!(
2560            chunk,
2561            StreamChunk::from_pretty(
2562                " I I
2563                + 9 10"
2564            )
2565        );
2566
2567        // push the 3rd left chunk
2568        // (tests that no change if there are still matches)
2569        tx_l.push_chunk(chunk_l3);
2570        hash_join.next_unwrap_pending();
2571
2572        // push the 4th left chunk
2573        // (tests that insertion occurs when there are no more matches)
2574        tx_l.push_chunk(chunk_l4);
2575        let chunk = hash_join.next_unwrap_ready_chunk()?;
2576        assert_eq!(
2577            chunk,
2578            StreamChunk::from_pretty(
2579                " I I
2580                + 1 4"
2581            )
2582        );
2583
2584        Ok(())
2585    }
2586
2587    #[tokio::test]
2588    async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
2589        let chunk_l1 = StreamChunk::from_pretty(
2590            "  I I
2591             + 1 4
2592             + 2 5
2593             + 3 6",
2594        );
2595        let chunk_l2 = StreamChunk::from_pretty(
2596            "  I I
2597             + 6 8
2598             + 3 8",
2599        );
2600        let chunk_r1 = StreamChunk::from_pretty(
2601            "  I I
2602             + 2 7
2603             + 4 8
2604             + 6 9",
2605        );
2606        let chunk_r2 = StreamChunk::from_pretty(
2607            "  I  I
2608             + 3 10
2609             + 6 11",
2610        );
2611        let (mut tx_l, mut tx_r, mut hash_join) =
2612            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2613
2614        // push the init barrier for left and right
2615        tx_l.push_barrier(test_epoch(1), false);
2616        tx_r.push_barrier(test_epoch(1), false);
2617        hash_join.next_unwrap_ready_barrier()?;
2618
2619        // push the 1st left chunk
2620        tx_l.push_chunk(chunk_l1);
2621        hash_join.next_unwrap_pending();
2622
2623        // push a barrier to left side
2624        tx_l.push_barrier(test_epoch(2), false);
2625
2626        // push the 2nd left chunk
2627        tx_l.push_chunk(chunk_l2);
2628
2629        // join the first right chunk
2630        tx_r.push_chunk(chunk_r1);
2631
2632        // Consume stream chunk
2633        let chunk = hash_join.next_unwrap_ready_chunk()?;
2634        assert_eq!(
2635            chunk,
2636            StreamChunk::from_pretty(
2637                " I I I I
2638                + 2 5 2 7"
2639            )
2640        );
2641
2642        // push a barrier to right side
2643        tx_r.push_barrier(test_epoch(2), false);
2644
2645        // get the aligned barrier here
2646        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2647        assert!(matches!(
2648            hash_join.next_unwrap_ready_barrier()?,
2649            Barrier {
2650                epoch,
2651                mutation: None,
2652                ..
2653            } if epoch == expected_epoch
2654        ));
2655
2656        // join the 2nd left chunk
2657        let chunk = hash_join.next_unwrap_ready_chunk()?;
2658        assert_eq!(
2659            chunk,
2660            StreamChunk::from_pretty(
2661                " I I I I
2662                + 6 8 6 9"
2663            )
2664        );
2665
2666        // push the 2nd right chunk
2667        tx_r.push_chunk(chunk_r2);
2668        let chunk = hash_join.next_unwrap_ready_chunk()?;
2669        assert_eq!(
2670            chunk,
2671            StreamChunk::from_pretty(
2672                " I I I I
2673                + 3 6 3 10
2674                + 3 8 3 10
2675                + 6 8 6 11"
2676            )
2677        );
2678
2679        Ok(())
2680    }
2681
2682    #[tokio::test]
2683    async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
2684        let chunk_l1 = StreamChunk::from_pretty(
2685            "  I I
2686             + 1 4
2687             + 2 .
2688             + 3 .",
2689        );
2690        let chunk_l2 = StreamChunk::from_pretty(
2691            "  I I
2692             + 6 .
2693             + 3 8",
2694        );
2695        let chunk_r1 = StreamChunk::from_pretty(
2696            "  I I
2697             + 2 7
2698             + 4 8
2699             + 6 9",
2700        );
2701        let chunk_r2 = StreamChunk::from_pretty(
2702            "  I  I
2703             + 3 10
2704             + 6 11",
2705        );
2706        let (mut tx_l, mut tx_r, mut hash_join) =
2707            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2708
2709        // push the init barrier for left and right
2710        tx_l.push_barrier(test_epoch(1), false);
2711        tx_r.push_barrier(test_epoch(1), false);
2712        hash_join.next_unwrap_ready_barrier()?;
2713
2714        // push the 1st left chunk
2715        tx_l.push_chunk(chunk_l1);
2716        hash_join.next_unwrap_pending();
2717
2718        // push a barrier to left side
2719        tx_l.push_barrier(test_epoch(2), false);
2720
2721        // push the 2nd left chunk
2722        tx_l.push_chunk(chunk_l2);
2723
2724        // join the first right chunk
2725        tx_r.push_chunk(chunk_r1);
2726
2727        // Consume stream chunk
2728        let chunk = hash_join.next_unwrap_ready_chunk()?;
2729        assert_eq!(
2730            chunk,
2731            StreamChunk::from_pretty(
2732                " I I I I
2733                + 2 . 2 7"
2734            )
2735        );
2736
2737        // push a barrier to right side
2738        tx_r.push_barrier(test_epoch(2), false);
2739
2740        // get the aligned barrier here
2741        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2742        assert!(matches!(
2743            hash_join.next_unwrap_ready_barrier()?,
2744            Barrier {
2745                epoch,
2746                mutation: None,
2747                ..
2748            } if epoch == expected_epoch
2749        ));
2750
2751        // join the 2nd left chunk
2752        let chunk = hash_join.next_unwrap_ready_chunk()?;
2753        assert_eq!(
2754            chunk,
2755            StreamChunk::from_pretty(
2756                " I I I I
2757                + 6 . 6 9"
2758            )
2759        );
2760
2761        // push the 2nd right chunk
2762        tx_r.push_chunk(chunk_r2);
2763        let chunk = hash_join.next_unwrap_ready_chunk()?;
2764        assert_eq!(
2765            chunk,
2766            StreamChunk::from_pretty(
2767                " I I I I
2768                + 3 8 3 10
2769                + 3 . 3 10
2770                + 6 . 6 11"
2771            )
2772        );
2773
2774        Ok(())
2775    }
2776
2777    #[tokio::test]
2778    async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
2779        let chunk_l1 = StreamChunk::from_pretty(
2780            "  I I
2781             + 1 4
2782             + 2 5
2783             + 3 6",
2784        );
2785        let chunk_l2 = StreamChunk::from_pretty(
2786            "  I I
2787             + 3 8
2788             - 3 8",
2789        );
2790        let chunk_r1 = StreamChunk::from_pretty(
2791            "  I I
2792             + 2 7
2793             + 4 8
2794             + 6 9",
2795        );
2796        let chunk_r2 = StreamChunk::from_pretty(
2797            "  I  I
2798             + 3 10
2799             + 6 11",
2800        );
2801        let (mut tx_l, mut tx_r, mut hash_join) =
2802            create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;
2803
2804        // push the init barrier for left and right
2805        tx_l.push_barrier(test_epoch(1), false);
2806        tx_r.push_barrier(test_epoch(1), false);
2807        hash_join.next_unwrap_ready_barrier()?;
2808
2809        // push the 1st left chunk
2810        tx_l.push_chunk(chunk_l1);
2811        let chunk = hash_join.next_unwrap_ready_chunk()?;
2812        assert_eq!(
2813            chunk,
2814            StreamChunk::from_pretty(
2815                " I I I I
2816                + 1 4 . .
2817                + 2 5 . .
2818                + 3 6 . ."
2819            )
2820        );
2821
2822        // push the 2nd left chunk
2823        tx_l.push_chunk(chunk_l2);
2824        let chunk = hash_join.next_unwrap_ready_chunk()?;
2825        assert_eq!(
2826            chunk,
2827            StreamChunk::from_pretty(
2828                " I I I I
2829                + 3 8 . . D
2830                - 3 8 . . D"
2831            )
2832        );
2833
2834        // push the 1st right chunk
2835        tx_r.push_chunk(chunk_r1);
2836        let chunk = hash_join.next_unwrap_ready_chunk()?;
2837        assert_eq!(
2838            chunk,
2839            StreamChunk::from_pretty(
2840                " I I I I
2841                - 2 5 . .
2842                + 2 5 2 7"
2843            )
2844        );
2845
2846        // push the 2nd right chunk
2847        tx_r.push_chunk(chunk_r2);
2848        let chunk = hash_join.next_unwrap_ready_chunk()?;
2849        assert_eq!(
2850            chunk,
2851            StreamChunk::from_pretty(
2852                " I I I I
2853                - 3 6 . .
2854                + 3 6 3 10"
2855            )
2856        );
2857
2858        Ok(())
2859    }
2860
2861    #[tokio::test]
2862    async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
2863        let chunk_l1 = StreamChunk::from_pretty(
2864            "  I I
2865             + 1 4
2866             + 2 5
2867             + . 6",
2868        );
2869        let chunk_l2 = StreamChunk::from_pretty(
2870            "  I I
2871             + . 8
2872             - . 8",
2873        );
2874        let chunk_r1 = StreamChunk::from_pretty(
2875            "  I I
2876             + 2 7
2877             + 4 8
2878             + 6 9",
2879        );
2880        let chunk_r2 = StreamChunk::from_pretty(
2881            "  I  I
2882             + . 10
2883             + 6 11",
2884        );
2885        let (mut tx_l, mut tx_r, mut hash_join) =
2886            create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;
2887
2888        // push the init barrier for left and right
2889        tx_l.push_barrier(test_epoch(1), false);
2890        tx_r.push_barrier(test_epoch(1), false);
2891        hash_join.next_unwrap_ready_barrier()?;
2892
2893        // push the 1st left chunk
2894        tx_l.push_chunk(chunk_l1);
2895        let chunk = hash_join.next_unwrap_ready_chunk()?;
2896        assert_eq!(
2897            chunk,
2898            StreamChunk::from_pretty(
2899                " I I I I
2900                + 1 4 . .
2901                + 2 5 . .
2902                + . 6 . ."
2903            )
2904        );
2905
2906        // push the 2nd left chunk
2907        tx_l.push_chunk(chunk_l2);
2908        let chunk = hash_join.next_unwrap_ready_chunk()?;
2909        assert_eq!(
2910            chunk,
2911            StreamChunk::from_pretty(
2912                " I I I I
2913                + . 8 . . D
2914                - . 8 . . D"
2915            )
2916        );
2917
2918        // push the 1st right chunk
2919        tx_r.push_chunk(chunk_r1);
2920        let chunk = hash_join.next_unwrap_ready_chunk()?;
2921        assert_eq!(
2922            chunk,
2923            StreamChunk::from_pretty(
2924                " I I I I
2925                - 2 5 . .
2926                + 2 5 2 7"
2927            )
2928        );
2929
2930        // push the 2nd right chunk
2931        tx_r.push_chunk(chunk_r2);
2932        let chunk = hash_join.next_unwrap_ready_chunk()?;
2933        assert_eq!(
2934            chunk,
2935            StreamChunk::from_pretty(
2936                " I I I I
2937                - . 6 . .
2938                + . 6 . 10"
2939            )
2940        );
2941
2942        Ok(())
2943    }
2944
2945    #[tokio::test]
2946    async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
2947        let chunk_l1 = StreamChunk::from_pretty(
2948            "  I I
2949             + 1 4
2950             + 2 5
2951             + 3 6",
2952        );
2953        let chunk_l2 = StreamChunk::from_pretty(
2954            "  I I
2955             + 3 8
2956             - 3 8",
2957        );
2958        let chunk_r1 = StreamChunk::from_pretty(
2959            "  I I
2960             + 2 7
2961             + 4 8
2962             + 6 9",
2963        );
2964        let chunk_r2 = StreamChunk::from_pretty(
2965            "  I  I
2966             + 5 10
2967             - 5 10",
2968        );
2969        let (mut tx_l, mut tx_r, mut hash_join) =
2970            create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;
2971
2972        // push the init barrier for left and right
2973        tx_l.push_barrier(test_epoch(1), false);
2974        tx_r.push_barrier(test_epoch(1), false);
2975        hash_join.next_unwrap_ready_barrier()?;
2976
2977        // push the 1st left chunk
2978        tx_l.push_chunk(chunk_l1);
2979        hash_join.next_unwrap_pending();
2980
2981        // push the 2nd left chunk
2982        tx_l.push_chunk(chunk_l2);
2983        hash_join.next_unwrap_pending();
2984
2985        // push the 1st right chunk
2986        tx_r.push_chunk(chunk_r1);
2987        let chunk = hash_join.next_unwrap_ready_chunk()?;
2988        assert_eq!(
2989            chunk,
2990            StreamChunk::from_pretty(
2991                " I I I I
2992                + 2 5 2 7
2993                + . . 4 8
2994                + . . 6 9"
2995            )
2996        );
2997
2998        // push the 2nd right chunk
2999        tx_r.push_chunk(chunk_r2);
3000        let chunk = hash_join.next_unwrap_ready_chunk()?;
3001        assert_eq!(
3002            chunk,
3003            StreamChunk::from_pretty(
3004                " I I I I
3005                + . . 5 10 D
3006                - . . 5 10 D"
3007            )
3008        );
3009
3010        Ok(())
3011    }
3012
3013    #[tokio::test]
3014    async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
3015        let chunk_l1 = StreamChunk::from_pretty(
3016            "  I I I
3017             + 1 4 1
3018             + 2 5 2
3019             + 3 6 3",
3020        );
3021        let chunk_l2 = StreamChunk::from_pretty(
3022            "  I I I
3023             + 4 9 4
3024             + 5 10 5",
3025        );
3026        let chunk_r1 = StreamChunk::from_pretty(
3027            "  I I I
3028             + 2 5 1
3029             + 4 9 2
3030             + 6 9 3",
3031        );
3032        let chunk_r2 = StreamChunk::from_pretty(
3033            "  I I I
3034             + 1 4 4
3035             + 3 6 5",
3036        );
3037
3038        let (mut tx_l, mut tx_r, mut hash_join) =
3039            create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;
3040
3041        // push the init barrier for left and right
3042        tx_l.push_barrier(test_epoch(1), false);
3043        tx_r.push_barrier(test_epoch(1), false);
3044        hash_join.next_unwrap_ready_barrier()?;
3045
3046        // push the 1st left chunk
3047        tx_l.push_chunk(chunk_l1);
3048        let chunk = hash_join.next_unwrap_ready_chunk()?;
3049        assert_eq!(
3050            chunk,
3051            StreamChunk::from_pretty(
3052                " I I I I I I
3053                + 1 4 1 . . .
3054                + 2 5 2 . . .
3055                + 3 6 3 . . ."
3056            )
3057        );
3058
3059        // push the 2nd left chunk
3060        tx_l.push_chunk(chunk_l2);
3061        let chunk = hash_join.next_unwrap_ready_chunk()?;
3062        assert_eq!(
3063            chunk,
3064            StreamChunk::from_pretty(
3065                " I I I I I I
3066                + 4 9 4 . . .
3067                + 5 10 5 . . ."
3068            )
3069        );
3070
3071        // push the 1st right chunk
3072        tx_r.push_chunk(chunk_r1);
3073        let chunk = hash_join.next_unwrap_ready_chunk()?;
3074        assert_eq!(
3075            chunk,
3076            StreamChunk::from_pretty(
3077                " I I I I I I
3078                - 2 5 2 . . .
3079                + 2 5 2 2 5 1
3080                - 4 9 4 . . .
3081                + 4 9 4 4 9 2"
3082            )
3083        );
3084
3085        // push the 2nd right chunk
3086        tx_r.push_chunk(chunk_r2);
3087        let chunk = hash_join.next_unwrap_ready_chunk()?;
3088        assert_eq!(
3089            chunk,
3090            StreamChunk::from_pretty(
3091                " I I I I I I
3092                - 1 4 1 . . .
3093                + 1 4 1 1 4 4
3094                - 3 6 3 . . .
3095                + 3 6 3 3 6 5"
3096            )
3097        );
3098
3099        Ok(())
3100    }
3101
3102    #[tokio::test]
3103    async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
3104        let chunk_l1 = StreamChunk::from_pretty(
3105            "  I I I
3106             + 1 4 1
3107             + 2 5 2
3108             + 3 6 3",
3109        );
3110        let chunk_l2 = StreamChunk::from_pretty(
3111            "  I I I
3112             + 4 9 4
3113             + 5 10 5",
3114        );
3115        let chunk_r1 = StreamChunk::from_pretty(
3116            "  I I I
3117             + 2 5 1
3118             + 4 9 2
3119             + 6 9 3",
3120        );
3121        let chunk_r2 = StreamChunk::from_pretty(
3122            "  I I I
3123             + 1 4 4
3124             + 3 6 5
3125             + 7 7 6",
3126        );
3127
3128        let (mut tx_l, mut tx_r, mut hash_join) =
3129            create_append_only_executor::<{ JoinType::RightOuter }>(false).await;
3130
3131        // push the init barrier for left and right
3132        tx_l.push_barrier(test_epoch(1), false);
3133        tx_r.push_barrier(test_epoch(1), false);
3134        hash_join.next_unwrap_ready_barrier()?;
3135
3136        // push the 1st left chunk
3137        tx_l.push_chunk(chunk_l1);
3138        hash_join.next_unwrap_pending();
3139
3140        // push the 2nd left chunk
3141        tx_l.push_chunk(chunk_l2);
3142        hash_join.next_unwrap_pending();
3143
3144        // push the 1st right chunk
3145        tx_r.push_chunk(chunk_r1);
3146        let chunk = hash_join.next_unwrap_ready_chunk()?;
3147        assert_eq!(
3148            chunk,
3149            StreamChunk::from_pretty(
3150                "  I I I I I I
3151                + 2 5 2 2 5 1
3152                + 4 9 4 4 9 2
3153                + . . . 6 9 3"
3154            )
3155        );
3156
3157        // push the 2nd right chunk
3158        tx_r.push_chunk(chunk_r2);
3159        let chunk = hash_join.next_unwrap_ready_chunk()?;
3160        assert_eq!(
3161            chunk,
3162            StreamChunk::from_pretty(
3163                "  I I I I I I
3164                + 1 4 1 1 4 4
3165                + 3 6 3 3 6 5
3166                + . . . 7 7 6"
3167            )
3168        );
3169
3170        Ok(())
3171    }
3172
3173    #[tokio::test]
3174    async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
3175        let chunk_l1 = StreamChunk::from_pretty(
3176            "  I I
3177             + 1 4
3178             + 2 5
3179             + 3 6",
3180        );
3181        let chunk_l2 = StreamChunk::from_pretty(
3182            "  I I
3183             + 3 8
3184             - 3 8",
3185        );
3186        let chunk_r1 = StreamChunk::from_pretty(
3187            "  I I
3188             + 2 7
3189             + 4 8
3190             + 6 9",
3191        );
3192        let chunk_r2 = StreamChunk::from_pretty(
3193            "  I  I
3194             + 5 10
3195             - 5 10",
3196        );
3197        let (mut tx_l, mut tx_r, mut hash_join) =
3198            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3199
3200        // push the init barrier for left and right
3201        tx_l.push_barrier(test_epoch(1), false);
3202        tx_r.push_barrier(test_epoch(1), false);
3203        hash_join.next_unwrap_ready_barrier()?;
3204
3205        // push the 1st left chunk
3206        tx_l.push_chunk(chunk_l1);
3207        let chunk = hash_join.next_unwrap_ready_chunk()?;
3208        assert_eq!(
3209            chunk,
3210            StreamChunk::from_pretty(
3211                " I I I I
3212                + 1 4 . .
3213                + 2 5 . .
3214                + 3 6 . ."
3215            )
3216        );
3217
3218        // push the 2nd left chunk
3219        tx_l.push_chunk(chunk_l2);
3220        let chunk = hash_join.next_unwrap_ready_chunk()?;
3221        assert_eq!(
3222            chunk,
3223            StreamChunk::from_pretty(
3224                " I I I I
3225                + 3 8 . . D
3226                - 3 8 . . D"
3227            )
3228        );
3229
3230        // push the 1st right chunk
3231        tx_r.push_chunk(chunk_r1);
3232        let chunk = hash_join.next_unwrap_ready_chunk()?;
3233        assert_eq!(
3234            chunk,
3235            StreamChunk::from_pretty(
3236                " I I I I
3237                - 2 5 . .
3238                + 2 5 2 7
3239                + . . 4 8
3240                + . . 6 9"
3241            )
3242        );
3243
3244        // push the 2nd right chunk
3245        tx_r.push_chunk(chunk_r2);
3246        let chunk = hash_join.next_unwrap_ready_chunk()?;
3247        assert_eq!(
3248            chunk,
3249            StreamChunk::from_pretty(
3250                " I I I I
3251                + . . 5 10 D
3252                - . . 5 10 D"
3253            )
3254        );
3255
3256        Ok(())
3257    }
3258
3259    #[tokio::test]
3260    async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
3261        let (mut tx_l, mut tx_r, mut hash_join) =
3262            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3263
3264        // push the init barrier for left and right
3265        tx_l.push_barrier(test_epoch(1), false);
3266        tx_r.push_barrier(test_epoch(1), false);
3267        hash_join.next_unwrap_ready_barrier()?;
3268
3269        tx_l.push_chunk(StreamChunk::from_pretty(
3270            "  I I
3271             + 1 1
3272            ",
3273        ));
3274        let chunk = hash_join.next_unwrap_ready_chunk()?;
3275        assert_eq!(
3276            chunk,
3277            StreamChunk::from_pretty(
3278                " I I I I
3279                + 1 1 . ."
3280            )
3281        );
3282
3283        tx_r.push_chunk(StreamChunk::from_pretty(
3284            "  I I
3285             + 1 1
3286            ",
3287        ));
3288        let chunk = hash_join.next_unwrap_ready_chunk()?;
3289
3290        assert_eq!(
3291            chunk,
3292            StreamChunk::from_pretty(
3293                " I I I I
3294                - 1 1 . .
3295                + 1 1 1 1"
3296            )
3297        );
3298
3299        tx_l.push_chunk(StreamChunk::from_pretty(
3300            "   I I
3301              - 1 1
3302              + 1 2
3303            ",
3304        ));
3305        let chunk = hash_join.next_unwrap_ready_chunk()?;
3306        let chunk = chunk.compact();
3307        assert_eq!(
3308            chunk,
3309            StreamChunk::from_pretty(
3310                " I I I I
3311                - 1 1 1 1
3312                + 1 2 1 1
3313                "
3314            )
3315        );
3316
3317        Ok(())
3318    }
3319
3320    #[tokio::test]
3321    async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
3322    {
3323        let chunk_l1 = StreamChunk::from_pretty(
3324            "  I I
3325             + 1 4
3326             + 2 5
3327             + 3 6
3328             + 3 7",
3329        );
3330        let chunk_l2 = StreamChunk::from_pretty(
3331            "  I I
3332             + 3 8
3333             - 3 8
3334             - 1 4", // delete row to cause an empty JoinHashEntry
3335        );
3336        let chunk_r1 = StreamChunk::from_pretty(
3337            "  I I
3338             + 2 6
3339             + 4 8
3340             + 3 4",
3341        );
3342        let chunk_r2 = StreamChunk::from_pretty(
3343            "  I  I
3344             + 5 10
3345             - 5 10
3346             + 1 2",
3347        );
3348        let (mut tx_l, mut tx_r, mut hash_join) =
3349            create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;
3350
3351        // push the init barrier for left and right
3352        tx_l.push_barrier(test_epoch(1), false);
3353        tx_r.push_barrier(test_epoch(1), false);
3354        hash_join.next_unwrap_ready_barrier()?;
3355
3356        // push the 1st left chunk
3357        tx_l.push_chunk(chunk_l1);
3358        let chunk = hash_join.next_unwrap_ready_chunk()?;
3359        assert_eq!(
3360            chunk,
3361            StreamChunk::from_pretty(
3362                " I I I I
3363                + 1 4 . .
3364                + 2 5 . .
3365                + 3 6 . .
3366                + 3 7 . ."
3367            )
3368        );
3369
3370        // push the 2nd left chunk
3371        tx_l.push_chunk(chunk_l2);
3372        let chunk = hash_join.next_unwrap_ready_chunk()?;
3373        assert_eq!(
3374            chunk,
3375            StreamChunk::from_pretty(
3376                " I I I I
3377                + 3 8 . . D
3378                - 3 8 . . D
3379                - 1 4 . ."
3380            )
3381        );
3382
3383        // push the 1st right chunk
3384        tx_r.push_chunk(chunk_r1);
3385        let chunk = hash_join.next_unwrap_ready_chunk()?;
3386        assert_eq!(
3387            chunk,
3388            StreamChunk::from_pretty(
3389                " I I I I
3390                - 2 5 . .
3391                + 2 5 2 6
3392                + . . 4 8
3393                + . . 3 4" /* regression test (#2420): 3 4 should be forwarded only once
3394                            * despite matching on eq join on 2
3395                            * entries */
3396            )
3397        );
3398
3399        // push the 2nd right chunk
3400        tx_r.push_chunk(chunk_r2);
3401        let chunk = hash_join.next_unwrap_ready_chunk()?;
3402        assert_eq!(
3403            chunk,
3404            StreamChunk::from_pretty(
3405                " I I I I
3406                + . . 5 10 D
3407                - . . 5 10 D
3408                + . . 1 2" /* regression test (#2420): 1 2 forwarded even if matches on an empty
3409                            * join entry */
3410            )
3411        );
3412
3413        Ok(())
3414    }
3415
3416    #[tokio::test]
3417    async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
3418        let chunk_l1 = StreamChunk::from_pretty(
3419            "  I I
3420             + 1 4
3421             + 2 10
3422             + 3 6",
3423        );
3424        let chunk_l2 = StreamChunk::from_pretty(
3425            "  I I
3426             + 3 8
3427             - 3 8",
3428        );
3429        let chunk_r1 = StreamChunk::from_pretty(
3430            "  I I
3431             + 2 7
3432             + 4 8
3433             + 6 9",
3434        );
3435        let chunk_r2 = StreamChunk::from_pretty(
3436            "  I  I
3437             + 3 10
3438             + 6 11",
3439        );
3440        let (mut tx_l, mut tx_r, mut hash_join) =
3441            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3442
3443        // push the init barrier for left and right
3444        tx_l.push_barrier(test_epoch(1), false);
3445        tx_r.push_barrier(test_epoch(1), false);
3446        hash_join.next_unwrap_ready_barrier()?;
3447
3448        // push the 1st left chunk
3449        tx_l.push_chunk(chunk_l1);
3450        hash_join.next_unwrap_pending();
3451
3452        // push the 2nd left chunk
3453        tx_l.push_chunk(chunk_l2);
3454        hash_join.next_unwrap_pending();
3455
3456        // push the 1st right chunk
3457        tx_r.push_chunk(chunk_r1);
3458        hash_join.next_unwrap_pending();
3459
3460        // push the 2nd right chunk
3461        tx_r.push_chunk(chunk_r2);
3462        let chunk = hash_join.next_unwrap_ready_chunk()?;
3463        assert_eq!(
3464            chunk,
3465            StreamChunk::from_pretty(
3466                " I I I I
3467                + 3 6 3 10"
3468            )
3469        );
3470
3471        Ok(())
3472    }
3473
3474    #[tokio::test]
3475    async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3476        let (mut tx_l, mut tx_r, mut hash_join) =
3477            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3478
3479        // push the init barrier for left and right
3480        tx_l.push_barrier(test_epoch(1), false);
3481        tx_r.push_barrier(test_epoch(1), false);
3482        hash_join.next_unwrap_ready_barrier()?;
3483
3484        tx_l.push_int64_watermark(0, 100);
3485
3486        tx_l.push_int64_watermark(0, 200);
3487
3488        tx_l.push_barrier(test_epoch(2), false);
3489        tx_r.push_barrier(test_epoch(2), false);
3490        hash_join.next_unwrap_ready_barrier()?;
3491
3492        tx_r.push_int64_watermark(0, 50);
3493
3494        let w1 = hash_join.next().await.unwrap().unwrap();
3495        let w1 = w1.as_watermark().unwrap();
3496
3497        let w2 = hash_join.next().await.unwrap().unwrap();
3498        let w2 = w2.as_watermark().unwrap();
3499
3500        tx_r.push_int64_watermark(0, 100);
3501
3502        let w3 = hash_join.next().await.unwrap().unwrap();
3503        let w3 = w3.as_watermark().unwrap();
3504
3505        let w4 = hash_join.next().await.unwrap().unwrap();
3506        let w4 = w4.as_watermark().unwrap();
3507
3508        assert_eq!(
3509            w1,
3510            &Watermark {
3511                col_idx: 2,
3512                data_type: DataType::Int64,
3513                val: ScalarImpl::Int64(50)
3514            }
3515        );
3516
3517        assert_eq!(
3518            w2,
3519            &Watermark {
3520                col_idx: 0,
3521                data_type: DataType::Int64,
3522                val: ScalarImpl::Int64(50)
3523            }
3524        );
3525
3526        assert_eq!(
3527            w3,
3528            &Watermark {
3529                col_idx: 2,
3530                data_type: DataType::Int64,
3531                val: ScalarImpl::Int64(100)
3532            }
3533        );
3534
3535        assert_eq!(
3536            w4,
3537            &Watermark {
3538                col_idx: 0,
3539                data_type: DataType::Int64,
3540                val: ScalarImpl::Int64(100)
3541            }
3542        );
3543
3544        Ok(())
3545    }
3546}