risingwave_stream/executor/
asof_join.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16use std::marker::PhantomData;
17use std::ops::Bound;
18use std::time::Duration;
19
20use either::Either;
21use itertools::Itertools;
22use multimap::MultiMap;
23use risingwave_common::array::Op;
24use risingwave_common::hash::{HashKey, NullBitmap};
25use risingwave_common::util::epoch::EpochPair;
26use risingwave_common::util::iter_util::ZipEqDebug;
27use tokio::time::Instant;
28
29use self::builder::JoinChunkBuilder;
30use super::barrier_align::*;
31use super::join::hash_join::*;
32use super::join::*;
33use super::watermark::*;
34use crate::executor::join::builder::JoinStreamChunkBuilder;
35use crate::executor::join::row::JoinEncoding;
36use crate::executor::prelude::*;
37
/// Returns `true` when every index in `vec1` also occurs in `vec2`.
///
/// Duplicates in either input do not matter, and an empty `vec1` is a subset of anything.
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.into_iter().all(|idx| superset.contains(&idx))
}
41
/// Per-side parameters handed to the join executor constructor.
pub struct JoinParams {
    /// Column indices that make up the join key of this side.
    pub join_key_indices: Vec<usize>,
    /// Indices of the input pk after dedup.
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Bundles the join key indices and the deduplicated pk indices of one input side.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        JoinParams {
            join_key_indices,
            deduped_pk_indices,
        }
    }
}
57
/// State and column-layout information for one input side of the join.
struct JoinSide<K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Stores all data from one side of the stream.
    ht: JoinHashMap<K, S, E>,
    /// Indices of the join key columns
    join_key_indices: Vec<usize>,
    /// The data type of all columns without degree.
    all_data_types: Vec<DataType>,
    /// The start position for the side in output new columns
    start_pos: usize,
    /// The mapping from input indices of a side to output columns.
    i2o_mapping: Vec<(usize, usize)>,
    /// The same pairs as `i2o_mapping`, indexed by input column so that all output columns of
    /// one input column can be looked up directly.
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// Whether degree table is needed for this side.
    need_degree_table: bool,
    /// Zero-sized marker that ties the struct to the encoding type `E`.
    _marker: std::marker::PhantomData<E>,
}
74
75impl<K: HashKey, S: StateStore, E: JoinEncoding> std::fmt::Debug for JoinSide<K, S, E> {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.debug_struct("JoinSide")
78            .field("join_key_indices", &self.join_key_indices)
79            .field("col_types", &self.all_data_types)
80            .field("start_pos", &self.start_pos)
81            .field("i2o_mapping", &self.i2o_mapping)
82            .field("need_degree_table", &self.need_degree_table)
83            .finish()
84    }
85}
86
impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinSide<K, S, E> {
    /// Placeholder that is not implemented yet; calling it panics via `unimplemented!()`.
    // WARNING: Please do not call this until we implement it.
    fn is_dirty(&self) -> bool {
        unimplemented!()
    }

    /// Currently unused. Asserts that the side is not dirty and then does nothing else, because
    /// the actual cache clearing is still commented out (see the TODO below).
    ///
    /// # Panics
    ///
    /// Panics whenever it is called at the moment, since `is_dirty` is unimplemented.
    #[expect(dead_code)]
    fn clear_cache(&mut self) {
        assert!(
            !self.is_dirty(),
            "cannot clear cache while states of hash join are dirty"
        );

        // TODO: not working with rearranged chain
        // self.ht.clear();
    }

    /// Initializes the underlying join hash map with the given epoch.
    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.ht.init(epoch).await
    }
}
108
/// `AsOfJoinExecutor` takes two input streams and runs an equi hash join on them; among the
/// rows with equal join keys, the matched row is chosen by the inequality described in
/// `asof_desc`.
/// The output columns are the concatenation of left and right columns.
pub struct AsOfJoinExecutor<
    K: HashKey,
    S: StateStore,
    const T: AsOfJoinTypePrimitive,
    E: JoinEncoding,
