risingwave_stream/executor/
asof_join.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! `AsOf` join executor with optional LRU cache.
16//!
17//! When `use_cache` is false, the executor directly queries the state table.
18//! When `use_cache` is true, the `AsOfJoinHashMap` maintains an LRU cache.
19//! Controlled by the session variable `streaming_asof_join_use_cache`.
20
21use std::cmp::Ordering;
22use std::collections::BTreeMap;
23use std::ops::Bound;
24use std::time::Duration;
25
26use either::Either;
27use itertools::Itertools;
28use multimap::MultiMap;
29use risingwave_common::array::Op;
30use risingwave_common::metrics::LabelGuardedHistogram;
31use risingwave_common::row::RowExt;
32use risingwave_common::util::epoch::EpochPair;
33use risingwave_common::util::sort_util::cmp_rows_ascending;
34use tokio::time::Instant;
35
36use super::barrier_align::*;
37use super::join::asof_join::*;
38use super::join::builder::JoinStreamChunkBuilder;
39use super::join::row::JoinRow;
40use super::join::*;
41use super::watermark::*;
42use crate::executor::join::builder::JoinChunkBuilder;
43use crate::executor::prelude::*;
44
/// Per-input join parameters handed to `AsOfJoinExecutor::new`.
pub struct JoinParams {
    /// Column indices (within the input schema) that make up the equi-join key.
    pub join_key_indices: Vec<usize>,
    /// Column indices of the input's primary key, after deduplication.
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Bundles the join-key indices and deduplicated primary-key indices of one input.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        Self {
            deduped_pk_indices,
            join_key_indices,
        }
    }
}
60
61struct JoinSide<S: StateStore, E: AsOfRowEncoding> {
62    /// Store all data from a one side stream
63    ht: AsOfJoinHashMap<S, E>,
64    /// Indices of the join key columns
65    join_key_indices: Vec<usize>,
66    /// The data type of all columns without degree.
67    all_data_types: Vec<DataType>,
68    /// The mapping from input indices of a side to output columns.
69    i2o_mapping: Vec<(usize, usize)>,
70    i2o_mapping_indexed: MultiMap<usize, usize>,
71    /// The index of the inequality column.
72    inequal_key_idx: usize,
73}
74
75impl<S: StateStore, E: AsOfRowEncoding> std::fmt::Debug for JoinSide<S, E> {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.debug_struct("JoinSide")
78            .field("join_key_indices", &self.join_key_indices)
79            .field("col_types", &self.all_data_types)
80            .field("i2o_mapping", &self.i2o_mapping)
81            .finish()
82    }
83}
84
85impl<S: StateStore, E: AsOfRowEncoding> JoinSide<S, E> {
86    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
87        self.ht.init(epoch).await
88    }
89}
90
91/// `AsOfJoinExecutor` for streaming as-of joins.
92///
93/// This executor uses `AsOfJoinHashMap` to support both execution modes:
94/// it can either maintain an LRU cache or query the state table directly,
95/// depending on the `use_cache` configuration.
96pub struct AsOfJoinExecutor<S: StateStore, const T: AsOfJoinTypePrimitive, E: AsOfRowEncoding> {
97    ctx: ActorContextRef,
98    info: ExecutorInfo,
99
100    /// Left input executor
101    input_l: Option<Executor>,
102    /// Right input executor
103    input_r: Option<Executor>,
104    /// The data types of the formed new columns
105    actual_output_data_types: Vec<DataType>,
106    /// The parameters of the left join executor
107    side_l: JoinSide<S, E>,
108    /// The parameters of the right join executor
109    side_r: JoinSide<S, E>,
110
111    /// Whether nulls are considered equal for each join key column
112    null_safe: Vec<bool>,
113
114    metrics: Arc<StreamingMetrics>,
115    /// The maximum size of the chunk produced by executor at a time
116    chunk_size: usize,
117    /// watermark column index -> `BufferedWatermarks`
118    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,
119    /// `AsOf` join description
120    asof_desc: AsOfDesc,
121    /// Row counter for periodic cache eviction
122    cnt_rows_received: u32,
123    /// Number of processed rows between periodic manual evictions of the join cache.
124    join_cache_evict_interval_rows: u32,
125    /// Threshold for logging high join amplification warnings.
126    high_join_amplification_threshold: usize,
127}
128
129impl<S: StateStore, const T: AsOfJoinTypePrimitive, E: AsOfRowEncoding> std::fmt::Debug
130    for AsOfJoinExecutor<S, T, E>
131{
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        f.debug_struct("AsOfJoinExecutor")
134            .field("join_type", &T)
135            .field("input_left", &self.input_l.as_ref().unwrap().identity())
136            .field("input_right", &self.input_r.as_ref().unwrap().identity())
137            .field("side_l", &self.side_l)
138            .field("side_r", &self.side_r)
139            .field("stream_key", &self.info.stream_key)
140            .field("schema", &self.info.schema)
141            .field("actual_output_data_types", &self.actual_output_data_types)
142            .finish()
143    }
144}
145
146impl<S: StateStore, const T: AsOfJoinTypePrimitive, E: AsOfRowEncoding> Execute
147    for AsOfJoinExecutor<S, T, E>
148{
149    fn execute(self: Box<Self>) -> BoxedMessageStream {
150        self.into_stream().boxed()
151    }
152}
153
154struct EqJoinArgs<'a, S: StateStore, E: AsOfRowEncoding> {
155    ctx: &'a ActorContextRef,
156    side_l: &'a mut JoinSide<S, E>,
157    side_r: &'a mut JoinSide<S, E>,
158    null_safe: &'a [bool],
159    asof_desc: &'a AsOfDesc,
160    actual_output_data_types: &'a [DataType],
161    chunk: StreamChunk,
162    chunk_size: usize,
163    cnt_rows_received: &'a mut u32,
164    join_cache_evict_interval_rows: u32,
165    high_join_amplification_threshold: usize,
166    /// Bound at executor scope so the guard isn't dropped between chunks (which resets the series).
167    /// Only observed by [`AsOfJoinExecutor::eq_join_right`].
168    join_matched_join_keys: &'a LabelGuardedHistogram,
169}
170
171impl<S: StateStore, const T: AsOfJoinTypePrimitive, E: AsOfRowEncoding> AsOfJoinExecutor<S, T, E> {
172    #[expect(clippy::too_many_arguments)]
173    pub fn new(
174        ctx: ActorContextRef,
175        info: ExecutorInfo,
176        input_l: Executor,
177        input_r: Executor,
178        params_l: JoinParams,
179        params_r: JoinParams,
180        null_safe: Vec<bool>,
181        output_indices: Vec<usize>,
182        state_table_l: StateTable<S>,
183        state_table_r: StateTable<S>,
184        watermark_epoch: AtomicU64Ref,
185        metrics: Arc<StreamingMetrics>,
186        chunk_size: usize,
187        asof_desc: AsOfDesc,
188        use_cache: bool,
189        high_join_amplification_threshold: usize,
190    ) -> Self {
191        let join_cache_evict_interval_rows = ctx
192            .config
193            .developer
194            .join_hash_map_evict_interval_rows
195            .max(1);
196        let cache_epoch = if use_cache {
197            Some(watermark_epoch)
198        } else {
199            None
200        };
201        let schema_fields = [
202            input_l.schema().fields.clone(),
203            input_r.schema().fields.clone(),
204        ]
205        .concat();
206
207        let original_output_data_types = schema_fields
208            .iter()
209            .map(|field| field.data_type())
210            .collect_vec();
211        let actual_output_data_types = output_indices
212            .iter()
213            .map(|&idx| original_output_data_types[idx].clone())
214            .collect_vec();
215
216        let state_all_data_types_l = input_l.schema().data_types();
217        let state_all_data_types_r = input_r.schema().data_types();
218
219        let state_join_key_indices_l = params_l.join_key_indices;
220        let state_join_key_indices_r = params_r.join_key_indices;
221
222        let join_key_data_types_l = state_join_key_indices_l
223            .iter()
224            .map(|idx| state_all_data_types_l[*idx].clone())
225            .collect_vec();
226
227        let join_key_data_types_r = state_join_key_indices_r
228            .iter()
229            .map(|idx| state_all_data_types_r[*idx].clone())
230            .collect_vec();
231
232        assert_eq!(join_key_data_types_l, join_key_data_types_r);
233
234        let (left_to_output, right_to_output) = {
235            let (left_len, right_len) = if is_left_semi_or_anti(T) {
236                (state_all_data_types_l.len(), 0usize)
237            } else if is_right_semi_or_anti(T) {
238                (0usize, state_all_data_types_r.len())
239            } else {
240                (state_all_data_types_l.len(), state_all_data_types_r.len())
241            };
242            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
243        };
244
245        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
246        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());
247
248        let watermark_buffers = BTreeMap::new();
249
250        let inequal_key_idx_l = asof_desc.left_idx;
251        let inequal_key_idx_r = asof_desc.right_idx;
252
253        Self {
254            ctx: ctx.clone(),
255            info,
256            input_l: Some(input_l),
257            input_r: Some(input_r),
258            actual_output_data_types,
259            null_safe,
260            side_l: JoinSide {
261                ht: AsOfJoinHashMap::new(
262                    state_join_key_indices_l.clone(),
263                    state_table_l,
264                    params_l.deduped_pk_indices,
265                    inequal_key_idx_l,
266                    state_all_data_types_l.clone(),
267                    cache_epoch.clone(),
268                    metrics.clone(),
269                    ctx.id,
270                    ctx.fragment_id,
271                    "left",
272                ),
273                join_key_indices: state_join_key_indices_l,
274                all_data_types: state_all_data_types_l,
275                i2o_mapping: left_to_output,
276                i2o_mapping_indexed: l2o_indexed,
277                inequal_key_idx: inequal_key_idx_l,
278            },
279            side_r: JoinSide {
280                ht: AsOfJoinHashMap::new(
281                    state_join_key_indices_r.clone(),
282                    state_table_r,
283                    params_r.deduped_pk_indices,
284                    inequal_key_idx_r,
285                    state_all_data_types_r.clone(),
286                    cache_epoch,
287                    metrics.clone(),
288                    ctx.id,
289                    ctx.fragment_id,
290                    "right",
291                ),
292                join_key_indices: state_join_key_indices_r,
293                all_data_types: state_all_data_types_r,
294                i2o_mapping: right_to_output,
295                i2o_mapping_indexed: r2o_indexed,
296                inequal_key_idx: inequal_key_idx_r,
297            },
298            metrics,
299            chunk_size,
300            watermark_buffers,
301            asof_desc,
302            cnt_rows_received: 0,
303            join_cache_evict_interval_rows,
304            high_join_amplification_threshold,
305        }
306    }
307
308    /// Periodically evict the LRU cache to prevent memory buildup between barriers.
309    fn evict_cache(
310        side_l: &mut JoinSide<S, E>,
311        side_r: &mut JoinSide<S, E>,
312        cnt_rows_received: &mut u32,
313        join_cache_evict_interval_rows: u32,
314    ) {
315        *cnt_rows_received += 1;
316        if *cnt_rows_received >= join_cache_evict_interval_rows {
317            side_l.ht.evict_cache();
318            side_r.ht.evict_cache();
319            *cnt_rows_received = 0;
320        }
321    }
322
323    #[try_stream(ok = Message, error = StreamExecutorError)]
324    async fn into_stream(mut self) {
325        let input_l = self.input_l.take().unwrap();
326        let input_r = self.input_r.take().unwrap();
327        let aligned_stream = barrier_align(
328            input_l.execute(),
329            input_r.execute(),
330            self.ctx.id,
331            self.ctx.fragment_id,
332            self.metrics.clone(),
333            "Join",
334        );
335        pin_mut!(aligned_stream);
336        let actor_id = self.ctx.id;
337
338        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
339        let first_epoch = barrier.epoch;
340        yield Message::Barrier(barrier);
341        self.side_l.init(first_epoch).await?;
342        self.side_r.init(first_epoch).await?;
343
344        let actor_id_str = self.ctx.id.to_string();
345        let fragment_id_str = self.ctx.fragment_id.to_string();
346
347        let join_actor_input_waiting_duration_ns = self
348            .metrics
349            .join_actor_input_waiting_duration_ns
350            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
351        let left_join_match_duration_ns = self
352            .metrics
353            .join_match_duration_ns
354            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
355        let right_join_match_duration_ns = self
356            .metrics
357            .join_match_duration_ns
358            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);
359
360        let barrier_join_match_duration_ns = self
361            .metrics
362            .join_match_duration_ns
363            .with_guarded_label_values(&[
364                actor_id_str.as_str(),
365                fragment_id_str.as_str(),
366                "barrier",
367            ]);
368
369        let left_join_cached_entry_count = self
370            .metrics
371            .join_cached_entry_count
372            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
373
374        let right_join_cached_entry_count = self
375            .metrics
376            .join_cached_entry_count
377            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);
378
379        // Bind at executor scope: a per-chunk guard would be dropped between chunks and reset the series.
380        let right_table_id_str = self.side_r.ht.table_id().to_string();
381        let join_matched_join_keys = self
382            .metrics
383            .join_matched_join_keys
384            .with_guarded_label_values(&[
385                actor_id_str.as_str(),
386                fragment_id_str.as_str(),
387                right_table_id_str.as_str(),
388            ]);
389
390        let mut start_time = Instant::now();
391
392        while let Some(msg) = aligned_stream
393            .next()
394            .instrument_await("hash_join_barrier_align")
395            .await
396        {
397            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
398            match msg? {
399                AlignedMessage::WatermarkLeft(watermark) => {
400                    for watermark_to_emit in self.handle_watermark(SideType::Left, watermark)? {
401                        yield Message::Watermark(watermark_to_emit);
402                    }
403                }
404                AlignedMessage::WatermarkRight(watermark) => {
405                    for watermark_to_emit in self.handle_watermark(SideType::Right, watermark)? {
406                        yield Message::Watermark(watermark_to_emit);
407                    }
408                }
409                AlignedMessage::Left(chunk) => {
410                    let mut left_time = Duration::from_nanos(0);
411                    let mut left_start_time = Instant::now();
412                    #[for_await]
413                    for chunk in Self::eq_join_left(EqJoinArgs {
414                        ctx: &self.ctx,
415                        side_l: &mut self.side_l,
416                        side_r: &mut self.side_r,
417                        null_safe: &self.null_safe,
418                        asof_desc: &self.asof_desc,
419                        actual_output_data_types: &self.actual_output_data_types,
420                        chunk,
421                        chunk_size: self.chunk_size,
422                        cnt_rows_received: &mut self.cnt_rows_received,
423                        join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
424                        high_join_amplification_threshold: self.high_join_amplification_threshold,
425                        join_matched_join_keys: &join_matched_join_keys,
426                    }) {
427                        left_time += left_start_time.elapsed();
428                        yield Message::Chunk(chunk?);
429                        left_start_time = Instant::now();
430                    }
431                    left_time += left_start_time.elapsed();
432                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
433                    self.try_flush_data().await?;
434                }
435                AlignedMessage::Right(chunk) => {
436                    let mut right_time = Duration::from_nanos(0);
437                    let mut right_start_time = Instant::now();
438                    #[for_await]
439                    for chunk in Self::eq_join_right(EqJoinArgs {
440                        ctx: &self.ctx,
441                        side_l: &mut self.side_l,
442                        side_r: &mut self.side_r,
443                        null_safe: &self.null_safe,
444                        asof_desc: &self.asof_desc,
445                        actual_output_data_types: &self.actual_output_data_types,
446                        chunk,
447                        chunk_size: self.chunk_size,
448                        cnt_rows_received: &mut self.cnt_rows_received,
449                        join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
450                        high_join_amplification_threshold: self.high_join_amplification_threshold,
451                        join_matched_join_keys: &join_matched_join_keys,
452                    }) {
453                        right_time += right_start_time.elapsed();
454                        yield Message::Chunk(chunk?);
455                        right_start_time = Instant::now();
456                    }
457                    right_time += right_start_time.elapsed();
458                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
459                    self.try_flush_data().await?;
460                }
461                AlignedMessage::Barrier(barrier) => {
462                    let barrier_start_time = Instant::now();
463                    let (left_post_commit, right_post_commit) =
464                        self.flush_data(barrier.epoch).await?;
465
466                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
467                    yield Message::Barrier(barrier);
468
469                    // Update the vnode bitmap for state tables of both sides if asked.
470                    right_post_commit
471                        .post_yield_barrier(update_vnode_bitmap.clone())
472                        .await?;
473                    if left_post_commit
474                        .post_yield_barrier(update_vnode_bitmap)
475                        .await?
476                        .unwrap_or(false)
477                    {
478                        self.watermark_buffers
479                            .values_mut()
480                            .for_each(|buffers| buffers.clear());
481                    }
482
483                    // Report metrics of cached join rows/entries
484                    for (join_cached_entry_count, ht) in [
485                        (&left_join_cached_entry_count, &self.side_l.ht),
486                        (&right_join_cached_entry_count, &self.side_r.ht),
487                    ] {
488                        join_cached_entry_count.set(ht.entry_count() as i64);
489                    }
490
491                    barrier_join_match_duration_ns
492                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
493                }
494            }
495            start_time = Instant::now();
496        }
497    }
498
499    async fn flush_data(
500        &mut self,
501        epoch: EpochPair,
502    ) -> StreamExecutorResult<(
503        AsOfJoinHashMapPostCommit<'_, S, E>,
504        AsOfJoinHashMapPostCommit<'_, S, E>,
505    )> {
506        let left = self.side_l.ht.flush(epoch).await?;
507        let right = self.side_r.ht.flush(epoch).await?;
508        Ok((left, right))
509    }
510
511    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
512        self.side_l.ht.try_flush().await?;
513        self.side_r.ht.try_flush().await?;
514        Ok(())
515    }
516
517    fn handle_watermark(
518        &mut self,
519        side: SideTypePrimitive,
520        watermark: Watermark,
521    ) -> StreamExecutorResult<Vec<Watermark>> {
522        let (side_update, side_match) = if side == SideType::Left {
523            (&mut self.side_l, &mut self.side_r)
524        } else {
525            (&mut self.side_r, &mut self.side_l)
526        };
527
528        // State cleaning
529        if side_update.join_key_indices[0] == watermark.col_idx {
530            side_match.ht.update_watermark(watermark.val.clone());
531        }
532
533        // Select watermarks to yield.
534        let wm_in_jk = side_update
535            .join_key_indices
536            .iter()
537            .positions(|idx| *idx == watermark.col_idx);
538        let mut watermarks_to_emit = vec![];
539        for idx in wm_in_jk {
540            let buffers = self
541                .watermark_buffers
542                .entry(idx)
543                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
544            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
545                let empty_indices = vec![];
546                let output_indices = side_update
547                    .i2o_mapping_indexed
548                    .get_vec(&side_update.join_key_indices[idx])
549                    .unwrap_or(&empty_indices)
550                    .iter()
551                    .chain(
552                        side_match
553                            .i2o_mapping_indexed
554                            .get_vec(&side_match.join_key_indices[idx])
555                            .unwrap_or(&empty_indices),
556                    );
557                for output_idx in output_indices {
558                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
559                }
560            };
561        }
562        Ok(watermarks_to_emit)
563    }
564
565    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
566    async fn eq_join_left(args: EqJoinArgs<'_, S, E>) {
567        let EqJoinArgs {
568            ctx: _,
569            side_l,
570            side_r,
571            null_safe,
572            asof_desc,
573            actual_output_data_types,
574            chunk,
575            chunk_size,
576            cnt_rows_received,
577            join_cache_evict_interval_rows,
578            high_join_amplification_threshold: _,
579            join_matched_join_keys: _,
580        } = args;
581
582        let (side_update, side_match) = (side_l, side_r);
583
584        let mut join_chunk_builder =
585            JoinChunkBuilder::<T, { SideType::Left }>::new(JoinStreamChunkBuilder::new(
586                chunk_size,
587                actual_output_data_types.to_vec(),
588                side_update.i2o_mapping.clone(),
589                side_match.i2o_mapping.clone(),
590            ));
591
592        // The inequality key is always a single column; wrap in an array for `project()`.
593        let inequal_key_idx_update = [side_update.inequal_key_idx];
594
595        for r in chunk.rows_with_holes() {
596            let Some((op, row)) = r else {
597                continue;
598            };
599            Self::evict_cache(
600                side_update,
601                side_match,
602                cnt_rows_received,
603                join_cache_evict_interval_rows,
604            );
605
606            // Check null-safe: if any non-null-safe join key column is NULL, skip matching.
607            // Rows that can never match are not stored in state (consistent with old behavior).
608            let join_key_null_not_safe = side_update
609                .join_key_indices
610                .iter()
611                .zip_eq(null_safe.iter())
612                .any(|(idx, ns)| !ns && row.datum_at(*idx).is_none());
613            if join_key_null_not_safe {
614                match op {
615                    Op::Insert | Op::UpdateInsert => {
616                        if let Some(chunk) =
617                            join_chunk_builder.forward_if_not_matched(Op::Insert, row)
618                        {
619                            yield chunk;
620                        }
621                    }
622                    Op::Delete | Op::UpdateDelete => {
623                        if let Some(chunk) =
624                            join_chunk_builder.forward_if_not_matched(Op::Delete, row)
625                        {
626                            yield chunk;
627                        }
628                    }
629                }
630                continue;
631            }
632
633            let join_key = row.project(&side_update.join_key_indices);
634
635            let inequal_key_is_null = side_update.ht.check_inequal_key_null(&row);
636            let inequal_key = row.project(&inequal_key_idx_update);
637
638            if !inequal_key_is_null {
639                let matched_row_by_inequality = match asof_desc.inequality_type {
640                    AsOfInequalityType::Lt => {
641                        side_match
642                            .ht
643                            .lower_bound_by_inequality_with_jk_prefix(
644                                &join_key,
645                                Bound::Excluded(&inequal_key),
646                            )
647                            .await
648                    }
649                    AsOfInequalityType::Le => {
650                        side_match
651                            .ht
652                            .lower_bound_by_inequality_with_jk_prefix(
653                                &join_key,
654                                Bound::Included(&inequal_key),
655                            )
656                            .await
657                    }
658                    AsOfInequalityType::Gt => {
659                        side_match
660                            .ht
661                            .upper_bound_by_inequality_with_jk_prefix(
662                                &join_key,
663                                Bound::Excluded(&inequal_key),
664                            )
665                            .await
666                    }
667                    AsOfInequalityType::Ge => {
668                        side_match
669                            .ht
670                            .upper_bound_by_inequality_with_jk_prefix(
671                                &join_key,
672                                Bound::Included(&inequal_key),
673                            )
674                            .await
675                    }
676                }?
677                .map(|row| JoinRow::new(row, 0));
678                match op {
679                    Op::Insert | Op::UpdateInsert => {
680                        if let Some(matched_row) = matched_row_by_inequality {
681                            if let Some(chunk) =
682                                join_chunk_builder.with_match_on_insert(&row, &matched_row)
683                            {
684                                yield chunk;
685                            }
686                        } else if let Some(chunk) =
687                            join_chunk_builder.forward_if_not_matched(Op::Insert, row)
688                        {
689                            yield chunk;
690                        }
691                        side_update.ht.insert(row)?;
692                    }
693                    Op::Delete | Op::UpdateDelete => {
694                        if let Some(matched_row) = matched_row_by_inequality {
695                            if let Some(chunk) =
696                                join_chunk_builder.with_match_on_delete(&row, &matched_row)
697                            {
698                                yield chunk;
699                            }
700                        } else if let Some(chunk) =
701                            join_chunk_builder.forward_if_not_matched(Op::Delete, row)
702                        {
703                            yield chunk;
704                        }
705                        side_update.ht.delete(row)?;
706                    }
707                }
708            } else {
709                // Inequality key is NULL, which can never satisfy the inequality predicate,
710                // so we skip storing and only forward as unmatched for left outer join.
711                match op {
712                    Op::Insert | Op::UpdateInsert => {
713                        if let Some(chunk) =
714                            join_chunk_builder.forward_if_not_matched(Op::Insert, row)
715                        {
716                            yield chunk;
717                        }
718                    }
719                    Op::Delete | Op::UpdateDelete => {
720                        if let Some(chunk) =
721                            join_chunk_builder.forward_if_not_matched(Op::Delete, row)
722                        {
723                            yield chunk;
724                        }
725                    }
726                }
727            }
728        }
729        if let Some(chunk) = join_chunk_builder.take() {
730            yield chunk;
731        }
732    }
733
734    fn cmp_pk_rows(pk1: &impl Row, pk2: &impl Row) -> Ordering {
735        cmp_rows_ascending(pk1, pk2)
736    }
737
738    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
739    async fn eq_join_right(args: EqJoinArgs<'_, S, E>) {
740        let EqJoinArgs {
741            ctx,
742            side_l,
743            side_r,
744            null_safe,
745            asof_desc,
746            actual_output_data_types,
747            chunk,
748            chunk_size,
749            cnt_rows_received,
750            join_cache_evict_interval_rows,
751            high_join_amplification_threshold,
752            join_matched_join_keys,
753        } = args;
754
755        let (side_update, side_match) = (side_r, side_l);
756
757        let mut join_chunk_builder = JoinStreamChunkBuilder::new(
758            chunk_size,
759            actual_output_data_types.to_vec(),
760            side_update.i2o_mapping.clone(),
761            side_match.i2o_mapping.clone(),
762        );
763
764        // The inequality key is always a single column; wrap in an array for `project()`.
765        let inequal_key_idx_update = [side_update.inequal_key_idx];
766
767        for r in chunk.rows_with_holes() {
768            let Some((op, row)) = r else {
769                continue;
770            };
771            Self::evict_cache(
772                side_update,
773                side_match,
774                cnt_rows_received,
775                join_cache_evict_interval_rows,
776            );
777
778            // Check null-safe: if any non-null-safe join key column is NULL, skip.
779            // Rows that can never match are not stored in state (consistent with old behavior).
780            let join_key_null_not_safe = side_update
781                .join_key_indices
782                .iter()
783                .zip_eq(null_safe.iter())
784                .any(|(idx, ns)| !ns && row.datum_at(*idx).is_none());
785            if join_key_null_not_safe {
786                continue;
787            }
788
789            let join_key = row.project(&side_update.join_key_indices);
790
791            let inequal_key_is_null = side_update.ht.check_inequal_key_null(&row);
792            let inequal_key = row.project(&inequal_key_idx_update);
793
794            let mut join_matched_rows_cnt = 0;
795
796            if !inequal_key_is_null {
797                let (row_to_delete_r, row_to_insert_r) = {
798                    let (first_row, second_row) = side_update
799                        .ht
800                        .first_two_by_inequality_with_jk_prefix(&join_key, &inequal_key)
801                        .await?;
802                    if let Some(first_row) = first_row {
803                        let row_pk = side_update.ht.get_pk_from_row(row);
804                        let first_pk = side_update.ht.get_pk_from_row(&first_row).to_owned_row();
805                        match op {
806                            Op::Insert | Op::UpdateInsert => {
807                                // If there are multiple rows match the inequality key in the right table, we use one with smallest pk.
808                                if Self::cmp_pk_rows(&first_pk, &row_pk) == Ordering::Greater {
809                                    (Some(Either::Left(first_row)), Some(Either::Right(row)))
810                                } else {
811                                    // No affected row in the right table.
812                                    (None, None)
813                                }
814                            }
815                            Op::Delete | Op::UpdateDelete => {
816                                if Self::cmp_pk_rows(&first_pk, &row_pk) == Ordering::Equal {
817                                    if let Some(second_row) = second_row {
818                                        (Some(Either::Right(row)), Some(Either::Left(second_row)))
819                                    } else {
820                                        (Some(Either::Right(row)), None)
821                                    }
822                                } else {
823                                    // No affected row in the right table.
824                                    (None, None)
825                                }
826                            }
827                        }
828                    } else {
829                        match op {
830                            // Decide the row_to_delete later
831                            Op::Insert | Op::UpdateInsert => (None, Some(Either::Right(row))),
832                            // Decide the row_to_insert later
833                            Op::Delete | Op::UpdateDelete => (Some(Either::Right(row)), None),
834                        }
835                    }
836                };
837                // 4 cases for row_to_delete_r and row_to_insert_r:
838                // 1. Some(_), Some(_): delete row_to_delete_r and insert row_to_insert_r
839                // 2. None, Some(_)   : row_to_delete to be decided by the nearest inequality key
840                // 3. Some(_), None   : row_to_insert to be decided by the nearest inequality key
841                // 4. None, None      : do nothing
842                if row_to_delete_r.is_none() && row_to_insert_r.is_none() {
843                    // no row to delete or insert.
844                } else {
845                    let prev_inequality_key = side_update
846                        .ht
847                        .upper_bound_by_inequality_with_jk_prefix(
848                            &join_key,
849                            Bound::Excluded(&inequal_key),
850                        )
851                        .await?
852                        .map(|r| r.project(&inequal_key_idx_update));
853                    let next_inequality_key = side_update
854                        .ht
855                        .lower_bound_by_inequality_with_jk_prefix(
856                            &join_key,
857                            Bound::Excluded(&inequal_key),
858                        )
859                        .await?
860                        .map(|r| r.project(&inequal_key_idx_update));
861
862                    let affected_inequality_key_r = match asof_desc.inequality_type {
863                        AsOfInequalityType::Lt | AsOfInequalityType::Le => &next_inequality_key,
864                        AsOfInequalityType::Gt | AsOfInequalityType::Ge => &prev_inequality_key,
865                    };
866                    let affected_row_r =
867                        if let Some(affected_inequality_key_r) = affected_inequality_key_r {
868                            side_update
869                                .ht
870                                .first_by_inequality_with_jk_prefix(
871                                    &join_key,
872                                    &affected_inequality_key_r,
873                                )
874                                .await?
875                        } else {
876                            None
877                        }
878                        .map(Either::Left);
879
880                    let (row_to_delete_r, row_to_insert_r) =
881                        match (&row_to_delete_r, &row_to_insert_r) {
882                            (Some(_), Some(_)) => (row_to_delete_r, row_to_insert_r),
883                            (None, Some(_)) => (affected_row_r, row_to_insert_r),
884                            (Some(_), None) => (row_to_delete_r, affected_row_r),
885                            (None, None) => unreachable!(),
886                        };
887                    let range = match asof_desc.inequality_type {
888                        AsOfInequalityType::Lt => (
889                            prev_inequality_key
890                                .map(Either::Left)
891                                .map_or_else(|| Bound::Unbounded, Bound::Included),
892                            Bound::Excluded(Either::Right(&inequal_key)),
893                        ),
894                        AsOfInequalityType::Le => (
895                            prev_inequality_key
896                                .map(Either::Left)
897                                .map_or_else(|| Bound::Unbounded, Bound::Excluded),
898                            Bound::Included(Either::Right(&inequal_key)),
899                        ),
900                        AsOfInequalityType::Gt => (
901                            Bound::Excluded(Either::Right(&inequal_key)),
902                            next_inequality_key
903                                .map(Either::Left)
904                                .map_or_else(|| Bound::Unbounded, Bound::Included),
905                        ),
906                        AsOfInequalityType::Ge => (
907                            Bound::Included(Either::Right(&inequal_key)),
908                            next_inequality_key
909                                .map(Either::Left)
910                                .map_or_else(|| Bound::Unbounded, Bound::Excluded),
911                        ),
912                    };
913
914                    let rows_l_stream = side_match
915                        .ht
916                        .range_by_inequality_with_jk_prefix(&join_key, &range)
917                        .await?;
918                    #[for_await]
919                    for row_l in rows_l_stream {
920                        let row_l = row_l?;
921                        join_matched_rows_cnt += 1;
922                        if let Some(row_to_delete_r) = &row_to_delete_r {
923                            if let Some(chunk) =
924                                join_chunk_builder.append_row(Op::Delete, row_to_delete_r, &row_l)
925                            {
926                                yield chunk;
927                            }
928                        } else if is_as_of_left_outer(T)
929                            && let Some(chunk) =
930                                join_chunk_builder.append_row_matched(Op::Delete, &row_l)
931                        {
932                            yield chunk;
933                        }
934                        if let Some(row_to_insert_r) = &row_to_insert_r {
935                            if let Some(chunk) =
936                                join_chunk_builder.append_row(Op::Insert, row_to_insert_r, &row_l)
937                            {
938                                yield chunk;
939                            }
940                        } else if is_as_of_left_outer(T)
941                            && let Some(chunk) =
942                                join_chunk_builder.append_row_matched(Op::Insert, &row_l)
943                        {
944                            yield chunk;
945                        }
946                    }
947                }
948
949                match op {
950                    Op::Insert | Op::UpdateInsert => {
951                        side_update.ht.insert(row)?;
952                    }
953                    Op::Delete | Op::UpdateDelete => {
954                        side_update.ht.delete(row)?;
955                    }
956                }
957            } else {
958                // Inequality key is NULL, which can never satisfy the inequality predicate,
959                // so we skip storing. No action needed on the right side.
960            }
961            join_matched_join_keys.observe(join_matched_rows_cnt as _);
962            if join_matched_rows_cnt > high_join_amplification_threshold {
963                let join_key = row.project(&side_update.join_key_indices);
964                tracing::warn!(target: "high_join_amplification",
965                    matched_rows_len = join_matched_rows_cnt,
966                    update_table_id = %side_update.ht.table_id(),
967                    match_table_id = %side_match.ht.table_id(),
968                    join_key = ?join_key,
969                    actor_id = %ctx.id,
970                    fragment_id = %ctx.fragment_id,
971                    "large rows matched for join key when AsOf join updating right side",
972                );
973            }
974        }
975        if let Some(chunk) = join_chunk_builder.take() {
976            yield chunk;
977        }
978    }
979}
980
#[cfg(test)]
mod tests {
    // Unit tests for `AsOfJoinExecutor`. Every scenario runs twice: once reading the state
    // tables directly (`use_cache = false`) and once through the LRU cache (`use_cache = true`).
    use std::sync::atomic::AtomicU64;

