risingwave_stream/executor/
asof_join.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! `AsOf` join executor with optional LRU cache.
16//!
17//! When `use_cache` is false, the executor directly queries the state table.
18//! When `use_cache` is true, the `AsOfJoinHashMap` maintains an LRU cache.
19//! Controlled by the session variable `streaming_asof_join_use_cache`.
20
21use std::cmp::Ordering;
22use std::collections::BTreeMap;
23use std::ops::Bound;
24use std::time::Duration;
25
26use either::Either;
27use itertools::Itertools;
28use multimap::MultiMap;
29use risingwave_common::array::Op;
30use risingwave_common::row::RowExt;
31use risingwave_common::util::epoch::EpochPair;
32use risingwave_common::util::sort_util::cmp_rows_ascending;
33use tokio::time::Instant;
34
35use super::barrier_align::*;
36use super::join::asof_join::*;
37use super::join::builder::JoinStreamChunkBuilder;
38use super::join::row::JoinRow;
39use super::join::*;
40use super::watermark::*;
41use crate::executor::join::builder::JoinChunkBuilder;
42use crate::executor::prelude::*;
43
44pub struct JoinParams {
45    /// Indices of the join keys
46    pub join_key_indices: Vec<usize>,
47    /// Indices of the input pk after dedup
48    pub deduped_pk_indices: Vec<usize>,
49}
50
51impl JoinParams {
52    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
53        Self {
54            join_key_indices,
55            deduped_pk_indices,
56        }
57    }
58}
59
60struct JoinSide<S: StateStore, E: AsOfRowEncoding> {
61    /// Store all data from a one side stream
62    ht: AsOfJoinHashMap<S, E>,
63    /// Indices of the join key columns
64    join_key_indices: Vec<usize>,
65    /// The data type of all columns without degree.
66    all_data_types: Vec<DataType>,
67    /// The mapping from input indices of a side to output columns.
68    i2o_mapping: Vec<(usize, usize)>,
69    i2o_mapping_indexed: MultiMap<usize, usize>,
70    /// The index of the inequality column.
71    inequal_key_idx: usize,
72}
73
74impl<S: StateStore, E: AsOfRowEncoding> std::fmt::Debug for JoinSide<S, E> {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        f.debug_struct("JoinSide")
77            .field("join_key_indices", &self.join_key_indices)
78            .field("col_types", &self.all_data_types)
79            .field("i2o_mapping", &self.i2o_mapping)
80            .finish()
81    }
82}
83
84impl<S: StateStore, E: AsOfRowEncoding> JoinSide<S, E> {
85    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
86        self.ht.init(epoch).await
87    }
88}
89
90/// `AsOfJoinExecutor` for streaming as-of joins.
91///
92/// This executor uses `AsOfJoinHashMap` to support both execution modes:
93/// it can either maintain an LRU cache or query the state table directly,
94/// depending on the `use_cache` configuration.
95pub struct AsOfJoinExecutor<S: StateStore, const T: AsOfJoinTypePrimitive, E: AsOfRowEncoding> {
96    ctx: ActorContextRef,
97    info: ExecutorInfo,
98
99    /// Left input executor
100    input_l: Option<Executor>,
101    /// Right input executor
102    input_r: Option<Executor>,
103    /// The data types of the formed new columns
104    actual_output_data_types: Vec<DataType>,
105    /// The parameters of the left join executor
106    side_l: JoinSide<S, E>,
107    /// The parameters of the right join executor
108    side_r: JoinSide<S, E>,
109
110    /// Whether nulls are considered equal for each join key column
111    null_safe: Vec<bool>,
112
113    metrics: Arc<StreamingMetrics>,
114    /// The maximum size of the chunk produced by executor at a time
115    chunk_size: usize,
116    /// watermark column index -> `BufferedWatermarks`
117    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,
118    /// `AsOf` join description
119    asof_desc: AsOfDesc,
120    /// Row counter for periodic cache eviction
121    cnt_rows_received: u32,
122    /// Number of processed rows between periodic manual evictions of the join cache.
123    join_cache_evict_interval_rows: u32,
124    /// Threshold for logging high join amplification warnings.
125    high_join_amplification_threshold: usize,
126}
127
128impl<S: StateStore, const T: AsOfJoinTypePrimitive, E: AsOfRowEncoding> std::fmt::Debug
129    for AsOfJoinExecutor<S, T, E>
130{
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        f.debug_struct("AsOfJoinExecutor")
133            .field("join_type", &T)
134            .field("input_left", &self.input_l.as_ref().unwrap().identity())
135            .field("input_right", &self.input_r.as_ref().unwrap().identity())
136            .field("side_l", &self.side_l)
137            .field("side_r", &self.side_r)
138            .field("stream_key", &self.info.stream_key)
139            .field("schema", &self.info.schema)
140            .field("actual_output_data_types", &self.actual_output_data_types)
141            .finish()
142    }
143}
144
145impl<S: StateStore, const T: AsOfJoinTypePrimitive, E: AsOfRowEncoding> Execute
146    for AsOfJoinExecutor<S, T, E>
147{
148    fn execute(self: Box<Self>) -> BoxedMessageStream {
149        self.into_stream().boxed()
150    }
151}
152
153struct EqJoinArgs<'a, S: StateStore, E: AsOfRowEncoding> {
154    ctx: &'a ActorContextRef,
155    side_l: &'a mut JoinSide<S, E>,
156    side_r: &'a mut JoinSide<S, E>,
157    null_safe: &'a [bool],
158    asof_desc: &'a AsOfDesc,
159    actual_output_data_types: &'a [DataType],
160    chunk: StreamChunk,
161    chunk_size: usize,
162    cnt_rows_received: &'a mut u32,
163    join_cache_evict_interval_rows: u32,
164    high_join_amplification_threshold: usize,
165}
166
167impl<S: StateStore, const T: AsOfJoinTypePrimitive, E: AsOfRowEncoding> AsOfJoinExecutor<S, T, E> {
168    #[expect(clippy::too_many_arguments)]
169    pub fn new(
170        ctx: ActorContextRef,
171        info: ExecutorInfo,
172        input_l: Executor,
173        input_r: Executor,
174        params_l: JoinParams,
175        params_r: JoinParams,
176        null_safe: Vec<bool>,
177        output_indices: Vec<usize>,
178        state_table_l: StateTable<S>,
179        state_table_r: StateTable<S>,
180        watermark_epoch: AtomicU64Ref,
181        metrics: Arc<StreamingMetrics>,
182        chunk_size: usize,
183        asof_desc: AsOfDesc,
184        use_cache: bool,
185        high_join_amplification_threshold: usize,
186    ) -> Self {
187        let join_cache_evict_interval_rows = ctx
188            .config
189            .developer
190            .join_hash_map_evict_interval_rows
191            .max(1);
192        let cache_epoch = if use_cache {
193            Some(watermark_epoch)
194        } else {
195            None
196        };
197        let schema_fields = [
198            input_l.schema().fields.clone(),
199            input_r.schema().fields.clone(),
200        ]
201        .concat();
202
203        let original_output_data_types = schema_fields
204            .iter()
205            .map(|field| field.data_type())
206            .collect_vec();
207        let actual_output_data_types = output_indices
208            .iter()
209            .map(|&idx| original_output_data_types[idx].clone())
210            .collect_vec();
211
212        let state_all_data_types_l = input_l.schema().data_types();
213        let state_all_data_types_r = input_r.schema().data_types();
214
215        let state_join_key_indices_l = params_l.join_key_indices;
216        let state_join_key_indices_r = params_r.join_key_indices;
217
218        let join_key_data_types_l = state_join_key_indices_l
219            .iter()
220            .map(|idx| state_all_data_types_l[*idx].clone())
221            .collect_vec();
222
223        let join_key_data_types_r = state_join_key_indices_r
224            .iter()
225            .map(|idx| state_all_data_types_r[*idx].clone())
226            .collect_vec();
227
228        assert_eq!(join_key_data_types_l, join_key_data_types_r);
229
230        let (left_to_output, right_to_output) = {
231            let (left_len, right_len) = if is_left_semi_or_anti(T) {
232                (state_all_data_types_l.len(), 0usize)
233            } else if is_right_semi_or_anti(T) {
234                (0usize, state_all_data_types_r.len())
235            } else {
236                (state_all_data_types_l.len(), state_all_data_types_r.len())
237            };
238            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
239        };
240
241        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
242        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());
243
244        let watermark_buffers = BTreeMap::new();
245
246        let inequal_key_idx_l = asof_desc.left_idx;
247        let inequal_key_idx_r = asof_desc.right_idx;
248
249        Self {
250            ctx: ctx.clone(),
251            info,
252            input_l: Some(input_l),
253            input_r: Some(input_r),
254            actual_output_data_types,
255            null_safe,
256            side_l: JoinSide {
257                ht: AsOfJoinHashMap::new(
258                    state_join_key_indices_l.clone(),
259                    state_table_l,
260                    params_l.deduped_pk_indices,
261                    inequal_key_idx_l,
262                    state_all_data_types_l.clone(),
263                    cache_epoch.clone(),
264                    metrics.clone(),
265                    ctx.id,
266                    ctx.fragment_id,
267                    "left",
268                ),
269                join_key_indices: state_join_key_indices_l,
270                all_data_types: state_all_data_types_l,
271                i2o_mapping: left_to_output,
272                i2o_mapping_indexed: l2o_indexed,
273                inequal_key_idx: inequal_key_idx_l,
274            },
275            side_r: JoinSide {
276                ht: AsOfJoinHashMap::new(
277                    state_join_key_indices_r.clone(),
278                    state_table_r,
279                    params_r.deduped_pk_indices,
280                    inequal_key_idx_r,
281                    state_all_data_types_r.clone(),
282                    cache_epoch,
283                    metrics.clone(),
284                    ctx.id,
285                    ctx.fragment_id,
286                    "right",
287                ),
288                join_key_indices: state_join_key_indices_r,
289                all_data_types: state_all_data_types_r,
290                i2o_mapping: right_to_output,
291                i2o_mapping_indexed: r2o_indexed,
292                inequal_key_idx: inequal_key_idx_r,
293            },
294            metrics,
295            chunk_size,
296            watermark_buffers,
297            asof_desc,
298            cnt_rows_received: 0,
299            join_cache_evict_interval_rows,
300            high_join_amplification_threshold,
301        }
302    }
303
304    /// Periodically evict the LRU cache to prevent memory buildup between barriers.
305    fn evict_cache(
306        side_l: &mut JoinSide<S, E>,
307        side_r: &mut JoinSide<S, E>,
308        cnt_rows_received: &mut u32,
309        join_cache_evict_interval_rows: u32,
310    ) {
311        *cnt_rows_received += 1;
312        if *cnt_rows_received >= join_cache_evict_interval_rows {
313            side_l.ht.evict_cache();
314            side_r.ht.evict_cache();
315            *cnt_rows_received = 0;
316        }
317    }
318
319    #[try_stream(ok = Message, error = StreamExecutorError)]
320    async fn into_stream(mut self) {
321        let input_l = self.input_l.take().unwrap();
322        let input_r = self.input_r.take().unwrap();
323        let aligned_stream = barrier_align(
324            input_l.execute(),
325            input_r.execute(),
326            self.ctx.id,
327            self.ctx.fragment_id,
328            self.metrics.clone(),
329            "Join",
330        );
331        pin_mut!(aligned_stream);
332        let actor_id = self.ctx.id;
333
334        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
335        let first_epoch = barrier.epoch;
336        yield Message::Barrier(barrier);
337        self.side_l.init(first_epoch).await?;
338        self.side_r.init(first_epoch).await?;
339
340        let actor_id_str = self.ctx.id.to_string();
341        let fragment_id_str = self.ctx.fragment_id.to_string();
342
343        let join_actor_input_waiting_duration_ns = self
344            .metrics
345            .join_actor_input_waiting_duration_ns
346            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
347        let left_join_match_duration_ns = self
348            .metrics
349            .join_match_duration_ns
350            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
351        let right_join_match_duration_ns = self
352            .metrics
353            .join_match_duration_ns
354            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);
355
356        let barrier_join_match_duration_ns = self
357            .metrics
358            .join_match_duration_ns
359            .with_guarded_label_values(&[
360                actor_id_str.as_str(),
361                fragment_id_str.as_str(),
362                "barrier",
363            ]);
364
365        let left_join_cached_entry_count = self
366            .metrics
367            .join_cached_entry_count
368            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
369
370        let right_join_cached_entry_count = self
371            .metrics
372            .join_cached_entry_count
373            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);
374
375        let mut start_time = Instant::now();
376
377        while let Some(msg) = aligned_stream
378            .next()
379            .instrument_await("hash_join_barrier_align")
380            .await
381        {
382            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
383            match msg? {
384                AlignedMessage::WatermarkLeft(watermark) => {
385                    for watermark_to_emit in self.handle_watermark(SideType::Left, watermark)? {
386                        yield Message::Watermark(watermark_to_emit);
387                    }
388                }
389                AlignedMessage::WatermarkRight(watermark) => {
390                    for watermark_to_emit in self.handle_watermark(SideType::Right, watermark)? {
391                        yield Message::Watermark(watermark_to_emit);
392                    }
393                }
394                AlignedMessage::Left(chunk) => {
395                    let mut left_time = Duration::from_nanos(0);
396                    let mut left_start_time = Instant::now();
397                    #[for_await]
398                    for chunk in Self::eq_join_left(EqJoinArgs {
399                        ctx: &self.ctx,
400                        side_l: &mut self.side_l,
401                        side_r: &mut self.side_r,
402                        null_safe: &self.null_safe,
403                        asof_desc: &self.asof_desc,
404                        actual_output_data_types: &self.actual_output_data_types,
405                        chunk,
406                        chunk_size: self.chunk_size,
407                        cnt_rows_received: &mut self.cnt_rows_received,
408                        join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
409                        high_join_amplification_threshold: self.high_join_amplification_threshold,
410                    }) {
411                        left_time += left_start_time.elapsed();
412                        yield Message::Chunk(chunk?);
413                        left_start_time = Instant::now();
414                    }
415                    left_time += left_start_time.elapsed();
416                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
417                    self.try_flush_data().await?;
418                }
419                AlignedMessage::Right(chunk) => {
420                    let mut right_time = Duration::from_nanos(0);
421                    let mut right_start_time = Instant::now();
422                    #[for_await]
423                    for chunk in Self::eq_join_right(EqJoinArgs {
424                        ctx: &self.ctx,
425                        side_l: &mut self.side_l,
426                        side_r: &mut self.side_r,
427                        null_safe: &self.null_safe,
428                        asof_desc: &self.asof_desc,
429                        actual_output_data_types: &self.actual_output_data_types,
430                        chunk,
431                        chunk_size: self.chunk_size,
432                        cnt_rows_received: &mut self.cnt_rows_received,
433                        join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
434                        high_join_amplification_threshold: self.high_join_amplification_threshold,
435                    }) {
436                        right_time += right_start_time.elapsed();
437                        yield Message::Chunk(chunk?);
438                        right_start_time = Instant::now();
439                    }
440                    right_time += right_start_time.elapsed();
441                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
442                    self.try_flush_data().await?;
443                }
444                AlignedMessage::Barrier(barrier) => {
445                    let barrier_start_time = Instant::now();
446                    let (left_post_commit, right_post_commit) =
447                        self.flush_data(barrier.epoch).await?;
448
449                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
450                    yield Message::Barrier(barrier);
451
452                    // Update the vnode bitmap for state tables of both sides if asked.
453                    right_post_commit
454                        .post_yield_barrier(update_vnode_bitmap.clone())
455                        .await?;
456                    if left_post_commit
457                        .post_yield_barrier(update_vnode_bitmap)
458                        .await?
459                        .unwrap_or(false)
460                    {
461                        self.watermark_buffers
462                            .values_mut()
463                            .for_each(|buffers| buffers.clear());
464                    }
465
466                    // Report metrics of cached join rows/entries
467                    for (join_cached_entry_count, ht) in [
468                        (&left_join_cached_entry_count, &self.side_l.ht),
469                        (&right_join_cached_entry_count, &self.side_r.ht),
470                    ] {
471                        join_cached_entry_count.set(ht.entry_count() as i64);
472                    }
473
474                    barrier_join_match_duration_ns
475                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
476                }
477            }
478            start_time = Instant::now();
479        }
480    }
481
482    async fn flush_data(
483        &mut self,
484        epoch: EpochPair,
485    ) -> StreamExecutorResult<(
486        AsOfJoinHashMapPostCommit<'_, S, E>,
487        AsOfJoinHashMapPostCommit<'_, S, E>,
488    )> {
489        let left = self.side_l.ht.flush(epoch).await?;
490        let right = self.side_r.ht.flush(epoch).await?;
491        Ok((left, right))
492    }
493
494    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
495        self.side_l.ht.try_flush().await?;
496        self.side_r.ht.try_flush().await?;
497        Ok(())
498    }
499
500    fn handle_watermark(
501        &mut self,
502        side: SideTypePrimitive,
503        watermark: Watermark,
504    ) -> StreamExecutorResult<Vec<Watermark>> {
505        let (side_update, side_match) = if side == SideType::Left {
506            (&mut self.side_l, &mut self.side_r)
507        } else {
508            (&mut self.side_r, &mut self.side_l)
509        };
510
511        // State cleaning
512        if side_update.join_key_indices[0] == watermark.col_idx {
513            side_match.ht.update_watermark(watermark.val.clone());
514        }
515
516        // Select watermarks to yield.
517        let wm_in_jk = side_update
518            .join_key_indices
519            .iter()
520            .positions(|idx| *idx == watermark.col_idx);
521        let mut watermarks_to_emit = vec![];
522        for idx in wm_in_jk {
523            let buffers = self
524                .watermark_buffers
525                .entry(idx)
526                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
527            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
528                let empty_indices = vec![];
529                let output_indices = side_update
530                    .i2o_mapping_indexed
531                    .get_vec(&side_update.join_key_indices[idx])
532                    .unwrap_or(&empty_indices)
533                    .iter()
534                    .chain(
535                        side_match
536                            .i2o_mapping_indexed
537                            .get_vec(&side_match.join_key_indices[idx])
538                            .unwrap_or(&empty_indices),
539                    );
540                for output_idx in output_indices {
541                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
542                }
543            };
544        }
545        Ok(watermarks_to_emit)
546    }
547
548    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
549    async fn eq_join_left(args: EqJoinArgs<'_, S, E>) {
550        let EqJoinArgs {
551            ctx: _,
552            side_l,
553            side_r,
554            null_safe,
555            asof_desc,
556            actual_output_data_types,
557            chunk,
558            chunk_size,
559            cnt_rows_received,
560            join_cache_evict_interval_rows,
561            high_join_amplification_threshold: _,
562        } = args;
563
564        let (side_update, side_match) = (side_l, side_r);
565
566        let mut join_chunk_builder =
567            JoinChunkBuilder::<T, { SideType::Left }>::new(JoinStreamChunkBuilder::new(
568                chunk_size,
569                actual_output_data_types.to_vec(),
570                side_update.i2o_mapping.clone(),
571                side_match.i2o_mapping.clone(),
572            ));
573
574        // The inequality key is always a single column; wrap in an array for `project()`.
575        let inequal_key_idx_update = [side_update.inequal_key_idx];
576
577        for r in chunk.rows_with_holes() {
578            let Some((op, row)) = r else {
579                continue;
580            };
581            Self::evict_cache(
582                side_update,
583                side_match,
584                cnt_rows_received,
585                join_cache_evict_interval_rows,
586            );
587
588            // Check null-safe: if any non-null-safe join key column is NULL, skip matching.
589            // Rows that can never match are not stored in state (consistent with old behavior).
590            let join_key_null_not_safe = side_update
591                .join_key_indices
592                .iter()
593                .zip_eq(null_safe.iter())
594                .any(|(idx, ns)| !ns && row.datum_at(*idx).is_none());
595            if join_key_null_not_safe {
596                match op {
597                    Op::Insert | Op::UpdateInsert => {
598                        if let Some(chunk) =
599                            join_chunk_builder.forward_if_not_matched(Op::Insert, row)
600                        {
601                            yield chunk;
602                        }
603                    }
604                    Op::Delete | Op::UpdateDelete => {
605                        if let Some(chunk) =
606                            join_chunk_builder.forward_if_not_matched(Op::Delete, row)
607                        {
608                            yield chunk;
609                        }
610                    }
611                }
612                continue;
613            }
614
615            let join_key = row.project(&side_update.join_key_indices);
616
617            let inequal_key_is_null = side_update.ht.check_inequal_key_null(&row);
618            let inequal_key = row.project(&inequal_key_idx_update);
619
620            if !inequal_key_is_null {
621                let matched_row_by_inequality = match asof_desc.inequality_type {
622                    AsOfInequalityType::Lt => {
623                        side_match
624                            .ht
625                            .lower_bound_by_inequality_with_jk_prefix(
626                                &join_key,
627                                Bound::Excluded(&inequal_key),
628                            )
629                            .await
630                    }
631                    AsOfInequalityType::Le => {
632                        side_match
633                            .ht
634                            .lower_bound_by_inequality_with_jk_prefix(
635                                &join_key,
636                                Bound::Included(&inequal_key),
637                            )
638                            .await
639                    }
640                    AsOfInequalityType::Gt => {
641                        side_match
642                            .ht
643                            .upper_bound_by_inequality_with_jk_prefix(
644                                &join_key,
645                                Bound::Excluded(&inequal_key),
646                            )
647                            .await
648                    }
649                    AsOfInequalityType::Ge => {
650                        side_match
651                            .ht
652                            .upper_bound_by_inequality_with_jk_prefix(
653                                &join_key,
654                                Bound::Included(&inequal_key),
655                            )
656                            .await
657                    }
658                }?
659                .map(|row| JoinRow::new(row, 0));
660                match op {
661                    Op::Insert | Op::UpdateInsert => {
662                        if let Some(matched_row) = matched_row_by_inequality {
663                            if let Some(chunk) =
664                                join_chunk_builder.with_match_on_insert(&row, &matched_row)
665                            {
666                                yield chunk;
667                            }
668                        } else if let Some(chunk) =
669                            join_chunk_builder.forward_if_not_matched(Op::Insert, row)
670                        {
671                            yield chunk;
672                        }
673                        side_update.ht.insert(row)?;
674                    }
675                    Op::Delete | Op::UpdateDelete => {
676                        if let Some(matched_row) = matched_row_by_inequality {
677                            if let Some(chunk) =
678                                join_chunk_builder.with_match_on_delete(&row, &matched_row)
679                            {
680                                yield chunk;
681                            }
682                        } else if let Some(chunk) =
683                            join_chunk_builder.forward_if_not_matched(Op::Delete, row)
684                        {
685                            yield chunk;
686                        }
687                        side_update.ht.delete(row)?;
688                    }
689                }
690            } else {
691                // Inequality key is NULL, which can never satisfy the inequality predicate,
692                // so we skip storing and only forward as unmatched for left outer join.
693                match op {
694                    Op::Insert | Op::UpdateInsert => {
695                        if let Some(chunk) =
696                            join_chunk_builder.forward_if_not_matched(Op::Insert, row)
697                        {
698                            yield chunk;
699                        }
700                    }
701                    Op::Delete | Op::UpdateDelete => {
702                        if let Some(chunk) =
703                            join_chunk_builder.forward_if_not_matched(Op::Delete, row)
704                        {
705                            yield chunk;
706                        }
707                    }
708                }
709            }
710        }
711        if let Some(chunk) = join_chunk_builder.take() {
712            yield chunk;
713        }
714    }
715
716    fn cmp_pk_rows(pk1: &impl Row, pk2: &impl Row) -> Ordering {
717        cmp_rows_ascending(pk1, pk2)
718    }
719
720    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
721    async fn eq_join_right(args: EqJoinArgs<'_, S, E>) {
722        let EqJoinArgs {
723            ctx,
724            side_l,
725            side_r,
726            null_safe,
727            asof_desc,
728            actual_output_data_types,
729            chunk,
730            chunk_size,
731            cnt_rows_received,
732            join_cache_evict_interval_rows,
733            high_join_amplification_threshold,
734        } = args;
735
736        let (side_update, side_match) = (side_r, side_l);
737
738        let mut join_chunk_builder = JoinStreamChunkBuilder::new(
739            chunk_size,
740            actual_output_data_types.to_vec(),
741            side_update.i2o_mapping.clone(),
742            side_match.i2o_mapping.clone(),
743        );
744
745        let join_matched_join_keys = ctx
746            .streaming_metrics
747            .join_matched_join_keys
748            .with_guarded_label_values(&[
749                &ctx.id.to_string(),
750                &ctx.fragment_id.to_string(),
751                &side_update.ht.table_id().to_string(),
752            ]);
753
754        // The inequality key is always a single column; wrap in an array for `project()`.
755        let inequal_key_idx_update = [side_update.inequal_key_idx];
756
757        for r in chunk.rows_with_holes() {
758            let Some((op, row)) = r else {
759                continue;
760            };
761            Self::evict_cache(
762                side_update,
763                side_match,
764                cnt_rows_received,
765                join_cache_evict_interval_rows,
766            );
767
768            // Check null-safe: if any non-null-safe join key column is NULL, skip.
769            // Rows that can never match are not stored in state (consistent with old behavior).
770            let join_key_null_not_safe = side_update
771                .join_key_indices
772                .iter()
773                .zip_eq(null_safe.iter())
774                .any(|(idx, ns)| !ns && row.datum_at(*idx).is_none());
775            if join_key_null_not_safe {
776                continue;
777            }
778
779            let join_key = row.project(&side_update.join_key_indices);
780
781            let inequal_key_is_null = side_update.ht.check_inequal_key_null(&row);
782            let inequal_key = row.project(&inequal_key_idx_update);
783
784            let mut join_matched_rows_cnt = 0;
785
786            if !inequal_key_is_null {
787                let (row_to_delete_r, row_to_insert_r) = {
788                    let (first_row, second_row) = side_update
789                        .ht
790                        .first_two_by_inequality_with_jk_prefix(&join_key, &inequal_key)
791                        .await?;
792                    if let Some(first_row) = first_row {
793                        let row_pk = side_update.ht.get_pk_from_row(row);
794                        let first_pk = side_update.ht.get_pk_from_row(&first_row).to_owned_row();
795                        match op {
796                            Op::Insert | Op::UpdateInsert => {
797                                // If there are multiple rows match the inequality key in the right table, we use one with smallest pk.
798                                if Self::cmp_pk_rows(&first_pk, &row_pk) == Ordering::Greater {
799                                    (Some(Either::Left(first_row)), Some(Either::Right(row)))
800                                } else {
801                                    // No affected row in the right table.
802                                    (None, None)
803                                }
804                            }
805                            Op::Delete | Op::UpdateDelete => {
806                                if Self::cmp_pk_rows(&first_pk, &row_pk) == Ordering::Equal {
807                                    if let Some(second_row) = second_row {
808                                        (Some(Either::Right(row)), Some(Either::Left(second_row)))
809                                    } else {
810                                        (Some(Either::Right(row)), None)
811                                    }
812                                } else {
813                                    // No affected row in the right table.
814                                    (None, None)
815                                }
816                            }
817                        }
818                    } else {
819                        match op {
820                            // Decide the row_to_delete later
821                            Op::Insert | Op::UpdateInsert => (None, Some(Either::Right(row))),
822                            // Decide the row_to_insert later
823                            Op::Delete | Op::UpdateDelete => (Some(Either::Right(row)), None),
824                        }
825                    }
826                };
827                // 4 cases for row_to_delete_r and row_to_insert_r:
828                // 1. Some(_), Some(_): delete row_to_delete_r and insert row_to_insert_r
829                // 2. None, Some(_)   : row_to_delete to be decided by the nearest inequality key
830                // 3. Some(_), None   : row_to_insert to be decided by the nearest inequality key
831                // 4. None, None      : do nothing
832                if row_to_delete_r.is_none() && row_to_insert_r.is_none() {
833                    // no row to delete or insert.
834                } else {
835                    let prev_inequality_key = side_update
836                        .ht
837                        .upper_bound_by_inequality_with_jk_prefix(
838                            &join_key,
839                            Bound::Excluded(&inequal_key),
840                        )
841                        .await?
842                        .map(|r| r.project(&inequal_key_idx_update));
843                    let next_inequality_key = side_update
844                        .ht
845                        .lower_bound_by_inequality_with_jk_prefix(
846                            &join_key,
847                            Bound::Excluded(&inequal_key),
848                        )
849                        .await?
850                        .map(|r| r.project(&inequal_key_idx_update));
851
852                    let affected_inequality_key_r = match asof_desc.inequality_type {
853                        AsOfInequalityType::Lt | AsOfInequalityType::Le => &next_inequality_key,
854                        AsOfInequalityType::Gt | AsOfInequalityType::Ge => &prev_inequality_key,
855                    };
856                    let affected_row_r =
857                        if let Some(affected_inequality_key_r) = affected_inequality_key_r {
858                            side_update
859                                .ht
860                                .first_by_inequality_with_jk_prefix(
861                                    &join_key,
862                                    &affected_inequality_key_r,
863                                )
864                                .await?
865                        } else {
866                            None
867                        }
868                        .map(Either::Left);
869
870                    let (row_to_delete_r, row_to_insert_r) =
871                        match (&row_to_delete_r, &row_to_insert_r) {
872                            (Some(_), Some(_)) => (row_to_delete_r, row_to_insert_r),
873                            (None, Some(_)) => (affected_row_r, row_to_insert_r),
874                            (Some(_), None) => (row_to_delete_r, affected_row_r),
875                            (None, None) => unreachable!(),
876                        };
877                    let range = match asof_desc.inequality_type {
878                        AsOfInequalityType::Lt => (
879                            prev_inequality_key
880                                .map(Either::Left)
881                                .map_or_else(|| Bound::Unbounded, Bound::Included),
882                            Bound::Excluded(Either::Right(&inequal_key)),
883                        ),
884                        AsOfInequalityType::Le => (
885                            prev_inequality_key
886                                .map(Either::Left)
887                                .map_or_else(|| Bound::Unbounded, Bound::Excluded),
888                            Bound::Included(Either::Right(&inequal_key)),
889                        ),
890                        AsOfInequalityType::Gt => (
891                            Bound::Excluded(Either::Right(&inequal_key)),
892                            next_inequality_key
893                                .map(Either::Left)
894                                .map_or_else(|| Bound::Unbounded, Bound::Included),
895                        ),
896                        AsOfInequalityType::Ge => (
897                            Bound::Included(Either::Right(&inequal_key)),
898                            next_inequality_key
899                                .map(Either::Left)
900                                .map_or_else(|| Bound::Unbounded, Bound::Excluded),
901                        ),
902                    };
903
904                    let rows_l_stream = side_match
905                        .ht
906                        .range_by_inequality_with_jk_prefix(&join_key, &range)
907                        .await?;
908                    #[for_await]
909                    for row_l in rows_l_stream {
910                        let row_l = row_l?;
911                        join_matched_rows_cnt += 1;
912                        if let Some(row_to_delete_r) = &row_to_delete_r {
913                            if let Some(chunk) =
914                                join_chunk_builder.append_row(Op::Delete, row_to_delete_r, &row_l)
915                            {
916                                yield chunk;
917                            }
918                        } else if is_as_of_left_outer(T)
919                            && let Some(chunk) =
920                                join_chunk_builder.append_row_matched(Op::Delete, &row_l)
921                        {
922                            yield chunk;
923                        }
924                        if let Some(row_to_insert_r) = &row_to_insert_r {
925                            if let Some(chunk) =
926                                join_chunk_builder.append_row(Op::Insert, row_to_insert_r, &row_l)
927                            {
928                                yield chunk;
929                            }
930                        } else if is_as_of_left_outer(T)
931                            && let Some(chunk) =
932                                join_chunk_builder.append_row_matched(Op::Insert, &row_l)
933                        {
934                            yield chunk;
935                        }
936                    }
937                }
938
939                match op {
940                    Op::Insert | Op::UpdateInsert => {
941                        side_update.ht.insert(row)?;
942                    }
943                    Op::Delete | Op::UpdateDelete => {
944                        side_update.ht.delete(row)?;
945                    }
946                }
947            } else {
948                // Inequality key is NULL, which can never satisfy the inequality predicate,
949                // so we skip storing. No action needed on the right side.
950            }
951            join_matched_join_keys.observe(join_matched_rows_cnt as _);
952            if join_matched_rows_cnt > high_join_amplification_threshold {
953                let join_key = row.project(&side_update.join_key_indices);
954                tracing::warn!(target: "high_join_amplification",
955                    matched_rows_len = join_matched_rows_cnt,
956                    update_table_id = %side_update.ht.table_id(),
957                    match_table_id = %side_match.ht.table_id(),
958                    join_key = ?join_key,
959                    actor_id = %ctx.id,
960                    fragment_id = %ctx.fragment_id,
961                    "large rows matched for join key when AsOf join updating right side",
962                );
963            }
964        }
965        if let Some(chunk) = join_chunk_builder.take() {
966            yield chunk;
967        }
968    }
969}
970
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU64;

    use risingwave_common::array::*;
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};

    /// Creates a `StateTable` on `mem_state`. Its columns are unnamed, typed by `data_types`
    /// and numbered `0..data_types.len()`. `order_types` and `pk_indices` describe its pk, and
    /// `table_id` becomes its `TableId`.
    async fn create_in_memory_state_table(
        mem_state: MemoryStateStore,
        data_types: &[DataType],
        order_types: &[OrderType],
        pk_indices: &[usize],
        table_id: u32,
    ) -> StateTable<MemoryStateStore> {
        let column_descs = data_types
            .iter()
            .enumerate()
            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
            .collect_vec();
        StateTable::from_table_catalog(
            &gen_pbtable(
                TableId::new(table_id),
                column_descs,
                order_types.to_vec(),
                pk_indices.to_vec(),
                0,
            ),
            mem_state.clone(),
            None,
        )
        .await
    }

    /// Builds an `AsOfJoinExecutor` with join type `T` over two mock sources that both have
    /// schema `(Int64, Int64, Int64)`.
    ///
    /// On both sides, column 0 is the equi-join key and column 1 is the deduped pk. The left
    /// and right state tables (ids 0 and 1) share one in-memory store. Their pk is
    /// `(column 0, AsOf inequality column, column 1)`, where the inequality column is
    /// `asof_desc.left_idx` / `asof_desc.right_idx` respectively. `use_cache` is forwarded to
    /// the executor and toggles its LRU join cache.
    ///
    /// Returns the left-input sender, the right-input sender and the executor's output stream.
    async fn create_executor<const T: AsOfJoinTypePrimitive>(
        asof_desc: AsOfDesc,
        use_cache: bool,
    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64), // join key
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };
        let (tx_l, source_l) = MockSource::channel();
        let source_l = source_l.into_executor(schema.clone(), vec![1]);
        let (tx_r, source_r) = MockSource::channel();
        let source_r = source_r.into_executor(schema, vec![1]);
        let params_l = JoinParams::new(vec![0], vec![1]);
        let params_r = JoinParams::new(vec![0], vec![1]);

        let mem_state = MemoryStateStore::new();

        let state_l = create_in_memory_state_table(
            mem_state.clone(),
            &[DataType::Int64, DataType::Int64, DataType::Int64],
            &[
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
            &[0, asof_desc.left_idx, 1],
            0,
        )
        .await;

        let state_r = create_in_memory_state_table(
            mem_state,
            &[DataType::Int64, DataType::Int64, DataType::Int64],
            &[
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
            &[0, asof_desc.right_idx, 1],
            1,
        )
        .await;

        // Output schema: all left columns followed by all right columns.
        let schema: Schema = [source_l.schema().fields(), source_r.schema().fields()]
            .concat()
            .into_iter()
            .collect();
        let schema_len = schema.len();
        let info = ExecutorInfo::for_test(schema, vec![1], "AsOfJoinExecutor".to_owned(), 0);

        let executor = AsOfJoinExecutor::<MemoryStateStore, T, AsOfCpuEncoding>::new(
            ActorContext::for_test(123),
            info,
            source_l,
            source_r,
            params_l,
            params_r,
            vec![false],
            (0..schema_len).collect_vec(),
            state_l,
            state_r,
            Arc::new(AtomicU64::new(0)),
            Arc::new(StreamingMetrics::unused()),
            1024,
            asof_desc,
            use_cache,
            2048, // high_join_amplification_threshold
        );
        (tx_l, tx_r, executor.boxed().execute())
    }

    /// Inner `AsOf` join with the cache disabled (the state table is queried directly).
    #[tokio::test]
    async fn test_asof_inner_join() -> StreamExecutorResult<()> {
        test_asof_inner_join_impl(false).await
    }

    /// Inner `AsOf` join with the LRU join cache enabled.
    #[tokio::test]
    async fn test_asof_inner_join_with_cache() -> StreamExecutorResult<()> {
        test_asof_inner_join_impl(true).await
    }

    /// Drives an inner `AsOf` join with predicate `left.col0 < right.col2` (plus equality on
    /// column 0) through a sequence of left/right chunks and checks every emitted chunk.
    async fn test_asof_inner_join_impl(use_cache: bool) -> StreamExecutorResult<()> {
        // Each left row joins the right row with the smallest col2 greater than its col0.
        // Among right rows with equal col2, the one with the smallest pk (col1) is used.
        let asof_desc = AsOfDesc {
            left_idx: 0,
            right_idx: 2,
            inequality_type: AsOfInequalityType::Lt,
        };

        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 7
             + 2 5 8
             + 3 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 3 8 1
             - 3 8 1",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 1 7
             + 2 2 1
             + 2 3 4
             + 2 4 2
             + 6 1 9
             + 6 2 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             - 2 3 4",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I I
             + 2 3 3",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I I
             - 2 5 8",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            "  I I I
             + 6 3 1
             + 6 4 1",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I I I
             - 6 1 9",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_executor::<{ AsOfJoinType::Inner }>(asof_desc, use_cache).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; the right side is empty, so nothing is emitted
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk: (3, 8, 1) is inserted and deleted again while the right
        // side is still empty
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk: (2, 1, 7) first becomes the match of (2, 5, 8). Then
        // (2, 3, 4) replaces it, since 4 is the smallest col2 above 2. (2, 2, 1) and (2, 4, 2)
        // have col2 <= 2, and key 6 has no left rows, so they emit nothing.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 2 5 8 2 1 7
                - 2 5 8 2 1 7
                + 2 5 8 2 3 4"
            )
        );

        // push the 2nd right chunk: deleting the current match falls back to (2, 1, 7)
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 2 3 4
                + 2 5 8 2 1 7"
            )
        );

        // push the 3rd right chunk: col2 = 3 is now the smallest value above 2, so (2, 3, 3)
        // takes over
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 2 1 7
                + 2 5 8 2 3 3"
            )
        );

        // push the 3rd left chunk: deleting (2, 5, 8) retracts its match
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 2 3 3"
            )
        );

        // push the 4th left chunk: both rows match col2 = 9, and (6, 1, 9) is used over
        // (6, 2, 9) because of its smaller pk
        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 6 3 1 6 1 9
                + 6 4 1 6 1 9"
            )
        );

        // push the 4th right chunk: removing (6, 1, 9) hands both left rows over to (6, 2, 9),
        // which has the same col2
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 6 3 1 6 1 9
                + 6 3 1 6 2 9
                - 6 4 1 6 1 9
                + 6 4 1 6 2 9"
            )
        );

        Ok(())
    }

    /// Left-outer `AsOf` join with the cache disabled (the state table is queried directly).
    #[tokio::test]
    async fn test_asof_left_outer_join() -> StreamExecutorResult<()> {
        test_asof_left_outer_join_impl(false).await
    }

    /// Left-outer `AsOf` join with the LRU join cache enabled.
    #[tokio::test]
    async fn test_asof_left_outer_join_with_cache() -> StreamExecutorResult<()> {
        test_asof_left_outer_join_impl(true).await
    }

    /// Drives a left-outer `AsOf` join with predicate `left.col1 >= right.col2` (plus equality
    /// on column 0) through a sequence of left/right chunks and checks every emitted chunk.
    async fn test_asof_left_outer_join_impl(use_cache: bool) -> StreamExecutorResult<()> {
        // Each left row joins the right row with the largest col2 not exceeding its col1. Ties
        // on col2 go to the smaller pk (col1). Left rows without a match are emitted
        // NULL-padded.
        let asof_desc = AsOfDesc {
            left_idx: 1,
            right_idx: 2,
            inequality_type: AsOfInequalityType::Ge,
        };

        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 7
             + 2 5 8
             + 3 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 3 8 1
             - 3 8 1",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 3 4
             + 2 2 5
             + 2 1 5
             + 6 1 8
             + 6 2 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             - 2 3 4
             - 2 1 5
             - 2 2 5",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I I
             + 6 8 9",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I I
             - 6 1 8",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_executor::<{ AsOfJoinType::LeftOuter }>(asof_desc, use_cache).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk: there are no right rows yet, so every row is NULL-padded
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 1 4 7 . . .
                + 2 5 8 . . .
                + 3 6 9 . . ."
            )
        );

        // push the 2nd barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk
        // NOTE(review): the trailing `D` presumably marks these rows as invisible in
        // `from_pretty` notation (the insert/delete pair cancels out); confirm.
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 3 8 1 . . . D
                - 3 8 1 . . . D"
            )
        );

        // push the 1st right chunk: (2, 3, 4) replaces the NULL-padded row of (2, 5, 8). Then
        // (2, 2, 5) takes over (larger col2, still <= 5), followed by (2, 1, 5) (same col2,
        // smaller pk). Key 6 has no left rows yet.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 . . .
                + 2 5 8 2 3 4
                - 2 5 8 2 3 4
                + 2 5 8 2 2 5
                - 2 5 8 2 2 5
                + 2 5 8 2 1 5"
            )
        );

        // push the 2nd right chunk:
        // - deleting (2, 3, 4) emits nothing, since it is not the current match;
        // - deleting (2, 1, 5) falls back to (2, 2, 5);
        // - deleting (2, 2, 5) leaves no candidate, so (2, 5, 8) becomes NULL-padded again.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 2 1 5
                + 2 5 8 2 2 5
                - 2 5 8 2 2 5
                + 2 5 8 . . ."
            )
        );

        // push the 3rd left chunk: for key 6, the largest col2 <= 8 is 8, i.e. (6, 1, 8)
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 6 8 9 6 1 8"
            )
        );

        // push the 3rd right chunk: only col2 = 9 > 8 remains, so (6, 8, 9) becomes NULL-padded
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 6 8 9 6 1 8
                + 6 8 9 . . ."
            )
        );
        Ok(())
    }
}