> {
    ctx: ActorContextRef,
    info: ExecutorInfo,

    /// Left input executor. `Some` until the stream is started, which takes it.
    input_l: Option<Executor>,
    /// Right input executor. `Some` until the stream is started, which takes it.
    input_r: Option<Executor>,
    /// The data types of the formed new columns
    actual_output_data_types: Vec<DataType>,
    /// The parameters of the left join executor
    side_l: JoinSide<K, S, E>,
    /// The parameters of the right join executor
    side_r: JoinSide<K, S, E>,

    metrics: Arc<StreamingMetrics>,
    /// The maximum size of the chunk produced by executor at a time
    chunk_size: usize,
    /// Counts the rows received; reset to 0 once it reaches `join_cache_evict_interval_rows`.
    cnt_rows_received: u32,

    /// Position of the watermark column within the join key -> `BufferedWatermarks`
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// Threshold passed through to the per-chunk join routines.
    // NOTE(review): presumably the match count above which join amplification is reported;
    // confirm against the right-side join routine.
    high_join_amplification_threshold: usize,
    /// Number of processed rows between periodic manual evictions of the join cache.
    join_cache_evict_interval_rows: u32,
    /// `AsOf` join description
    asof_desc: AsOfDesc,
}
146
147impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive, E: JoinEncoding> std::fmt::Debug
148    for AsOfJoinExecutor<K, S, T, E>
149{
150    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151        f.debug_struct("AsOfJoinExecutor")
152            .field("join_type", &T)
153            .field("input_left", &self.input_l.as_ref().unwrap().identity())
154            .field("input_right", &self.input_r.as_ref().unwrap().identity())
155            .field("side_l", &self.side_l)
156            .field("side_r", &self.side_r)
157            .field("stream_key", &self.info.stream_key)
158            .field("schema", &self.info.schema)
159            .field("actual_output_data_types", &self.actual_output_data_types)
160            .field(
161                "join_cache_evict_interval_rows",
162                &self.join_cache_evict_interval_rows,
163            )
164            .finish()
165    }
166}
167
168impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive, E: JoinEncoding> Execute
169    for AsOfJoinExecutor<K, S, T, E>
170{
171    fn execute(self: Box<Self>) -> BoxedMessageStream {
172        self.into_stream().boxed()
173    }
174}
175
/// Arguments bundled for one call of the per-chunk join routines (`eq_join_left` /
/// `eq_join_right`), borrowing the executor's state for the duration of the call.
struct EqJoinArgs<'a, K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Actor context of the executor.
    ctx: &'a ActorContextRef,
    /// Left side of the join.
    side_l: &'a mut JoinSide<K, S, E>,
    /// Right side of the join.
    side_r: &'a mut JoinSide<K, S, E>,
    /// `AsOf` join description, including the inequality type.
    asof_desc: &'a AsOfDesc,
    /// Data types of the output columns.
    actual_output_data_types: &'a [DataType],
    // inequality_watermarks: &'a Watermark,
    /// The input chunk to join.
    chunk: StreamChunk,
    /// The maximum size of an output chunk.
    chunk_size: usize,
    /// Row counter shared across calls, used to trigger periodic cache eviction.
    cnt_rows_received: &'a mut u32,
    /// Threshold passed through from the executor.
    high_join_amplification_threshold: usize,
    /// Number of processed rows between manual evictions of the join cache.
    join_cache_evict_interval_rows: u32,
}
189
190impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive, E: JoinEncoding>
191    AsOfJoinExecutor<K, S, T, E>
192{
    /// Builds an `AsOfJoinExecutor` from its two inputs, their state tables and the `AsOf`
    /// join description.
    ///
    /// The output data types are those of the left input followed by those of the right input,
    /// projected by `output_indices`.
    ///
    /// # Panics
    ///
    /// Panics if the join key data types of the two sides differ, or if an index in
    /// `output_indices` or in the join key indices is out of range for the schema it indexes.
    // The constructor takes every configuration input as its own positional argument.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        asof_desc: AsOfDesc,
    ) -> Self {
        // Clamp the configured eviction interval to at least one row.
        let join_cache_evict_interval_rows = ctx
            .config
            .developer
            .join_hash_map_evict_interval_rows
            .max(1);
        // Number of left columns; the right side's output columns start after them.
        let side_l_column_n = input_l.schema().len();

        let schema_fields = [
            input_l.schema().fields.clone(),
            input_r.schema().fields.clone(),
        ]
        .concat();

        let original_output_data_types = schema_fields
            .iter()
            .map(|field| field.data_type())
            .collect_vec();
        let actual_output_data_types = output_indices
            .iter()
            .map(|&idx| original_output_data_types[idx].clone())
            .collect_vec();

        // Data types of the hash join state.
        let state_all_data_types_l = input_l.schema().data_types();
        let state_all_data_types_r = input_r.schema().data_types();

        let state_pk_indices_l = input_l.stream_key().to_vec();
        let state_pk_indices_r = input_r.stream_key().to_vec();

        let state_join_key_indices_l = params_l.join_key_indices;
        let state_join_key_indices_r = params_r.join_key_indices;

        // If pk is contained in join key.
        let pk_contained_in_jk_l = is_subset(state_pk_indices_l, state_join_key_indices_l.clone());
        let pk_contained_in_jk_r = is_subset(state_pk_indices_r, state_join_key_indices_r.clone());

        let join_key_data_types_l = state_join_key_indices_l
            .iter()
            .map(|idx| state_all_data_types_l[*idx].clone())
            .collect_vec();

        let join_key_data_types_r = state_join_key_indices_r
            .iter()
            .map(|idx| state_all_data_types_r[*idx].clone())
            .collect_vec();

        // Both sides must agree on the join key types, position by position.
        assert_eq!(join_key_data_types_l, join_key_data_types_r);

        let null_matched = K::Bitmap::from_bool_vec(null_safe);

        // For semi/anti joins only the columns of one side are mapped to the output.
        let (left_to_output, right_to_output) = {
            let (left_len, right_len) = if is_left_semi_or_anti(T) {
                (state_all_data_types_l.len(), 0usize)
            } else if is_right_semi_or_anti(T) {
                (0usize, state_all_data_types_r.len())
            } else {
                (state_all_data_types_l.len(), state_all_data_types_r.len())
            };
            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
        };

        // Index the input-to-output mappings by input column for direct lookup.
        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());

        // handle inequality watermarks
        // https://github.com/risingwavelabs/risingwave/issues/18503
        // let inequality_watermarks = None;
        let watermark_buffers = BTreeMap::new();

        // The inequality columns of the `AsOf` condition, one per side.
        let inequal_key_idx_l = Some(asof_desc.left_idx);
        let inequal_key_idx_r = Some(asof_desc.right_idx);

        Self {
            ctx: ctx.clone(),
            info,
            input_l: Some(input_l),
            input_r: Some(input_r),
            actual_output_data_types,
            side_l: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch.clone(),
                    join_key_data_types_l,
                    state_join_key_indices_l.clone(),
                    state_all_data_types_l.clone(),
                    state_table_l,
                    params_l.deduped_pk_indices,
                    None,
                    null_matched.clone(),
                    pk_contained_in_jk_l,
                    inequal_key_idx_l,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "left",
                ),
                join_key_indices: state_join_key_indices_l,
                all_data_types: state_all_data_types_l,
                i2o_mapping: left_to_output,
                i2o_mapping_indexed: l2o_indexed,
                start_pos: 0,
                need_degree_table: false,
                _marker: PhantomData,
            },
            side_r: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch,
                    join_key_data_types_r,
                    state_join_key_indices_r.clone(),
                    state_all_data_types_r.clone(),
                    state_table_r,
                    params_r.deduped_pk_indices,
                    None,
                    null_matched,
                    pk_contained_in_jk_r,
                    inequal_key_idx_r,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "right",
                ),
                join_key_indices: state_join_key_indices_r,
                all_data_types: state_all_data_types_r,
                start_pos: side_l_column_n,
                i2o_mapping: right_to_output,
                i2o_mapping_indexed: r2o_indexed,
                need_degree_table: false,
                _marker: PhantomData,
            },
            metrics,
            chunk_size,
            cnt_rows_received: 0,
            watermark_buffers,
            high_join_amplification_threshold,
            join_cache_evict_interval_rows,
            asof_desc,
        }
    }