    use risingwave_common::array::*;
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};

    /// Creates a `StateTable` backed by `mem_state` with table id `table_id`, one unnamed
    /// column per entry of `data_types` (column id = position), and primary key `pk_indices`
    /// ordered by `order_types`.
    async fn create_in_memory_state_table(
        mem_state: MemoryStateStore,
        data_types: &[DataType],
        order_types: &[OrderType],
        pk_indices: &[usize],
        table_id: u32,
    ) -> StateTable<MemoryStateStore> {
        let column_descs = data_types
            .iter()
            .enumerate()
            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
            .collect_vec();
        StateTable::from_table_catalog(
            &gen_pbtable(
                TableId::new(table_id),
                column_descs,
                order_types.to_vec(),
                pk_indices.to_vec(),
                0,
            ),
            mem_state.clone(),
            None,
        )
        .await
    }

    /// Builds an `AsOfJoinExecutor` of join type `T` over two mock sources.
    ///
    /// Both inputs have three `Int64` columns with column 0 as the join key and column 1 as
    /// the deduped pk (`JoinParams::new(vec![0], vec![1])`). Each side's state table is keyed
    /// by `[join key, inequality column from asof_desc, column 1]`, all ascending.
    ///
    /// Returns the left-input sender, the right-input sender, and the executor's output stream.
    async fn create_executor<const T: AsOfJoinTypePrimitive>(
        asof_desc: AsOfDesc,
        use_cache: bool,
    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64), // join key
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };
        let (tx_l, source_l) = MockSource::channel();
        let source_l = source_l.into_executor(schema.clone(), vec![1]);
        let (tx_r, source_r) = MockSource::channel();
        let source_r = source_r.into_executor(schema, vec![1]);
        let params_l = JoinParams::new(vec![0], vec![1]);
        let params_r = JoinParams::new(vec![0], vec![1]);

        let mem_state = MemoryStateStore::new();

        // Left state table (id 0), keyed by [join key, left inequality column, column 1].
        let state_l = create_in_memory_state_table(
            mem_state.clone(),
            &[DataType::Int64, DataType::Int64, DataType::Int64],
            &[
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
            &[0, asof_desc.left_idx, 1],
            0,
        )
        .await;

        // Right state table (id 1), keyed by [join key, right inequality column, column 1].
        let state_r = create_in_memory_state_table(
            mem_state,
            &[DataType::Int64, DataType::Int64, DataType::Int64],
            &[
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
            &[0, asof_desc.right_idx, 1],
            1,
        )
        .await;

        // Output schema: the left columns followed by the right columns.
        let schema: Schema = [source_l.schema().fields(), source_r.schema().fields()]
            .concat()
            .into_iter()
            .collect();
        let schema_len = schema.len();
        let info = ExecutorInfo::for_test(schema, vec![1], "AsOfJoinExecutor".to_owned(), 0);

        // NOTE(review): `vec![false]` is presumably the per-join-key null-safe flags and
        // `1024` the chunk size — confirm against `AsOfJoinExecutor::new`.
        let executor = AsOfJoinExecutor::<MemoryStateStore, T, AsOfCpuEncoding>::new(
            ActorContext::for_test(123),
            info,
            source_l,
            source_r,
            params_l,
            params_r,
            vec![false],
            (0..schema_len).collect_vec(),
            state_l,
            state_r,
            Arc::new(AtomicU64::new(0)),
            Arc::new(StreamingMetrics::unused()),
            1024,
            asof_desc,
            use_cache,
            2048, // high_join_amplification_threshold
        );
        (tx_l, tx_r, executor.boxed().execute())
    }

    /// Inner AsOf join scenario without the cache.
    #[tokio::test]
    async fn test_asof_inner_join() -> StreamExecutorResult<()> {
        test_asof_inner_join_impl(false).await
    }

    /// Inner AsOf join scenario with the LRU cache enabled.
    #[tokio::test]
    async fn test_asof_inner_join_with_cache() -> StreamExecutorResult<()> {
        test_asof_inner_join_impl(true).await
    }

    /// Inner AsOf join with condition `left.col0 < right.col2` within each join key.
    ///
    /// Each left row matches the right row with the smallest `col2` greater than its
    /// `col0`; ties on `col2` go to the right row with the smallest pk (`col1`).
    async fn test_asof_inner_join_impl(use_cache: bool) -> StreamExecutorResult<()> {
        let asof_desc = AsOfDesc {
            left_idx: 0,
            right_idx: 2,
            inequality_type: AsOfInequalityType::Lt,
        };

        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 7
             + 2 5 8
             + 3 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 3 8 1
             - 3 8 1",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 1 7
             + 2 2 1
             + 2 3 4
             + 2 4 2
             + 6 1 9
             + 6 2 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             - 2 3 4",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I I
             + 2 3 3",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I I
             - 2 5 8",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            "  I I I
             + 6 3 1
             + 6 4 1",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I I I
             - 6 1 9",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_executor::<{ AsOfJoinType::Inner }>(asof_desc, use_cache).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; the right side is empty, so an inner join emits nothing
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the second barrier (epoch 2) for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk; still no right rows, so nothing is emitted
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk: left (2, 5, 8) has col0 = 2 and first matches (2, 1, 7),
        // then switches to (2, 3, 4) since 4 is the smaller col2 above 2; right rows with
        // col2 = 1 or 2 never match. Join key 6 has no left rows yet.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 2 5 8 2 1 7
                - 2 5 8 2 1 7
                + 2 5 8 2 3 4"
            )
        );

        // push the 2nd right chunk: deleting the current match falls back to (2, 1, 7)
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 2 3 4
                + 2 5 8 2 1 7"
            )
        );

        // push the 3rd right chunk: col2 = 3 is closer above 2 than 7, so it becomes the match
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 2 1 7
                + 2 5 8 2 3 3"
            )
        );

        // push the 3rd left chunk: deleting the left row retracts its joined output
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 2 3 3"
            )
        );

        // push the 4th left chunk: right rows (6, 1, 9) and (6, 2, 9) tie on col2 = 9,
        // so the one with the smaller pk (col1 = 1) is the match
        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 6 3 1 6 1 9
                + 6 4 1 6 1 9"
            )
        );

        // push the 4th right chunk: deleting (6, 1, 9) hands the match to (6, 2, 9)
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 6 3 1 6 1 9
                + 6 3 1 6 2 9
                - 6 4 1 6 1 9
                + 6 4 1 6 2 9"
            )
        );

        Ok(())
    }

    /// Left outer AsOf join scenario without the cache.
    #[tokio::test]
    async fn test_asof_left_outer_join() -> StreamExecutorResult<()> {
        test_asof_left_outer_join_impl(false).await
    }

    /// Left outer AsOf join scenario with the LRU cache enabled.
    #[tokio::test]
    async fn test_asof_left_outer_join_with_cache() -> StreamExecutorResult<()> {
        test_asof_left_outer_join_impl(true).await
    }

    /// Left outer AsOf join with condition `left.col1 >= right.col2` within each join key.
    ///
    /// Each left row matches the right row with the largest `col2` not above its `col1`
    /// (ties go to the smallest pk); unmatched left rows are emitted with NULL right columns.
    async fn test_asof_left_outer_join_impl(use_cache: bool) -> StreamExecutorResult<()> {
        let asof_desc = AsOfDesc {
            left_idx: 1,
            right_idx: 2,
            inequality_type: AsOfInequalityType::Ge,
        };

        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 7
             + 2 5 8
             + 3 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 3 8 1
             - 3 8 1",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 3 4
             + 2 2 5
             + 2 1 5
             + 6 1 8
             + 6 2 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             - 2 3 4
             - 2 1 5
             - 2 2 5",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I I
             + 6 8 9",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I I
             - 6 1 8",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_executor::<{ AsOfJoinType::LeftOuter }>(asof_desc, use_cache).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; no right rows yet, so every left row is NULL-padded
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 1 4 7 . . .
                + 2 5 8 . . .
                + 3 6 9 . . ."
            )
        );

        // push the second barrier (epoch 2) for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk: an insert and delete of the same row.
        // NOTE(review): the trailing `D` presumably marks rows as invisible in the expected
        // chunk — confirm against the `StreamChunk::from_pretty` documentation.
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 3 8 1 . . . D
                - 3 8 1 . . . D"
            )
        );

        // push the 1st right chunk: left (2, 5, 8) has col1 = 5 and goes from unmatched to
        // (2, 3, 4), then to (2, 2, 5) (closer col2), then to (2, 1, 5) (same col2, smaller
        // pk). Join key 6 has no left rows yet.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 . . .
                + 2 5 8 2 3 4
                - 2 5 8 2 3 4
                + 2 5 8 2 2 5
                - 2 5 8 2 2 5
                + 2 5 8 2 1 5"
            )
        );

        // push the 2nd right chunk: deleting (2, 3, 4) affects no left row; deleting (2, 1, 5)
        // falls back to (2, 2, 5); deleting (2, 2, 5) leaves the left row unmatched again
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 2 1 5
                + 2 5 8 2 2 5
                - 2 5 8 2 2 5
                + 2 5 8 . . ."
            )
        );

        // push the 3rd left chunk: col1 = 8 matches (6, 1, 8) but not (6, 2, 9)
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 6 8 9 6 1 8"
            )
        );

        // push the 3rd right chunk: deleting the only match leaves the left row NULL-padded
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 6 8 9 6 1 8
                + 6 8 9 . . ."
            )
        );
        Ok(())
    }
}