348
    /// Runs the executor: aligns both inputs on barriers, then processes watermarks, chunks and
    /// barriers one message at a time and yields the resulting messages.
    ///
    /// The first barrier is forwarded before both sides are initialized with its epoch.
    ///
    /// # Panics
    ///
    /// Panics if either input executor has already been taken.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let input_l = self.input_l.take().unwrap();
        let input_r = self.input_r.take().unwrap();
        let aligned_stream = barrier_align(
            input_l.execute(),
            input_r.execute(),
            self.ctx.id,
            self.ctx.fragment_id,
            self.metrics.clone(),
            "Join",
        );
        pin_mut!(aligned_stream);
        let actor_id = self.ctx.id;

        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
        let first_epoch = barrier.epoch;
        // The first barrier message should be propagated.
        yield Message::Barrier(barrier);
        self.side_l.init(first_epoch).await?;
        self.side_r.init(first_epoch).await?;

        let actor_id_str = self.ctx.id.to_string();
        let fragment_id_str = self.ctx.fragment_id.to_string();

        // Initialize the metric handles once, outside the message loop.
        let join_actor_input_waiting_duration_ns = self
            .metrics
            .join_actor_input_waiting_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let left_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
        let right_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let barrier_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[
                actor_id_str.as_str(),
                fragment_id_str.as_str(),
                "barrier",
            ]);

        let left_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);

        let right_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        // Start of the current wait for input; reset after every processed message.
        let mut start_time = Instant::now();

        while let Some(msg) = aligned_stream
            .next()
            .instrument_await("hash_join_barrier_align")
            .await
        {
            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
            match msg? {
                AlignedMessage::WatermarkLeft(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Left, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::WatermarkRight(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Right, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::Left(chunk) => {
                    // Accumulate only the time spent producing chunks; the timer is stopped
                    // before each yield and restarted after it.
                    let mut left_time = Duration::from_nanos(0);
                    let mut left_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_left(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        asof_desc: &self.asof_desc,
                        actual_output_data_types: &self.actual_output_data_types,
                        // inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
                    }) {
                        left_time += left_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        left_start_time = Instant::now();
                    }
                    left_time += left_start_time.elapsed();
                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Right(chunk) => {
                    // Same timing scheme as for the left side.
                    let mut right_time = Duration::from_nanos(0);
                    let mut right_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_right(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        asof_desc: &self.asof_desc,
                        actual_output_data_types: &self.actual_output_data_types,
                        // inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
                    }) {
                        right_time += right_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        right_start_time = Instant::now();
                    }
                    right_time += right_start_time.elapsed();
                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Barrier(barrier) => {
                    let barrier_start_time = Instant::now();
                    // Flush both sides before the barrier is forwarded downstream.
                    let (left_post_commit, right_post_commit) =
                        self.flush_data(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
                    yield Message::Barrier(barrier);

                    // Update the vnode bitmap for state tables of both sides if asked.
                    right_post_commit
                        .post_yield_barrier(update_vnode_bitmap.clone())
                        .await?;
                    // Clear the buffered watermarks when the left side's post-commit step
                    // returns `Some(true)`.
                    if left_post_commit
                        .post_yield_barrier(update_vnode_bitmap)
                        .await?
                        .unwrap_or(false)
                    {
                        self.watermark_buffers
                            .values_mut()
                            .for_each(|buffers| buffers.clear());
                    }

                    // Report metrics of cached join rows/entries
                    for (join_cached_entry_count, ht) in [
                        (&left_join_cached_entry_count, &self.side_l.ht),
                        (&right_join_cached_entry_count, &self.side_r.ht),
                    ] {
                        join_cached_entry_count.set(ht.entry_count() as i64);
                    }

                    barrier_join_match_duration_ns
                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
                }
            }
            start_time = Instant::now();
        }
    }
513
514    async fn flush_data(
515        &mut self,
516        epoch: EpochPair,
517    ) -> StreamExecutorResult<(
518        JoinHashMapPostCommit<'_, K, S, E>,
519        JoinHashMapPostCommit<'_, K, S, E>,
520    )> {
521        // All changes to the state has been buffered in the mem-table of the state table. Just
522        // `commit` them here.
523        let left = self.side_l.ht.flush(epoch).await?;
524        let right = self.side_r.ht.flush(epoch).await?;
525        Ok((left, right))
526    }
527
528    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
529        // All changes to the state has been buffered in the mem-table of the state table. Just
530        // `commit` them here.
531        self.side_l.ht.try_flush().await?;
532        self.side_r.ht.try_flush().await?;
533        Ok(())
534    }
535
536    // We need to manually evict the cache.
537    fn evict_cache(
538        side_update: &mut JoinSide<K, S, E>,
539        side_match: &mut JoinSide<K, S, E>,
540        cnt_rows_received: &mut u32,
541        join_cache_evict_interval_rows: u32,
542    ) {
543        *cnt_rows_received += 1;
544        if *cnt_rows_received >= join_cache_evict_interval_rows {
545            side_update.ht.evict();
546            side_match.ht.evict();
547            *cnt_rows_received = 0;
548        }
549    }
550
551    fn handle_watermark(
552        &mut self,
553        side: SideTypePrimitive,
554        watermark: Watermark,
555    ) -> StreamExecutorResult<Vec<Watermark>> {
556        let (side_update, side_match) = if side == SideType::Left {
557            (&mut self.side_l, &mut self.side_r)
558        } else {
559            (&mut self.side_r, &mut self.side_l)
560        };
561
562        // State cleaning
563        if side_update.join_key_indices[0] == watermark.col_idx {
564            side_match.ht.update_watermark(watermark.val.clone());
565        }
566
567        // Select watermarks to yield.
568        let wm_in_jk = side_update
569            .join_key_indices
570            .iter()
571            .positions(|idx| *idx == watermark.col_idx);
572        let mut watermarks_to_emit = vec![];
573        for idx in wm_in_jk {
574            let buffers = self
575                .watermark_buffers
576                .entry(idx)
577                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
578            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
579                let empty_indices = vec![];
580                let output_indices = side_update
581                    .i2o_mapping_indexed
582                    .get_vec(&side_update.join_key_indices[idx])
583                    .unwrap_or(&empty_indices)
584                    .iter()
585                    .chain(
586                        side_match
587                            .i2o_mapping_indexed
588                            .get_vec(&side_match.join_key_indices[idx])
589                            .unwrap_or(&empty_indices),
590                    );
591                for output_idx in output_indices {
592                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
593                }
594            };
595        }
596        Ok(watermarks_to_emit)
597    }
598
599    /// the data the hash table and match the coming
600    /// data chunk with the executor state
601    async fn hash_eq_match(
602        key: &K,
603        ht: &mut JoinHashMap<K, S, E>,
604    ) -> StreamExecutorResult<Option<HashValueType<E>>> {
605        if !key.null_bitmap().is_subset(ht.null_matched()) {
606            Ok(None)
607        } else {
608            ht.take_state(key).await.map(Some)
609        }
610    }
611
    /// Joins one chunk arriving on the left side against the state of the right side and
    /// yields the resulting output chunks.
    ///
    /// For every visible row: the row counter for cache eviction is advanced, the right-side
    /// state for the row's join key is fetched (unless the row's inequality key is null or the
    /// key cannot match), at most one right row is selected according to the inequality type,
    /// and the row is emitted either as a match or through `forward_if_not_matched`. Rows that
    /// went through the lookup are also inserted into / deleted from the left-side state.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_left(args: EqJoinArgs<'_, K, S, E>) {
        let EqJoinArgs {
            ctx: _,
            side_l,
            side_r,
            asof_desc,
            actual_output_data_types,
            // inequality_watermarks,
            chunk,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold: _,
            join_cache_evict_interval_rows,
        } = args;

        // The left side receives the update; the right side is probed for matches.
        let (side_update, side_match) = (side_l, side_r);

        let mut join_chunk_builder =
            JoinChunkBuilder::<T, { SideType::Left }>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            // Skip the holes, i.e. positions that carry no row.
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(
                side_update,
                side_match,
                cnt_rows_received,
                join_cache_evict_interval_rows,
            );

            // A row whose inequality key is null is never looked up in the other side.
            let matched_rows = if !side_update.ht.check_inequal_key_null(&row) {
                Self::hash_eq_match(key, &mut side_match.ht).await?
            } else {
                None
            };
            let inequal_key = side_update.ht.serialize_inequal_key_from_row(row);

            if let Some(matched_rows) = matched_rows {
                // Select at most one candidate row from the matched state, depending on the
                // inequality type of the `AsOf` condition.
                let matched_row_by_inequality = match asof_desc.inequality_type {
                    AsOfInequalityType::Lt => matched_rows.lower_bound_by_inequality(
                        Bound::Excluded(&inequal_key),
                        &side_match.all_data_types,
                    ),
                    AsOfInequalityType::Le => matched_rows.lower_bound_by_inequality(
                        Bound::Included(&inequal_key),
                        &side_match.all_data_types,
                    ),
                    AsOfInequalityType::Gt => matched_rows.upper_bound_by_inequality(
                        Bound::Excluded(&inequal_key),
                        &side_match.all_data_types,
                    ),
                    AsOfInequalityType::Ge => matched_rows.upper_bound_by_inequality(
                        Bound::Included(&inequal_key),
                        &side_match.all_data_types,
                    ),
                };
                match op {
                    Op::Insert | Op::UpdateInsert => {
                        if let Some(matched_row_by_inequality) = matched_row_by_inequality {
                            let matched_row = matched_row_by_inequality?;

                            if let Some(chunk) =
                                join_chunk_builder.with_match_on_insert(&row, &matched_row)
                            {
                                yield chunk;
                            }
                        } else if let Some(chunk) =
                            join_chunk_builder.forward_if_not_matched(Op::Insert, row)
                        {
                            yield chunk;
                        }
                        // Record the new row in the left-side state.
                        side_update.ht.insert_row(key, row)?;
                    }
                    Op::Delete | Op::UpdateDelete => {
                        if let Some(matched_row_by_inequality) = matched_row_by_inequality {
                            let matched_row = matched_row_by_inequality?;

                            if let Some(chunk) =
                                join_chunk_builder.with_match_on_delete(&row, &matched_row)
                            {
                                yield chunk;
                            }
                        } else if let Some(chunk) =
                            join_chunk_builder.forward_if_not_matched(Op::Delete, row)
                        {
                            yield chunk;
                        }
                        // Remove the row from the left-side state.
                        side_update.ht.delete_row(key, row)?;
                    }
                }
                // Insert back the state taken from ht.
                side_match.ht.update_state(key, matched_rows);
            } else {
                // Row which violates null-safe bitmap will never be matched so we need not
                // store.
                match op {
                    Op::Insert | Op::UpdateInsert => {
                        if let Some(chunk) =
                            join_chunk_builder.forward_if_not_matched(Op::Insert, row)
                        {
                            yield chunk;
                        }
                    }
                    Op::Delete | Op::UpdateDelete => {
                        if let Some(chunk) =
                            join_chunk_builder.forward_if_not_matched(Op::Delete, row)
                        {
                            yield chunk;
                        }
                    }
                }
            }
        }
        // Emit whatever is still buffered in the builder.
        if let Some(chunk) = join_chunk_builder.take() {
            yield chunk;
        }
    }
737
738    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
739    async fn eq_join_right(args: EqJoinArgs<'_, K, S, E>) {
740        let EqJoinArgs {
741            ctx,
742            side_l,
743            side_r,
744            asof_desc,
745            actual_output_data_types,
746            // inequality_watermarks,
747            chunk,
748            chunk_size,
749            cnt_rows_received,
750            high_join_amplification_threshold,
751            join_cache_evict_interval_rows,
752        } = args;
753
754        let (side_update, side_match) = (side_r, side_l);
755
756        let mut join_chunk_builder = JoinStreamChunkBuilder::new(
757            chunk_size,
758            actual_output_data_types.to_vec(),
759            side_update.i2o_mapping.clone(),
760            side_match.i2o_mapping.clone(),
761        );
762
763        let join_matched_rows_metrics = ctx
764            .streaming_metrics
765            .join_matched_join_keys
766            .with_guarded_label_values(&[
767                &ctx.id.to_string(),
768                &ctx.fragment_id.to_string(),
769                &side_update.ht.table_id().to_string(),
770            ]);
771
772        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
773        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
774            let Some((op, row)) = r else {
775                continue;
776            };
777            let mut join_matched_rows_cnt = 0;
778
779            Self::evict_cache(
780                side_update,
781                side_match,
782                cnt_rows_received,
783                join_cache_evict_interval_rows,
784            );
785
786            let matched_rows = if !side_update.ht.check_inequal_key_null(&row) {
787                Self::hash_eq_match(key, &mut side_match.ht).await?
788            } else {
789                None
790            };
791            let inequal_key = side_update.ht.serialize_inequal_key_from_row(row);
792
793            if let Some(matched_rows) = matched_rows {
794                let update_rows = Self::hash_eq_match(key, &mut side_update.ht).await?.expect("None is not expected because we have checked null in key when getting matched_rows");
795                let right_inequality_index = update_rows.inequality_index();
796                let (row_to_delete_r, row_to_insert_r) =
797                    if let Some(pks) = right_inequality_index.get(&inequal_key) {
798                        assert!(!pks.is_empty());
799                        let row_pk = side_update.ht.serialize_pk_from_row(row);
800                        match op {
801                            Op::Insert | Op::UpdateInsert => {
802                                // If there are multiple rows match the inequality key in the right table, we use one with smallest pk.
803                                let smallest_pk = pks.first_key_sorted().unwrap();
804                                if smallest_pk > &row_pk {
805                                    // smallest_pk is in the cache index, so it must exist in the cache.
806                                    if let Some(to_delete_row) = update_rows
807                                        .get_by_indexed_pk(smallest_pk, &side_update.all_data_types)
808                                    {
809                                        (
810                                            Some(Either::Left(to_delete_row?.row)),
811                                            Some(Either::Right(row)),
812                                        )
813                                    } else {
814                                        // Something wrong happened. Ignore this row in non strict consistency mode.
815                                        (None, None)
816                                    }
817                                } else {
818                                    // No affected row in the right table.
819                                    (None, None)
820                                }
821                            }
822                            Op::Delete | Op::UpdateDelete => {
823                                let smallest_pk = pks.first_key_sorted().unwrap();
824                                if smallest_pk == &row_pk {
825                                    if let Some(second_smallest_pk) = pks.second_key_sorted() {
826                                        if let Some(to_insert_row) = update_rows.get_by_indexed_pk(
827                                            second_smallest_pk,
828                                            &side_update.all_data_types,
829                                        ) {
830                                            (
831                                                Some(Either::Right(row)),
832                                                Some(Either::Left(to_insert_row?.row)),
833                                            )
834                                        } else {
835                                            // Something wrong happened. Ignore this row in non strict consistency mode.
836                                            (None, None)
837                                        }
838                                    } else {
839                                        (Some(Either::Right(row)), None)
840                                    }
841                                } else {
842                                    // No affected row in the right table.
843                                    (None, None)
844                                }
845                            }
846                        }
847                    } else {
848                        match op {
849                            // Decide the row_to_delete later
850                            Op::Insert | Op::UpdateInsert => (None, Some(Either::Right(row))),
851                            // Decide the row_to_insert later
852                            Op::Delete | Op::UpdateDelete => (Some(Either::Right(row)), None),
853                        }
854                    };
855
856                // 4 cases for row_to_delete_r and row_to_insert_r:
857                // 1. Some(_), Some(_): delete row_to_delete_r and insert row_to_insert_r
858                // 2. None, Some(_)   : row_to_delete to be decided by the nearest inequality key
859                // 3. Some(_), None   : row_to_insert to be decided by the nearest inequality key
860                // 4. None, None      : do nothing
861                if row_to_delete_r.is_none() && row_to_insert_r.is_none() {
862                    // no row to delete or insert.
863                } else {
864                    let prev_inequality_key =
865                        right_inequality_index.upper_bound_key(Bound::Excluded(&inequal_key));
866                    let next_inequality_key =
867                        right_inequality_index.lower_bound_key(Bound::Excluded(&inequal_key));
868                    let affected_row_r = match asof_desc.inequality_type {
869                        AsOfInequalityType::Lt | AsOfInequalityType::Le => next_inequality_key
870                            .and_then(|k| {
871                                update_rows.get_first_by_inequality(k, &side_update.all_data_types)
872                            }),
873                        AsOfInequalityType::Gt | AsOfInequalityType::Ge => prev_inequality_key
874                            .and_then(|k| {
875                                update_rows.get_first_by_inequality(k, &side_update.all_data_types)
876                            }),
877                    }
878                    .transpose()?
879                    .map(|r| Either::Left(r.row));
880
881                    let (row_to_delete_r, row_to_insert_r) =
882                        match (&row_to_delete_r, &row_to_insert_r) {
883                            (Some(_), Some(_)) => (row_to_delete_r, row_to_insert_r),
884                            (None, Some(_)) => (affected_row_r, row_to_insert_r),
885                            (Some(_), None) => (row_to_delete_r, affected_row_r),
886                            (None, None) => unreachable!(),
887                        };
888                    let range = match asof_desc.inequality_type {
889                        AsOfInequalityType::Lt => (
890                            prev_inequality_key.map_or_else(|| Bound::Unbounded, Bound::Included),
891                            Bound::Excluded(&inequal_key),
892                        ),
893                        AsOfInequalityType::Le => (
894                            prev_inequality_key.map_or_else(|| Bound::Unbounded, Bound::Excluded),
895                            Bound::Included(&inequal_key),
896                        ),
897                        AsOfInequalityType::Gt => (
898                            Bound::Excluded(&inequal_key),
899                            next_inequality_key.map_or_else(|| Bound::Unbounded, Bound::Included),
900                        ),
901                        AsOfInequalityType::Ge => (
902                            Bound::Included(&inequal_key),
903                            next_inequality_key.map_or_else(|| Bound::Unbounded, Bound::Excluded),
904                        ),
905                    };
906
907                    let rows_l =
908                        matched_rows.range_by_inequality(range, &side_match.all_data_types);
909                    for row_l in rows_l {
910                        join_matched_rows_cnt += 1;
911                        let row_l = row_l?.row;
912                        if let Some(row_to_delete_r) = &row_to_delete_r {
913                            if let Some(chunk) =
914                                join_chunk_builder.append_row(Op::Delete, row_to_delete_r, &row_l)
915                            {
916                                yield chunk;
917                            }
918                        } else if is_as_of_left_outer(T)
919                            && let Some(chunk) =
920                                join_chunk_builder.append_row_matched(Op::Delete, &row_l)
921                        {
922                            yield chunk;
923                        }
924                        if let Some(row_to_insert_r) = &row_to_insert_r {
925                            if let Some(chunk) =
926                                join_chunk_builder.append_row(Op::Insert, row_to_insert_r, &row_l)
927                            {
928                                yield chunk;
929                            }
930                        } else if is_as_of_left_outer(T)
931                            && let Some(chunk) =
932                                join_chunk_builder.append_row_matched(Op::Insert, &row_l)
933                        {
934                            yield chunk;
935                        }
936                    }
937                }
938                // Insert back the state taken from ht.
939                side_match.ht.update_state(key, matched_rows);
940                side_update.ht.update_state(key, update_rows);
941
942                match op {
943                    Op::Insert | Op::UpdateInsert => {
944                        side_update.ht.insert_row(key, row)?;
945                    }
946                    Op::Delete | Op::UpdateDelete => {
947                        side_update.ht.delete_row(key, row)?;
948                    }
949                }
950            } else {
951                // Row which violates null-safe bitmap will never be matched so we need not
952                // store.
953                // Noop here because we only support left outer AsOf join.
954            }
955            join_matched_rows_metrics.observe(join_matched_rows_cnt as _);
956            if join_matched_rows_cnt > high_join_amplification_threshold {
957                let join_key_data_types = side_update.ht.join_key_data_types();
958                let key = key.deserialize(join_key_data_types)?;
959                tracing::warn!(target: "high_join_amplification",
960                    matched_rows_len = join_matched_rows_cnt,
961                    update_table_id = %side_update.ht.table_id(),
962                    match_table_id = %side_match.ht.table_id(),
963                    join_key = ?key,
964                    actor_id = %ctx.id,
965                    fragment_id = %ctx.fragment_id,
966                    "large rows matched for join key when AsOf join updating right side",
967                );
968            }
969        }
970        if let Some(chunk) = join_chunk_builder.take() {
971            yield chunk;
972        }
973    }
974}
975
976#[cfg(test)]
977mod tests {
978    use std::sync::atomic::AtomicU64;
979
980    use risingwave_common::array::*;
981    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
982    use risingwave_common::hash::Key64;
983    use risingwave_common::util::epoch::test_epoch;
984    use risingwave_common::util::sort_util::OrderType;
985    use risingwave_storage::memory::MemoryStateStore;
986
987    use super::*;
988    use crate::common::table::test_utils::gen_pbtable;
989    use crate::executor::MemoryEncoding;
990    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
991
992    async fn create_in_memory_state_table(
993        mem_state: MemoryStateStore,
994        data_types: &[DataType],
995        order_types: &[OrderType],
996        pk_indices: &[usize],
997        table_id: u32,
998    ) -> StateTable<MemoryStateStore> {
999        let column_descs = data_types
1000            .iter()
1001            .enumerate()
1002            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1003            .collect_vec();
1004        StateTable::from_table_catalog(
1005            &gen_pbtable(
1006                TableId::new(table_id),
1007                column_descs,
1008                order_types.to_vec(),
1009                pk_indices.to_vec(),
1010                0,
1011            ),
1012            mem_state.clone(),
1013            None,
1014        )
1015        .await
1016    }
1017
1018    async fn create_executor<const T: AsOfJoinTypePrimitive>(
1019        asof_desc: AsOfDesc,
1020    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1021        let schema = Schema {
1022            fields: vec![
1023                Field::unnamed(DataType::Int64), // join key
1024                Field::unnamed(DataType::Int64),
1025                Field::unnamed(DataType::Int64),
1026            ],
1027        };
1028        let (tx_l, source_l) = MockSource::channel();
1029        let source_l = source_l.into_executor(schema.clone(), vec![1]);
1030        let (tx_r, source_r) = MockSource::channel();
1031        let source_r = source_r.into_executor(schema, vec![1]);
1032        let params_l = JoinParams::new(vec![0], vec![1]);
1033        let params_r = JoinParams::new(vec![0], vec![1]);
1034
1035        let mem_state = MemoryStateStore::new();
1036
1037        let state_l = create_in_memory_state_table(
1038            mem_state.clone(),
1039            &[DataType::Int64, DataType::Int64, DataType::Int64],
1040            &[
1041                OrderType::ascending(),
1042                OrderType::ascending(),
1043                OrderType::ascending(),
1044            ],
1045            &[0, asof_desc.left_idx, 1],
1046            0,
1047        )
1048        .await;
1049
1050        let state_r = create_in_memory_state_table(
1051            mem_state,
1052            &[DataType::Int64, DataType::Int64, DataType::Int64],
1053            &[
1054                OrderType::ascending(),
1055                OrderType::ascending(),
1056                OrderType::ascending(),
1057            ],
1058            &[0, asof_desc.right_idx, 1],
1059            1,
1060        )
1061        .await;
1062
1063        let schema: Schema = [source_l.schema().fields(), source_r.schema().fields()]
1064            .concat()
1065            .into_iter()
1066            .collect();
1067        let schema_len = schema.len();
1068        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1069
1070        let executor = AsOfJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
1071            ActorContext::for_test(123),
1072            info,
1073            source_l,
1074            source_r,
1075            params_l,
1076            params_r,
1077            vec![false],
1078            (0..schema_len).collect_vec(),
1079            state_l,
1080            state_r,
1081            Arc::new(AtomicU64::new(0)),
1082            Arc::new(StreamingMetrics::unused()),
1083            1024,
1084            2048,
1085            asof_desc,
1086        );
1087        (tx_l, tx_r, executor.boxed().execute())
1088    }
1089
1090    #[tokio::test]
1091    async fn test_as_of_inner_join() -> StreamExecutorResult<()> {
1092        let asof_desc = AsOfDesc {
1093            left_idx: 0,
1094            right_idx: 2,
1095            inequality_type: AsOfInequalityType::Lt,
1096        };
1097
1098        let chunk_l1 = StreamChunk::from_pretty(
1099            "  I I I
1100             + 1 4 7
1101             + 2 5 8
1102             + 3 6 9",
1103        );
1104        let chunk_l2 = StreamChunk::from_pretty(
1105            "  I I I
1106             + 3 8 1
1107             - 3 8 1",
1108        );
1109        let chunk_r1 = StreamChunk::from_pretty(
1110            "  I I I
1111             + 2 1 7
1112             + 2 2 1
1113             + 2 3 4
1114             + 2 4 2
1115             + 6 1 9
1116             + 6 2 9",
1117        );
1118        let chunk_r2 = StreamChunk::from_pretty(
1119            "  I I I
1120             - 2 3 4",
1121        );
1122        let chunk_r3 = StreamChunk::from_pretty(
1123            "  I I I
1124             + 2 3 3",
1125        );
1126        let chunk_l3 = StreamChunk::from_pretty(
1127            "  I I I
1128             - 2 5 8",
1129        );
1130        let chunk_l4 = StreamChunk::from_pretty(
1131            "  I I I
1132             + 6 3 1
1133             + 6 4 1",
1134        );
1135        let chunk_r4 = StreamChunk::from_pretty(
1136            "  I I I
1137             - 6 1 9",
1138        );
1139
1140        let (mut tx_l, mut tx_r, mut hash_join) =
1141            create_executor::<{ AsOfJoinType::Inner }>(asof_desc).await;
1142
1143        // push the init barrier for left and right
1144        tx_l.push_barrier(test_epoch(1), false);
1145        tx_r.push_barrier(test_epoch(1), false);
1146        hash_join.next_unwrap_ready_barrier()?;
1147
1148        // push the 1st left chunk
1149        tx_l.push_chunk(chunk_l1);
1150        hash_join.next_unwrap_pending();
1151
1152        // push the init barrier for left and right
1153        tx_l.push_barrier(test_epoch(2), false);
1154        tx_r.push_barrier(test_epoch(2), false);
1155        hash_join.next_unwrap_ready_barrier()?;
1156
1157        // push the 2nd left chunk
1158        tx_l.push_chunk(chunk_l2);
1159        hash_join.next_unwrap_pending();
1160
1161        // push the 1st right chunk
1162        tx_r.push_chunk(chunk_r1);
1163        let chunk = hash_join.next_unwrap_ready_chunk()?;
1164        assert_eq!(
1165            chunk,
1166            StreamChunk::from_pretty(
1167                " I I I I I I
1168                + 2 5 8 2 1 7
1169                - 2 5 8 2 1 7
1170                + 2 5 8 2 3 4"
1171            )
1172        );
1173
1174        // push the 2nd right chunk
1175        tx_r.push_chunk(chunk_r2);
1176        let chunk = hash_join.next_unwrap_ready_chunk()?;
1177        assert_eq!(
1178            chunk,
1179            StreamChunk::from_pretty(
1180                " I I I I I I
1181                - 2 5 8 2 3 4
1182                + 2 5 8 2 1 7"
1183            )
1184        );
1185
1186        // push the 3rd right chunk
1187        tx_r.push_chunk(chunk_r3);
1188        let chunk = hash_join.next_unwrap_ready_chunk()?;
1189        assert_eq!(
1190            chunk,
1191            StreamChunk::from_pretty(
1192                " I I I I I I
1193                - 2 5 8 2 1 7
1194                + 2 5 8 2 3 3"
1195            )
1196        );
1197
1198        // push the 3rd left chunk
1199        tx_l.push_chunk(chunk_l3);
1200        let chunk = hash_join.next_unwrap_ready_chunk()?;
1201        assert_eq!(
1202            chunk,
1203            StreamChunk::from_pretty(
1204                " I I I I I I
1205                - 2 5 8 2 3 3"
1206            )
1207        );
1208
1209        // push the 4th left chunk
1210        tx_l.push_chunk(chunk_l4);
1211        let chunk = hash_join.next_unwrap_ready_chunk()?;
1212        assert_eq!(
1213            chunk,
1214            StreamChunk::from_pretty(
1215                " I I I I I I
1216                + 6 3 1 6 1 9
1217                + 6 4 1 6 1 9"
1218            )
1219        );
1220
1221        // push the 4th right chunk
1222        tx_r.push_chunk(chunk_r4);
1223        let chunk = hash_join.next_unwrap_ready_chunk()?;
1224        assert_eq!(
1225            chunk,
1226            StreamChunk::from_pretty(
1227                " I I I I I I
1228                - 6 3 1 6 1 9
1229                + 6 3 1 6 2 9
1230                - 6 4 1 6 1 9
1231                + 6 4 1 6 2 9"
1232            )
1233        );
1234
1235        Ok(())
1236    }
1237
1238    #[tokio::test]
1239    async fn test_as_of_left_outer_join() -> StreamExecutorResult<()> {
1240        let asof_desc = AsOfDesc {
1241            left_idx: 1,
1242            right_idx: 2,
1243            inequality_type: AsOfInequalityType::Ge,
1244        };
1245
1246        let chunk_l1 = StreamChunk::from_pretty(
1247            "  I I I
1248             + 1 4 7
1249             + 2 5 8
1250             + 3 6 9",
1251        );
1252        let chunk_l2 = StreamChunk::from_pretty(
1253            "  I I I
1254             + 3 8 1
1255             - 3 8 1",
1256        );
1257        let chunk_r1 = StreamChunk::from_pretty(
1258            "  I I I
1259             + 2 3 4
1260             + 2 2 5
1261             + 2 1 5
1262             + 6 1 8
1263             + 6 2 9",
1264        );
1265        let chunk_r2 = StreamChunk::from_pretty(
1266            "  I I I
1267             - 2 3 4
1268             - 2 1 5
1269             - 2 2 5",
1270        );
1271        let chunk_l3 = StreamChunk::from_pretty(
1272            "  I I I
1273             + 6 8 9",
1274        );
1275        let chunk_r3 = StreamChunk::from_pretty(
1276            "  I I I
1277             - 6 1 8",
1278        );
1279
1280        let (mut tx_l, mut tx_r, mut hash_join) =
1281            create_executor::<{ AsOfJoinType::LeftOuter }>(asof_desc).await;
1282
1283        // push the init barrier for left and right
1284        tx_l.push_barrier(test_epoch(1), false);
1285        tx_r.push_barrier(test_epoch(1), false);
1286        hash_join.next_unwrap_ready_barrier()?;
1287
1288        // push the 1st left chunk
1289        tx_l.push_chunk(chunk_l1);
1290        let chunk = hash_join.next_unwrap_ready_chunk()?;
1291        assert_eq!(
1292            chunk,
1293            StreamChunk::from_pretty(
1294                " I I I I I I
1295                + 1 4 7 . . .
1296                + 2 5 8 . . .
1297                + 3 6 9 . . ."
1298            )
1299        );
1300
1301        // push the init barrier for left and right
1302        tx_l.push_barrier(test_epoch(2), false);
1303        tx_r.push_barrier(test_epoch(2), false);
1304        hash_join.next_unwrap_ready_barrier()?;
1305
1306        // push the 2nd left chunk
1307        tx_l.push_chunk(chunk_l2);
1308        let chunk = hash_join.next_unwrap_ready_chunk()?;
1309        assert_eq!(
1310            chunk,
1311            StreamChunk::from_pretty(
1312                " I I I I I I
1313                + 3 8 1 . . . D
1314                - 3 8 1 . . . D"
1315            )
1316        );
1317
1318        // push the 1st right chunk
1319        tx_r.push_chunk(chunk_r1);
1320        let chunk = hash_join.next_unwrap_ready_chunk()?;
1321        assert_eq!(
1322            chunk,
1323            StreamChunk::from_pretty(
1324                " I I I I I I
1325                - 2 5 8 . . .
1326                + 2 5 8 2 3 4
1327                - 2 5 8 2 3 4
1328                + 2 5 8 2 2 5
1329                - 2 5 8 2 2 5
1330                + 2 5 8 2 1 5"
1331            )
1332        );
1333
1334        // push the 2nd right chunk
1335        tx_r.push_chunk(chunk_r2);
1336        let chunk = hash_join.next_unwrap_ready_chunk()?;
1337        assert_eq!(
1338            chunk,
1339            StreamChunk::from_pretty(
1340                " I I I I I I
1341                - 2 5 8 2 1 5
1342                + 2 5 8 2 2 5
1343                - 2 5 8 2 2 5
1344                + 2 5 8 . . ."
1345            )
1346        );
1347
1348        // push the 3rd left chunk
1349        tx_l.push_chunk(chunk_l3);
1350        let chunk = hash_join.next_unwrap_ready_chunk()?;
1351        assert_eq!(
1352            chunk,
1353            StreamChunk::from_pretty(
1354                " I I I I I I
1355                + 6 8 9 6 1 8"
1356            )
1357        );
1358
1359        // push the 3rd right chunk
1360        tx_r.push_chunk(chunk_r3);
1361        let chunk = hash_join.next_unwrap_ready_chunk()?;
1362        assert_eq!(
1363            chunk,
1364            StreamChunk::from_pretty(
1365                " I I I I I I
1366                - 6 8 9 6 1 8
1367                + 6 8 9 . . ."
1368            )
1369        );
1370        Ok(())
1371    }
1372}