risingwave_batch_executors/executor/join/
hash_join.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::iter;
17use std::iter::empty;
18use std::marker::PhantomData;
19use std::sync::Arc;
20
21use bytes::Bytes;
22use futures_async_stream::try_stream;
23use itertools::Itertools;
24use risingwave_common::array::{Array, DataChunk, RowRef};
25use risingwave_common::bitmap::{Bitmap, BitmapBuilder, FilterByBitmap};
26use risingwave_common::catalog::Schema;
27use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher};
28use risingwave_common::memory::{MemoryContext, MonitoredGlobalAlloc};
29use risingwave_common::row::{Row, RowExt, repeat_n};
30use risingwave_common::types::{DataType, Datum, DefaultOrd};
31use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
32use risingwave_common::util::iter_util::ZipEqFast;
33use risingwave_common_estimate_size::EstimateSize;
34use risingwave_expr::expr::{BoxedExpression, Expression, build_from_prost};
35use risingwave_pb::Message;
36use risingwave_pb::batch_plan::plan_node::NodeBody;
37use risingwave_pb::data::DataChunk as PbDataChunk;
38
39use super::{AsOfDesc, AsOfInequalityType, ChunkedData, JoinType, RowId};
40use crate::error::{BatchError, Result};
41use crate::executor::{
42    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
43    WrapStreamExecutor,
44};
45use crate::monitor::BatchSpillMetrics;
46use crate::risingwave_common::hash::NullBitmap;
47use crate::spill::spill_op::SpillBackend::Disk;
48use crate::spill::spill_op::{
49    DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY, SpillBackend, SpillBuildHasher, SpillOp,
50};
51use crate::task::ShutdownToken;
52
53/// Hash Join Executor
54///
55/// High-level idea:
56/// 1. Iterate over the build side (i.e. right table) and build a hash map.
57/// 2. Iterate over the probe side (i.e. left table) and compute the hash value of each row.
58///    Then find the matched build side row for each probe side row in the hash map.
59/// 3. Concatenate the matched pair of probe side row and build side row into a single row and push
60///    it into the data chunk builder.
61/// 4. Yield chunks from the builder.
pub struct HashJoinExecutor<K> {
    /// Join type e.g. inner, left outer, ...
    join_type: JoinType,
    /// Output schema without applying `output_indices`
    #[expect(dead_code)]
    original_schema: Schema,
    /// Output schema after applying `output_indices`
    schema: Schema,
    /// `output_indices` are the indices of the columns that we needed.
    output_indices: Vec<usize>,
    /// Left child executor
    probe_side_source: BoxedExecutor,
    /// Right child executor
    build_side_source: BoxedExecutor,
    /// Column indices of left keys in equi join
    probe_key_idxs: Vec<usize>,
    /// Column indices of right keys in equi join
    build_key_idxs: Vec<usize>,
    /// Non-equi join condition (optional)
    cond: Option<Arc<BoxedExpression>>,
    /// Whether or not to enable 'IS NOT DISTINCT FROM' semantics for a specific probe/build key
    /// column
    null_matched: Vec<bool>,
    /// Human-readable identity, used in logs and as part of the spill directory name.
    identity: String,
    /// Maximum cardinality of output chunks.
    chunk_size: usize,
    /// Whether the join is an as-of join
    asof_desc: Option<AsOfDesc>,

    /// Where to spill when memory is insufficient (e.g. disk); `None` disables spilling.
    spill_backend: Option<SpillBackend>,
    /// Metrics accounting spill reads/writes.
    spill_metrics: Arc<BatchSpillMetrics>,
    /// The upper bound of memory usage for this executor.
    memory_upper_bound: Option<u64>,

    /// Token checked periodically during the build phase to support cancellation.
    shutdown_rx: ShutdownToken,

    /// Memory context used to account build-side and hash-table memory usage.
    mem_ctx: MemoryContext,
    _phantom: PhantomData<K>,
}
100
impl<K: HashKey> Executor for HashJoinExecutor<K> {
    /// Output schema after the `output_indices` projection.
    fn schema(&self) -> &Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    /// Consumes the executor and returns the stream of joined output chunks.
    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        self.do_execute()
    }
}
114
/// In `JoinHashMap`, we only save the row id of the first build row that has the hash key.
/// In fact, in the build side there may be multiple rows with the same hash key. To handle this
/// case, we use `ChunkedData` to link them together. For example:
///
/// | id | key | row |
/// | --- | --- | --- |
/// | 0 | 1 | (1, 2, 3) |
/// | 1 | 4 | (4, 5, 6) |
/// | 2 | 1 | (1, 3, 7) |
/// | 3 | 1 | (1, 3, 2) |
/// | 4 | 3 | (3, 2, 1) |
///
/// The corresponding join hash map is:
///
/// | key | value |
/// | --- | --- |
/// | 1 | 0 |
/// | 4 | 1 |
/// | 3 | 4 |
///
/// And we save build rows with the same key like this:
///
/// | id | value |
/// | --- | --- |
/// | 0 | 2 |
/// | 1 | None |
/// | 2 | 3 |
/// | 3 | None |
/// | 4 | None |
///
/// This can be seen as an implicit linked list. For convenience, we use `RowIdIter` to iterate all
/// build side row ids with the given key.
///
/// The map is allocated through `MonitoredGlobalAlloc`, so its memory usage is
/// tracked by the executor's `MemoryContext`.
pub type JoinHashMap<K> =
    hashbrown::HashMap<K, RowId, PrecomputedBuildHasher, MonitoredGlobalAlloc>;
149
/// Iterator over the chain of build-side row ids that share one join key.
struct RowIdIter<'a> {
    /// Row id to yield next; `None` means the chain is exhausted.
    current_row_id: Option<RowId>,
    /// The implicit linked list mapping each row id to the next row id with the same key.
    next_row_id: &'a ChunkedData<Option<RowId>>,
}
154
impl ChunkedData<Option<RowId>> {
    /// Creates an iterator over the same-key chain starting at `begin`
    /// (typically the value looked up in [`JoinHashMap`]).
    fn row_id_iter(&self, begin: Option<RowId>) -> RowIdIter<'_> {
        RowIdIter {
            current_row_id: begin,
            next_row_id: self,
        }
    }
}
163
164impl Iterator for RowIdIter<'_> {
165    type Item = RowId;
166
167    fn next(&mut self) -> Option<Self::Item> {
168        self.current_row_id.inspect(|row_id| {
169            self.current_row_id = self.next_row_id[*row_id];
170        })
171    }
172}
173
/// Parameters shared by all the equi-join implementations (`do_inner_join` etc.).
pub struct EquiJoinParams<K> {
    /// Left child executor, streamed chunk by chunk during probing.
    probe_side: BoxedExecutor,
    /// Data types of the probe side columns.
    probe_data_types: Vec<DataType>,
    /// Column indices of the equi-join keys on the probe side.
    probe_key_idxs: Vec<usize>,
    /// Fully materialized build side chunks (memory tracked via the executor's allocator).
    build_side: Vec<DataChunk, MonitoredGlobalAlloc>,
    /// Data types of the build side columns.
    build_data_types: Vec<DataType>,
    /// Probe side data types followed by build side data types.
    full_data_types: Vec<DataType>,
    /// Join key -> first build row id with that key.
    hash_map: JoinHashMap<K>,
    /// Chains build rows sharing the same key (implicit linked list).
    next_build_row_with_same_key: ChunkedData<Option<RowId>>,
    /// Maximum cardinality of produced chunks.
    chunk_size: usize,
    /// Token for cooperative cancellation.
    shutdown_rx: ShutdownToken,
    /// `Some` iff this is an as-of join.
    asof_desc: Option<AsOfDesc>,
}
187
impl<K> EquiJoinParams<K> {
    /// Bundles all state needed by the join loops. Plain constructor; performs
    /// no validation.
    #[allow(clippy::too_many_arguments)]
    pub(super) fn new(
        probe_side: BoxedExecutor,
        probe_data_types: Vec<DataType>,
        probe_key_idxs: Vec<usize>,
        build_side: Vec<DataChunk, MonitoredGlobalAlloc>,
        build_data_types: Vec<DataType>,
        full_data_types: Vec<DataType>,
        hash_map: JoinHashMap<K>,
        next_build_row_with_same_key: ChunkedData<Option<RowId>>,
        chunk_size: usize,
        shutdown_rx: ShutdownToken,
        asof_desc: Option<AsOfDesc>,
    ) -> Self {
        Self {
            probe_side,
            probe_data_types,
            probe_key_idxs,
            build_side,
            build_data_types,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            asof_desc,
        }
    }

    /// Whether this join is an as-of join, i.e. `asof_desc` is set.
    pub(crate) fn is_asof_join(&self) -> bool {
        self.asof_desc.is_some()
    }
}
222
/// State variables used in left outer/semi/anti join and full outer join.
///
/// NOTE(review): these fields appear to be maintained by the non-equi-condition
/// post-processing elsewhere in this file — confirm against that code (not
/// visible in this chunk).
#[derive(Default)]
struct LeftNonEquiJoinState {
    /// The number of columns in probe side.
    probe_column_count: usize,
    /// The offset of the first output row in **current** chunk for each probe side row that has
    /// been processed.
    first_output_row_id: Vec<usize>,
    /// Whether the probe row being processed currently has output rows in **next** output chunk.
    has_more_output_rows: bool,
    /// Whether the probe row being processed currently has matched non-NULL build rows in **last**
    /// output chunk.
    found_matched: bool,
}
237
/// State variables used in right outer/semi/anti join and full outer join.
///
/// NOTE(review): maintained by the non-equi-condition post-processing elsewhere
/// in this file — confirm against that code (not visible in this chunk).
#[derive(Default)]
struct RightNonEquiJoinState {
    /// Corresponding build row id for each row in **current** output chunk.
    build_row_ids: Vec<RowId>,
    /// Whether a build row has been matched.
    build_row_matched: ChunkedData<bool>,
}
246
/// Bookkeeping for spilling hash join inputs to disk; see the doc comment on
/// the `impl` block below for the on-disk file layout.
pub struct JoinSpillManager {
    /// Handle to this join's unique spill directory.
    op: SpillOp,
    /// Number of spill partitions; each has one probe-side and one build-side file.
    partition_num: usize,
    /// One append-only writer per partition for the probe side.
    probe_side_writers: Vec<opendal::Writer>,
    /// One append-only writer per partition for the build side.
    build_side_writers: Vec<opendal::Writer>,
    /// Per-partition builders batching probe rows into spill-sized chunks.
    probe_side_chunk_builders: Vec<DataChunkBuilder>,
    /// Per-partition builders batching build rows into spill-sized chunks.
    build_side_chunk_builders: Vec<DataChunkBuilder>,
    /// Hasher assigning rows to partitions (seeded per instance).
    spill_build_hasher: SpillBuildHasher,
    probe_side_data_types: Vec<DataType>,
    build_side_data_types: Vec<DataType>,
    /// Maximum cardinality of chunks written to spill files.
    spill_chunk_size: usize,
    spill_metrics: Arc<BatchSpillMetrics>,
}
260
261/// `JoinSpillManager` is used to manage how to write spill data file and read them back.
/// The spill data first needs to be partitioned. Each partition contains 2 files: `join_probe_side_file` and `join_build_side_file`.
/// Each spill file consumes data chunks and serializes each chunk into protobuf bytes.
264/// Finally, spill file content will look like the below.
265/// The file write pattern is append-only and the read pattern is sequential scan.
266/// This can maximize the disk IO performance.
267///
268/// ```text
269/// [proto_len]
270/// [proto_bytes]
271/// ...
272/// [proto_len]
273/// [proto_bytes]
274/// ```
275impl JoinSpillManager {
276    pub fn new(
277        spill_backend: SpillBackend,
278        join_identity: &String,
279        partition_num: usize,
280        probe_side_data_types: Vec<DataType>,
281        build_side_data_types: Vec<DataType>,
282        spill_chunk_size: usize,
283        spill_metrics: Arc<BatchSpillMetrics>,
284    ) -> Result<Self> {
285        let suffix_uuid = uuid::Uuid::new_v4();
286        let dir = format!("{}-{}/", join_identity, suffix_uuid);
287        let op = SpillOp::create(dir, spill_backend)?;
288        let probe_side_writers = Vec::with_capacity(partition_num);
289        let build_side_writers = Vec::with_capacity(partition_num);
290        let probe_side_chunk_builders = Vec::with_capacity(partition_num);
291        let build_side_chunk_builders = Vec::with_capacity(partition_num);
292        let spill_build_hasher = SpillBuildHasher(suffix_uuid.as_u64_pair().1);
293        Ok(Self {
294            op,
295            partition_num,
296            probe_side_writers,
297            build_side_writers,
298            probe_side_chunk_builders,
299            build_side_chunk_builders,
300            spill_build_hasher,
301            probe_side_data_types,
302            build_side_data_types,
303            spill_chunk_size,
304            spill_metrics,
305        })
306    }
307
308    pub async fn init_writers(&mut self) -> Result<()> {
309        for i in 0..self.partition_num {
310            let join_probe_side_partition_file_name = format!("join-probe-side-p{}", i);
311            let w = self
312                .op
313                .writer_with(&join_probe_side_partition_file_name)
314                .await?;
315            self.probe_side_writers.push(w);
316
317            let join_build_side_partition_file_name = format!("join-build-side-p{}", i);
318            let w = self
319                .op
320                .writer_with(&join_build_side_partition_file_name)
321                .await?;
322            self.build_side_writers.push(w);
323            self.probe_side_chunk_builders.push(DataChunkBuilder::new(
324                self.probe_side_data_types.clone(),
325                self.spill_chunk_size,
326            ));
327            self.build_side_chunk_builders.push(DataChunkBuilder::new(
328                self.build_side_data_types.clone(),
329                self.spill_chunk_size,
330            ));
331        }
332        Ok(())
333    }
334
335    pub async fn write_probe_side_chunk(
336        &mut self,
337        chunk: DataChunk,
338        hash_codes: Vec<u64>,
339    ) -> Result<()> {
340        let (columns, vis) = chunk.into_parts_v2();
341        for partition in 0..self.partition_num {
342            let new_vis = vis.clone()
343                & Bitmap::from_iter(
344                    hash_codes
345                        .iter()
346                        .map(|hash_code| (*hash_code as usize % self.partition_num) == partition),
347                );
348            let new_chunk = DataChunk::from_parts(columns.clone(), new_vis);
349            for output_chunk in self.probe_side_chunk_builders[partition].append_chunk(new_chunk) {
350                let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
351                let buf = Message::encode_to_vec(&chunk_pb);
352                let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
353                self.spill_metrics
354                    .batch_spill_write_bytes
355                    .inc_by((buf.len() + len_bytes.len()) as u64);
356                self.probe_side_writers[partition].write(len_bytes).await?;
357                self.probe_side_writers[partition].write(buf).await?;
358            }
359        }
360        Ok(())
361    }
362
363    pub async fn write_build_side_chunk(
364        &mut self,
365        chunk: DataChunk,
366        hash_codes: Vec<u64>,
367    ) -> Result<()> {
368        let (columns, vis) = chunk.into_parts_v2();
369        for partition in 0..self.partition_num {
370            let new_vis = vis.clone()
371                & Bitmap::from_iter(
372                    hash_codes
373                        .iter()
374                        .map(|hash_code| (*hash_code as usize % self.partition_num) == partition),
375                );
376            let new_chunk = DataChunk::from_parts(columns.clone(), new_vis);
377            for output_chunk in self.build_side_chunk_builders[partition].append_chunk(new_chunk) {
378                let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
379                let buf = Message::encode_to_vec(&chunk_pb);
380                let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
381                self.spill_metrics
382                    .batch_spill_write_bytes
383                    .inc_by((buf.len() + len_bytes.len()) as u64);
384                self.build_side_writers[partition].write(len_bytes).await?;
385                self.build_side_writers[partition].write(buf).await?;
386            }
387        }
388        Ok(())
389    }
390
391    pub async fn close_writers(&mut self) -> Result<()> {
392        for partition in 0..self.partition_num {
393            if let Some(output_chunk) = self.probe_side_chunk_builders[partition].consume_all() {
394                let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
395                let buf = Message::encode_to_vec(&chunk_pb);
396                let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
397                self.spill_metrics
398                    .batch_spill_write_bytes
399                    .inc_by((buf.len() + len_bytes.len()) as u64);
400                self.probe_side_writers[partition].write(len_bytes).await?;
401                self.probe_side_writers[partition].write(buf).await?;
402            }
403
404            if let Some(output_chunk) = self.build_side_chunk_builders[partition].consume_all() {
405                let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
406                let buf = Message::encode_to_vec(&chunk_pb);
407                let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
408                self.spill_metrics
409                    .batch_spill_write_bytes
410                    .inc_by((buf.len() + len_bytes.len()) as u64);
411                self.build_side_writers[partition].write(len_bytes).await?;
412                self.build_side_writers[partition].write(buf).await?;
413            }
414        }
415
416        for mut w in self.probe_side_writers.drain(..) {
417            w.close().await?;
418        }
419        for mut w in self.build_side_writers.drain(..) {
420            w.close().await?;
421        }
422        Ok(())
423    }
424
425    async fn read_probe_side_partition(
426        &mut self,
427        partition: usize,
428    ) -> Result<BoxedDataChunkStream> {
429        let join_probe_side_partition_file_name = format!("join-probe-side-p{}", partition);
430        let r = self
431            .op
432            .reader_with(&join_probe_side_partition_file_name)
433            .await?;
434        Ok(SpillOp::read_stream(r, self.spill_metrics.clone()))
435    }
436
437    async fn read_build_side_partition(
438        &mut self,
439        partition: usize,
440    ) -> Result<BoxedDataChunkStream> {
441        let join_build_side_partition_file_name = format!("join-build-side-p{}", partition);
442        let r = self
443            .op
444            .reader_with(&join_build_side_partition_file_name)
445            .await?;
446        Ok(SpillOp::read_stream(r, self.spill_metrics.clone()))
447    }
448
449    pub async fn estimate_partition_size(&self, partition: usize) -> Result<u64> {
450        let join_probe_side_partition_file_name = format!("join-probe-side-p{}", partition);
451        let probe_size = self
452            .op
453            .stat(&join_probe_side_partition_file_name)
454            .await?
455            .content_length();
456        let join_build_side_partition_file_name = format!("join-build-side-p{}", partition);
457        let build_size = self
458            .op
459            .stat(&join_build_side_partition_file_name)
460            .await?
461            .content_length();
462        Ok(probe_size + build_size)
463    }
464
465    async fn clear_partition(&mut self, partition: usize) -> Result<()> {
466        let join_probe_side_partition_file_name = format!("join-probe-side-p{}", partition);
467        self.op.delete(&join_probe_side_partition_file_name).await?;
468        let join_build_side_partition_file_name = format!("join-build-side-p{}", partition);
469        self.op.delete(&join_build_side_partition_file_name).await?;
470        Ok(())
471    }
472}
473
474impl<K: HashKey> HashJoinExecutor<K> {
    /// Top-level driver: materializes the build side, constructs the join hash
    /// table, then either runs the in-memory join or falls back to the
    /// disk-spilling path when the memory limit is exceeded.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        let mut need_to_spill = false;
        // If the memory upper bound is less than 1MB, we don't need to check memory usage.
        let check_memory = match self.memory_upper_bound {
            Some(upper_bound) => upper_bound > SPILL_AT_LEAST_MEMORY,
            None => true,
        };

        let probe_schema = self.probe_side_source.schema().clone();
        let build_schema = self.build_side_source.schema().clone();
        let probe_data_types = self.probe_side_source.schema().data_types();
        let build_data_types = self.build_side_source.schema().data_types();
        let full_data_types = [probe_data_types.clone(), build_data_types.clone()].concat();

        // Materialize the build side (right child) into memory-tracked chunks.
        let mut build_side = Vec::new_in(self.mem_ctx.global_allocator());
        let mut build_row_count = 0;
        let mut build_side_stream = self.build_side_source.execute();
        #[for_await]
        for build_chunk in &mut build_side_stream {
            let build_chunk = build_chunk?;
            if build_chunk.cardinality() > 0 {
                build_row_count += build_chunk.cardinality();
                let chunk_estimated_heap_size = build_chunk.estimated_heap_size();
                // push build_chunk to build_side before checking memory limit, otherwise we will lose that chunk when spilling.
                build_side.push(build_chunk);
                if !self.mem_ctx.add(chunk_estimated_heap_size as i64) && check_memory {
                    if self.spill_backend.is_some() {
                        need_to_spill = true;
                        break;
                    } else {
                        Err(BatchError::OutOfMemory(self.mem_ctx.mem_limit()))?;
                    }
                }
            }
        }
        let mut hash_map = JoinHashMap::with_capacity_and_hasher_in(
            build_row_count,
            PrecomputedBuildHasher,
            self.mem_ctx.global_allocator(),
        );
        let mut next_build_row_with_same_key =
            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;

        let null_matched = K::Bitmap::from_bool_vec(self.null_matched.clone());

        // Tracks hash-table memory so it can be released if we fall back to spilling.
        let mut mem_added_by_hash_table = 0;
        if !need_to_spill {
            // Build hash map
            for (build_chunk_id, build_chunk) in build_side.iter().enumerate() {
                let build_keys = K::build_many(&self.build_key_idxs, build_chunk);

                for (build_row_id, build_key) in build_keys
                    .into_iter()
                    .enumerate()
                    .filter_by_bitmap(build_chunk.visibility())
                {
                    self.shutdown_rx.check()?;
                    // Only insert key to hash map if it is consistent with the null safe restriction.
                    if build_key.null_bitmap().is_subset(&null_matched) {
                        let row_id = RowId::new(build_chunk_id, build_row_id);
                        let build_key_size = build_key.estimated_heap_size() as i64;
                        mem_added_by_hash_table += build_key_size;
                        if !self.mem_ctx.add(build_key_size) && check_memory {
                            if self.spill_backend.is_some() {
                                need_to_spill = true;
                                // NOTE(review): this `break` only exits the inner
                                // per-chunk loop; the outer loop keeps hashing the
                                // remaining chunks even though spilling was decided.
                                break;
                            } else {
                                Err(BatchError::OutOfMemory(self.mem_ctx.mem_limit()))?;
                            }
                        }
                        next_build_row_with_same_key[row_id] = hash_map.insert(build_key, row_id);
                    }
                }
            }
        }

        if need_to_spill {
            // A spilling version of hash join based on the RFC: Spill Hash Join https://github.com/risingwavelabs/rfcs/pull/91
            // When the `HashJoinExecutor` learns that memory is insufficient, `JoinSpillManager` will start to partition the hash table and spill to disk.
            // After spilling the hash table, `JoinSpillManager` will consume all chunks from its build side input executor and probe side input executor.
            // Finally, we would get e.g. 20 partitions. Each partition should contain a portion of the original build side input and probe side input data.
            // A sub `HashJoinExecutor` would be used to consume each partition one by one.
            // If memory is still not enough in the sub `HashJoinExecutor`, it will spill its inputs recursively.
            info!(
                "batch hash join executor {} starts to spill out",
                &self.identity
            );
            let mut join_spill_manager = JoinSpillManager::new(
                self.spill_backend.clone().unwrap(),
                &self.identity,
                DEFAULT_SPILL_PARTITION_NUM,
                probe_data_types.clone(),
                build_data_types.clone(),
                self.chunk_size,
                self.spill_metrics.clone(),
            )?;
            join_spill_manager.init_writers().await?;

            // Release memory occupied by the hash map
            self.mem_ctx.add(-mem_added_by_hash_table);
            drop(hash_map);
            drop(next_build_row_with_same_key);

            // Spill buffered build side chunks
            for chunk in build_side {
                // Release the memory occupied by the buffered chunks
                self.mem_ctx.add(-(chunk.estimated_heap_size() as i64));
                let hash_codes = chunk.get_hash_values(
                    self.build_key_idxs.as_slice(),
                    join_spill_manager.spill_build_hasher,
                );
                join_spill_manager
                    .write_build_side_chunk(
                        chunk,
                        hash_codes
                            .into_iter()
                            .map(|hash_code| hash_code.value())
                            .collect(),
                    )
                    .await?;
            }

            // Spill build side chunks
            #[for_await]
            for chunk in build_side_stream {
                let chunk = chunk?;
                let hash_codes = chunk.get_hash_values(
                    self.build_key_idxs.as_slice(),
                    join_spill_manager.spill_build_hasher,
                );
                join_spill_manager
                    .write_build_side_chunk(
                        chunk,
                        hash_codes
                            .into_iter()
                            .map(|hash_code| hash_code.value())
                            .collect(),
                    )
                    .await?;
            }

            // Spill probe side chunks
            #[for_await]
            for chunk in self.probe_side_source.execute() {
                let chunk = chunk?;
                let hash_codes = chunk.get_hash_values(
                    self.probe_key_idxs.as_slice(),
                    join_spill_manager.spill_build_hasher,
                );
                join_spill_manager
                    .write_probe_side_chunk(
                        chunk,
                        hash_codes
                            .into_iter()
                            .map(|hash_code| hash_code.value())
                            .collect(),
                    )
                    .await?;
            }

            join_spill_manager.close_writers().await?;

            // Process each partition one by one.
            for i in 0..join_spill_manager.partition_num {
                let partition_size = join_spill_manager.estimate_partition_size(i).await?;
                let probe_side_stream = join_spill_manager.read_probe_side_partition(i).await?;
                let build_side_stream = join_spill_manager.read_build_side_partition(i).await?;

                let sub_hash_join_executor: HashJoinExecutor<K> = HashJoinExecutor::new_inner(
                    self.join_type,
                    self.output_indices.clone(),
                    Box::new(WrapStreamExecutor::new(
                        probe_schema.clone(),
                        probe_side_stream,
                    )),
                    Box::new(WrapStreamExecutor::new(
                        build_schema.clone(),
                        build_side_stream,
                    )),
                    self.probe_key_idxs.clone(),
                    self.build_key_idxs.clone(),
                    self.null_matched.clone(),
                    self.cond.clone(),
                    format!("{}-sub{}", self.identity.clone(), i),
                    self.chunk_size,
                    self.asof_desc.clone(),
                    self.spill_backend.clone(),
                    self.spill_metrics.clone(),
                    // The partition's on-disk size serves as the sub-executor's memory upper bound.
                    Some(partition_size),
                    self.shutdown_rx.clone(),
                    self.mem_ctx.clone(),
                );

                debug!(
                    "create sub_hash_join {} for hash_join {} to spill",
                    sub_hash_join_executor.identity, self.identity
                );

                let sub_hash_join_executor = Box::new(sub_hash_join_executor).execute();

                #[for_await]
                for chunk in sub_hash_join_executor {
                    let chunk = chunk?;
                    yield chunk;
                }

                // Clear files of the current partition.
                join_spill_manager.clear_partition(i).await?;
            }
        } else {
            let params = EquiJoinParams::new(
                self.probe_side_source,
                probe_data_types,
                self.probe_key_idxs,
                build_side,
                build_data_types,
                full_data_types,
                hash_map,
                next_build_row_with_same_key,
                self.chunk_size,
                self.shutdown_rx.clone(),
                self.asof_desc,
            );

            if let Some(cond) = self.cond.as_ref()
                && !params.is_asof_join()
            {
                let stream = match self.join_type {
                    JoinType::Inner => Self::do_inner_join_with_non_equi_condition(params, cond),
                    JoinType::LeftOuter => {
                        Self::do_left_outer_join_with_non_equi_condition(params, cond)
                    }
                    JoinType::LeftSemi => {
                        Self::do_left_semi_join_with_non_equi_condition(params, cond)
                    }
                    JoinType::LeftAnti => {
                        Self::do_left_anti_join_with_non_equi_condition(params, cond)
                    }
                    JoinType::RightOuter => {
                        Self::do_right_outer_join_with_non_equi_condition(params, cond)
                    }
                    JoinType::RightSemi => {
                        Self::do_right_semi_anti_join_with_non_equi_condition::<false>(params, cond)
                    }
                    JoinType::RightAnti => {
                        Self::do_right_semi_anti_join_with_non_equi_condition::<true>(params, cond)
                    }
                    JoinType::FullOuter => {
                        Self::do_full_outer_join_with_non_equi_condition(params, cond)
                    }
                    JoinType::AsOfInner | JoinType::AsOfLeftOuter => {
                        unreachable!("AsOf join should not reach here")
                    }
                };
                // For non-equi join, we need an output chunk builder to align the output chunks.
                let mut output_chunk_builder =
                    DataChunkBuilder::new(self.schema.data_types(), self.chunk_size);
                #[for_await]
                for chunk in stream {
                    for output_chunk in
                        output_chunk_builder.append_chunk(chunk?.project(&self.output_indices))
                    {
                        yield output_chunk
                    }
                }
                if let Some(output_chunk) = output_chunk_builder.consume_all() {
                    yield output_chunk
                }
            } else {
                let stream = match self.join_type {
                    JoinType::Inner | JoinType::AsOfInner => Self::do_inner_join(params),
                    JoinType::LeftOuter | JoinType::AsOfLeftOuter => {
                        Self::do_left_outer_join(params)
                    }
                    JoinType::LeftSemi => Self::do_left_semi_anti_join::<false>(params),
                    JoinType::LeftAnti => Self::do_left_semi_anti_join::<true>(params),
                    JoinType::RightOuter => Self::do_right_outer_join(params),
                    JoinType::RightSemi => Self::do_right_semi_anti_join::<false>(params),
                    JoinType::RightAnti => Self::do_right_semi_anti_join::<true>(params),
                    JoinType::FullOuter => Self::do_full_outer_join(params),
                };
                #[for_await]
                for chunk in stream {
                    yield chunk?.project(&self.output_indices)
                }
            }
        }
    }
764
    /// Inner (and AsOf-inner) equi join: for each visible probe row, emit one
    /// joined row per build row sharing the same hash key.
    ///
    /// When `asof_desc` is set, at most one build row (selected by
    /// `find_asof_matched_rows` according to the inequality) is emitted per
    /// probe row instead of the full bucket.
    ///
    /// Rows are accumulated in `chunk_builder` and yielded chunk-by-chunk;
    /// the final partial chunk is flushed after the probe side is exhausted.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_inner_join(
        EquiJoinParams {
            probe_side,
            probe_key_idxs,
            build_side,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            asof_desc,
            ..
        }: EquiJoinParams<K>,
    ) {
        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            // Invisible probe rows are skipped via the visibility bitmap.
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                // All build rows with this key, chained through the linked list
                // `next_build_row_with_same_key`; empty iterator when no match.
                let build_side_row_iter =
                    next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied());
                if let Some(asof_desc) = &asof_desc {
                    // AsOf join: pick the single best-matching build row, if any.
                    if let Some(build_row_id) = Self::find_asof_matched_rows(
                        probe_chunk.row_at_unchecked_vis(probe_row_id),
                        &build_side,
                        build_side_row_iter,
                        asof_desc,
                    ) {
                        shutdown_rx.check()?;
                        if let Some(spilled) = Self::append_one_row(
                            &mut chunk_builder,
                            &probe_chunk,
                            probe_row_id,
                            &build_side[build_row_id.chunk_id()],
                            build_row_id.row_id(),
                        ) {
                            yield spilled
                        }
                    }
                } else {
                    // Plain inner join: emit the whole bucket.
                    for build_row_id in build_side_row_iter {
                        shutdown_rx.check()?;
                        let build_chunk = &build_side[build_row_id.chunk_id()];
                        if let Some(spilled) = Self::append_one_row(
                            &mut chunk_builder,
                            &probe_chunk,
                            probe_row_id,
                            build_chunk,
                            build_row_id.row_id(),
                        ) {
                            yield spilled
                        }
                    }
                }
            }
        }
        // Flush whatever is still buffered.
        if let Some(spilled) = chunk_builder.consume_all() {
            yield spilled
        }
    }
831
832    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
833    pub async fn do_inner_join_with_non_equi_condition(
834        params: EquiJoinParams<K>,
835        cond: &BoxedExpression,
836    ) {
837        #[for_await]
838        for chunk in Self::do_inner_join(params) {
839            let mut chunk = chunk?;
840            chunk.set_visibility(cond.eval(&chunk).await?.as_bool().iter().collect());
841            yield chunk
842        }
843    }
844
    /// Left outer (and AsOf-left-outer) equi join: every visible probe row is
    /// emitted at least once; probe rows with no key match (or, for AsOf, no
    /// row satisfying the inequality) are padded with NULLs on the build side.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_left_outer_join(
        EquiJoinParams {
            probe_side,
            probe_key_idxs,
            build_side,
            build_data_types,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            asof_desc,
            ..
        }: EquiJoinParams<K>,
    ) {
        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
                    // Walk all build rows sharing this key.
                    let build_side_row_iter =
                        next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id));
                    if let Some(asof_desc) = &asof_desc {
                        if let Some(build_row_id) = Self::find_asof_matched_rows(
                            probe_chunk.row_at_unchecked_vis(probe_row_id),
                            &build_side,
                            build_side_row_iter,
                            asof_desc,
                        ) {
                            shutdown_rx.check()?;
                            if let Some(spilled) = Self::append_one_row(
                                &mut chunk_builder,
                                &probe_chunk,
                                probe_row_id,
                                &build_side[build_row_id.chunk_id()],
                                build_row_id.row_id(),
                            ) {
                                yield spilled
                            }
                        } else {
                            // Key matched but no row satisfied the AsOf
                            // inequality: still emit the probe row, NULL-padded.
                            shutdown_rx.check()?;
                            let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
                            if let Some(spilled) = Self::append_one_row_with_null_build_side(
                                &mut chunk_builder,
                                probe_row,
                                build_data_types.len(),
                            ) {
                                yield spilled
                            }
                        }
                    } else {
                        for build_row_id in build_side_row_iter {
                            shutdown_rx.check()?;
                            let build_chunk = &build_side[build_row_id.chunk_id()];
                            if let Some(spilled) = Self::append_one_row(
                                &mut chunk_builder,
                                &probe_chunk,
                                probe_row_id,
                                build_chunk,
                                build_row_id.row_id(),
                            ) {
                                yield spilled
                            }
                        }
                    }
                } else {
                    // No build row with this key: NULL-pad the build columns.
                    shutdown_rx.check()?;
                    let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
                    if let Some(spilled) = Self::append_one_row_with_null_build_side(
                        &mut chunk_builder,
                        probe_row,
                        build_data_types.len(),
                    ) {
                        yield spilled
                    }
                }
            }
        }
        if let Some(spilled) = chunk_builder.consume_all() {
            yield spilled
        }
    }
934
    /// Left outer join with an additional non-equi predicate.
    ///
    /// Equi-matched candidate rows are buffered in `chunk_builder`; each full
    /// chunk is run through `process_left_outer_join_non_equi_condition`, which
    /// applies `cond`, and — using `first_output_row_id` (offset of each probe
    /// row's first candidate in the buffered chunk) plus `has_more_output_rows`
    /// (whether the current probe row has candidates spilling into the next
    /// chunk) — NULL-pads probe rows whose candidates all failed the predicate.
    /// Probe rows with no equi match at all bypass the predicate entirely and
    /// go NULL-padded through `remaining_chunk_builder`.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_left_outer_join_with_non_equi_condition(
        EquiJoinParams {
            probe_side,
            probe_data_types,
            probe_key_idxs,
            build_side,
            build_data_types,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
        cond: &BoxedExpression,
    ) {
        let mut chunk_builder = DataChunkBuilder::new(full_data_types.clone(), chunk_size);
        let mut remaining_chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        let mut non_equi_state = LeftNonEquiJoinState {
            // Needed so the processor knows which columns to NULL out.
            probe_column_count: probe_data_types.len(),
            ..Default::default()
        };

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
                    // Record where this probe row's candidates start in the
                    // buffered chunk, for the non-equi post-processing.
                    non_equi_state
                        .first_output_row_id
                        .push(chunk_builder.buffered_count());

                    // Peekable so we can tell, when a chunk fills up, whether
                    // this probe row still has candidates in the next chunk.
                    let mut build_row_id_iter = next_build_row_with_same_key
                        .row_id_iter(Some(*first_matched_build_row_id))
                        .peekable();
                    while let Some(build_row_id) = build_row_id_iter.next() {
                        shutdown_rx.check()?;
                        let build_chunk = &build_side[build_row_id.chunk_id()];
                        if let Some(spilled) = Self::append_one_row(
                            &mut chunk_builder,
                            &probe_chunk,
                            probe_row_id,
                            build_chunk,
                            build_row_id.row_id(),
                        ) {
                            non_equi_state.has_more_output_rows =
                                build_row_id_iter.peek().is_some();
                            yield Self::process_left_outer_join_non_equi_condition(
                                spilled,
                                cond.as_ref(),
                                &mut non_equi_state,
                            )
                            .await?
                        }
                    }
                } else {
                    // No equi match: NULL-pad directly, no predicate needed.
                    shutdown_rx.check()?;
                    let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
                    if let Some(spilled) = Self::append_one_row_with_null_build_side(
                        &mut remaining_chunk_builder,
                        probe_row,
                        build_data_types.len(),
                    ) {
                        yield spilled
                    }
                }
            }
        }
        // Final flush: no candidates remain beyond the buffered chunk.
        non_equi_state.has_more_output_rows = false;
        if let Some(spilled) = chunk_builder.consume_all() {
            yield Self::process_left_outer_join_non_equi_condition(
                spilled,
                cond.as_ref(),
                &mut non_equi_state,
            )
            .await?
        }

        if let Some(spilled) = remaining_chunk_builder.consume_all() {
            yield spilled
        }
    }
1023
1024    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1025    pub async fn do_left_semi_anti_join<const ANTI_JOIN: bool>(
1026        EquiJoinParams {
1027            probe_side,
1028            probe_data_types,
1029            probe_key_idxs,
1030            hash_map,
1031            chunk_size,
1032            shutdown_rx,
1033            ..
1034        }: EquiJoinParams<K>,
1035    ) {
1036        let mut chunk_builder = DataChunkBuilder::new(probe_data_types, chunk_size);
1037        #[for_await]
1038        for probe_chunk in probe_side.execute() {
1039            let probe_chunk = probe_chunk?;
1040            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1041            for (probe_row_id, probe_key) in probe_keys
1042                .iter()
1043                .enumerate()
1044                .filter_by_bitmap(probe_chunk.visibility())
1045            {
1046                shutdown_rx.check()?;
1047                if !ANTI_JOIN {
1048                    if hash_map.contains_key(probe_key)
1049                        && let Some(spilled) = Self::append_one_probe_row(
1050                            &mut chunk_builder,
1051                            &probe_chunk,
1052                            probe_row_id,
1053                        )
1054                    {
1055                        yield spilled
1056                    }
1057                } else if hash_map.get(probe_key).is_none()
1058                    && let Some(spilled) =
1059                        Self::append_one_probe_row(&mut chunk_builder, &probe_chunk, probe_row_id)
1060                {
1061                    yield spilled
1062                }
1063            }
1064        }
1065        if let Some(spilled) = chunk_builder.consume_all() {
1066            yield spilled
1067        }
1068    }
1069
    /// High-level idea:
    /// 1. For each `probe_row`, append candidate rows to buffer.
    ///    Candidate rows: Those satisfying `equi_predicate` (==).
    /// 2. If buffer becomes full, process it.
    ///    Apply `non_equi_join` predicates e.g. `>=`, `<=` to filter rows.
    ///    Track if `probe_row` is matched to avoid duplicates.
    /// 3. If we matched `probe_row` in spilled chunk,
    ///    stop appending its candidate rows,
    ///    to avoid matching it again in next spilled chunk.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_left_semi_join_with_non_equi_condition<'a>(
        EquiJoinParams {
            probe_side,
            probe_key_idxs,
            build_side,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
        cond: &'a BoxedExpression,
    ) {
        // Candidates are buffered with full (probe + build) columns so `cond`
        // can be evaluated; the processor projects back to probe columns.
        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        let mut non_equi_state = LeftNonEquiJoinState::default();

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                // Reset per probe row: set to true by the processor once this
                // row passed `cond` in an already-yielded chunk.
                non_equi_state.found_matched = false;
                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
                    // Offset of this probe row's first candidate in the buffer.
                    non_equi_state
                        .first_output_row_id
                        .push(chunk_builder.buffered_count());

                    for build_row_id in
                        next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id))
                    {
                        shutdown_rx.check()?;
                        // Step 3 above: already emitted for this probe row.
                        if non_equi_state.found_matched {
                            break;
                        }
                        let build_chunk = &build_side[build_row_id.chunk_id()];
                        if let Some(spilled) = Self::append_one_row(
                            &mut chunk_builder,
                            &probe_chunk,
                            probe_row_id,
                            build_chunk,
                            build_row_id.row_id(),
                        ) {
                            yield Self::process_left_semi_anti_join_non_equi_condition::<false>(
                                spilled,
                                cond.as_ref(),
                                &mut non_equi_state,
                            )
                            .await?
                        }
                    }
                }
            }
        }

        // Process remaining rows in buffer
        if let Some(spilled) = chunk_builder.consume_all() {
            yield Self::process_left_semi_anti_join_non_equi_condition::<false>(
                spilled,
                cond.as_ref(),
                &mut non_equi_state,
            )
            .await?
        }
    }
1149
    /// Left anti join with an additional non-equi predicate.
    ///
    /// A probe row is output iff no build row both shares its key and passes
    /// `cond`. Equi-matched candidates are buffered with full columns and the
    /// anti-variant processor (`::<true>`) emits probe rows once all their
    /// candidates are known to have failed; `has_more_output_rows` tells it
    /// whether a probe row's candidates continue into the next chunk. Probe
    /// rows with no equi match at all are output directly (probe columns only)
    /// via `remaining_chunk_builder`.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_left_anti_join_with_non_equi_condition(
        EquiJoinParams {
            probe_side,
            probe_data_types,
            probe_key_idxs,
            build_side,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
        cond: &BoxedExpression,
    ) {
        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        let mut remaining_chunk_builder = DataChunkBuilder::new(probe_data_types, chunk_size);
        let mut non_equi_state = LeftNonEquiJoinState::default();

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
                    // Offset of this probe row's first candidate in the buffer.
                    non_equi_state
                        .first_output_row_id
                        .push(chunk_builder.buffered_count());
                    // Peekable so we can flag candidates continuing past a
                    // spilled chunk boundary.
                    let mut build_row_id_iter = next_build_row_with_same_key
                        .row_id_iter(Some(*first_matched_build_row_id))
                        .peekable();
                    while let Some(build_row_id) = build_row_id_iter.next() {
                        shutdown_rx.check()?;
                        let build_chunk = &build_side[build_row_id.chunk_id()];
                        if let Some(spilled) = Self::append_one_row(
                            &mut chunk_builder,
                            &probe_chunk,
                            probe_row_id,
                            build_chunk,
                            build_row_id.row_id(),
                        ) {
                            non_equi_state.has_more_output_rows =
                                build_row_id_iter.peek().is_some();
                            yield Self::process_left_semi_anti_join_non_equi_condition::<true>(
                                spilled,
                                cond.as_ref(),
                                &mut non_equi_state,
                            )
                            .await?
                        }
                    }
                } else if let Some(spilled) = Self::append_one_probe_row(
                    // No equi match at all: this row trivially satisfies ANTI.
                    &mut remaining_chunk_builder,
                    &probe_chunk,
                    probe_row_id,
                ) {
                    yield spilled
                }
            }
        }
        // Final flush: no candidates remain beyond the buffered chunk.
        non_equi_state.has_more_output_rows = false;
        if let Some(spilled) = chunk_builder.consume_all() {
            yield Self::process_left_semi_anti_join_non_equi_condition::<true>(
                spilled,
                cond.as_ref(),
                &mut non_equi_state,
            )
            .await?
        }
        if let Some(spilled) = remaining_chunk_builder.consume_all() {
            yield spilled
        }
    }
1228
    /// Right outer equi join: emit all matched probe/build pairs while marking
    /// matched build rows in `build_row_matched`; afterwards emit every
    /// never-matched build row NULL-padded on the probe side (delegated to
    /// `handle_remaining_build_rows_for_right_outer_join`).
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_right_outer_join(
        EquiJoinParams {
            probe_side,
            probe_data_types,
            probe_key_idxs,
            build_side,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
    ) {
        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        // One boolean per build row, addressed by (chunk_id, row_id).
        let mut build_row_matched =
            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                for build_row_id in
                    next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
                {
                    shutdown_rx.check()?;
                    build_row_matched[build_row_id] = true;
                    let build_chunk = &build_side[build_row_id.chunk_id()];
                    if let Some(spilled) = Self::append_one_row(
                        &mut chunk_builder,
                        &probe_chunk,
                        probe_row_id,
                        build_chunk,
                        build_row_id.row_id(),
                    ) {
                        yield spilled
                    }
                }
            }
        }
        // NULL-pad and emit the build rows that never matched.
        #[for_await]
        for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
            &mut chunk_builder,
            &build_side,
            &build_row_matched,
            probe_data_types.len(),
        ) {
            yield spilled?
        }
    }
1285
    /// Right outer join with an additional non-equi predicate.
    ///
    /// Equi-matched candidates are buffered; for each spilled chunk the
    /// processor evaluates `cond`, masks failing rows, and marks
    /// `build_row_matched` only for candidates that actually passed (their ids
    /// are carried chunk-aligned in `non_equi_state.build_row_ids`). Build rows
    /// with no passing match are then emitted NULL-padded on the probe side.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_right_outer_join_with_non_equi_condition(
        EquiJoinParams {
            probe_side,
            probe_data_types,
            probe_key_idxs,
            build_side,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
        cond: &BoxedExpression,
    ) {
        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        // One boolean per build row; moved into the non-equi state so the
        // processor can update it as chunks pass the predicate.
        let build_row_matched =
            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
        let mut non_equi_state = RightNonEquiJoinState {
            build_row_matched,
            ..Default::default()
        };

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                for build_row_id in
                    next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
                {
                    shutdown_rx.check()?;
                    // Remember which build row each buffered output row came
                    // from, in buffer order.
                    non_equi_state.build_row_ids.push(build_row_id);
                    let build_chunk = &build_side[build_row_id.chunk_id()];
                    if let Some(spilled) = Self::append_one_row(
                        &mut chunk_builder,
                        &probe_chunk,
                        probe_row_id,
                        build_chunk,
                        build_row_id.row_id(),
                    ) {
                        yield Self::process_right_outer_join_non_equi_condition(
                            spilled,
                            cond.as_ref(),
                            &mut non_equi_state,
                        )
                        .await?
                    }
                }
            }
        }
        if let Some(spilled) = chunk_builder.consume_all() {
            yield Self::process_right_outer_join_non_equi_condition(
                spilled,
                cond.as_ref(),
                &mut non_equi_state,
            )
            .await?
        }
        // NULL-pad and emit the build rows that never passed the predicate.
        #[for_await]
        for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
            &mut chunk_builder,
            &build_side,
            &non_equi_state.build_row_matched,
            probe_data_types.len(),
        ) {
            yield spilled?
        }
    }
1360
1361    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1362    pub async fn do_right_semi_anti_join<const ANTI_JOIN: bool>(
1363        EquiJoinParams {
1364            probe_side,
1365            probe_key_idxs,
1366            build_side,
1367            build_data_types,
1368            hash_map,
1369            next_build_row_with_same_key,
1370            chunk_size,
1371            shutdown_rx,
1372            ..
1373        }: EquiJoinParams<K>,
1374    ) {
1375        let mut chunk_builder = DataChunkBuilder::new(build_data_types, chunk_size);
1376        let mut build_row_matched =
1377            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
1378
1379        #[for_await]
1380        for probe_chunk in probe_side.execute() {
1381            let probe_chunk = probe_chunk?;
1382            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1383            for probe_key in probe_keys.iter().filter_by_bitmap(probe_chunk.visibility()) {
1384                for build_row_id in
1385                    next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
1386                {
1387                    shutdown_rx.check()?;
1388                    build_row_matched[build_row_id] = true;
1389                }
1390            }
1391        }
1392        #[for_await]
1393        for spilled in Self::handle_remaining_build_rows_for_right_semi_anti_join::<ANTI_JOIN>(
1394            &mut chunk_builder,
1395            &build_side,
1396            &build_row_matched,
1397        ) {
1398            yield spilled?
1399        }
1400    }
1401
    /// Right semi/anti join with an additional non-equi predicate.
    ///
    /// Buffered candidate chunks are fed to the processor only to update
    /// `build_row_matched` for candidates passing `cond` — note these calls are
    /// NOT yielded: nothing is emitted during the probe phase. All output is
    /// produced at the end from the matched bitmap (matched rows for semi,
    /// unmatched for anti), with build columns only.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_right_semi_anti_join_with_non_equi_condition<const ANTI_JOIN: bool>(
        EquiJoinParams {
            probe_side,
            probe_key_idxs,
            build_side,
            build_data_types,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
        cond: &BoxedExpression,
    ) {
        // Full columns are needed to evaluate `cond`; the final output uses
        // the build-columns-only `remaining_chunk_builder`.
        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        let mut remaining_chunk_builder = DataChunkBuilder::new(build_data_types, chunk_size);
        let build_row_matched =
            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
        let mut non_equi_state = RightNonEquiJoinState {
            build_row_matched,
            ..Default::default()
        };

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                for build_row_id in
                    next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
                {
                    shutdown_rx.check()?;
                    // Track the origin build row of each buffered output row.
                    non_equi_state.build_row_ids.push(build_row_id);
                    let build_chunk = &build_side[build_row_id.chunk_id()];
                    if let Some(spilled) = Self::append_one_row(
                        &mut chunk_builder,
                        &probe_chunk,
                        probe_row_id,
                        build_chunk,
                        build_row_id.row_id(),
                    ) {
                        // Side effect only: updates `build_row_matched`.
                        Self::process_right_semi_anti_join_non_equi_condition(
                            spilled,
                            cond.as_ref(),
                            &mut non_equi_state,
                        )
                        .await?
                    }
                }
            }
        }
        if let Some(spilled) = chunk_builder.consume_all() {
            Self::process_right_semi_anti_join_non_equi_condition(
                spilled,
                cond.as_ref(),
                &mut non_equi_state,
            )
            .await?
        }
        // Emit matched (semi) or unmatched (anti) build rows.
        #[for_await]
        for spilled in Self::handle_remaining_build_rows_for_right_semi_anti_join::<ANTI_JOIN>(
            &mut remaining_chunk_builder,
            &build_side,
            &non_equi_state.build_row_matched,
        ) {
            yield spilled?
        }
    }
1476
    /// Streaming full outer join without a non-equi condition.
    ///
    /// Phase 1: for every visible probe row, yields its concatenation with each
    /// hash-matched build row (marking those build rows as matched), or with a
    /// NULL-padded build side when the key has no match.
    /// Phase 2: yields every never-matched build row with a NULL-padded probe side.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_full_outer_join(
        EquiJoinParams {
            probe_side,
            probe_data_types,
            probe_key_idxs,
            build_side,
            build_data_types,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
    ) {
        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        // Tracks, per build row, whether it matched at least one probe row.
        let mut build_row_matched =
            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
                    // Walk the linked list of build rows sharing this key.
                    for build_row_id in
                        next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id))
                    {
                        shutdown_rx.check()?;
                        build_row_matched[build_row_id] = true;
                        let build_chunk = &build_side[build_row_id.chunk_id()];
                        if let Some(spilled) = Self::append_one_row(
                            &mut chunk_builder,
                            &probe_chunk,
                            probe_row_id,
                            build_chunk,
                            build_row_id.row_id(),
                        ) {
                            yield spilled
                        }
                    }
                } else {
                    // No hash match: emit the probe row with NULLs on the build side.
                    let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
                    if let Some(spilled) = Self::append_one_row_with_null_build_side(
                        &mut chunk_builder,
                        probe_row,
                        build_data_types.len(),
                    ) {
                        yield spilled
                    }
                }
            }
        }
        // Phase 2: emit never-matched build rows, NULL-padded on the probe side.
        #[for_await]
        for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
            &mut chunk_builder,
            &build_side,
            &build_row_matched,
            probe_data_types.len(),
        ) {
            yield spilled?
        }
    }
1545
    /// Streaming full outer join with an additional non-equi condition.
    ///
    /// Matched probe∘build rows are buffered in `chunk_builder` and post-processed by
    /// `process_full_outer_join_non_equi_condition`, which nullifies build sides that
    /// fail the condition, collapses duplicates per probe row, and records which build
    /// rows found a passing match. Probe rows with no hash match and the final
    /// never-matched build rows go through `remaining_chunk_builder` and bypass the
    /// condition entirely.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_full_outer_join_with_non_equi_condition(
        EquiJoinParams {
            probe_side,
            probe_data_types,
            probe_key_idxs,
            build_side,
            build_data_types,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
        cond: &BoxedExpression,
    ) {
        let mut chunk_builder = DataChunkBuilder::new(full_data_types.clone(), chunk_size);
        let mut remaining_chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        // Left-side state: tracks per-probe-row output windows so duplicate rows can
        // be collapsed after the condition is evaluated.
        let mut left_non_equi_state = LeftNonEquiJoinState {
            probe_column_count: probe_data_types.len(),
            ..Default::default()
        };
        let build_row_matched =
            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
        // Right-side state: remembers which build rows produced a condition-passing row.
        let mut right_non_equi_state = RightNonEquiJoinState {
            build_row_matched,
            ..Default::default()
        };

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                left_non_equi_state.found_matched = false;
                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
                    // Record where this probe row's output window starts in the builder.
                    left_non_equi_state
                        .first_output_row_id
                        .push(chunk_builder.buffered_count());
                    let mut build_row_id_iter = next_build_row_with_same_key
                        .row_id_iter(Some(*first_matched_build_row_id))
                        .peekable();
                    while let Some(build_row_id) = build_row_id_iter.next() {
                        shutdown_rx.check()?;
                        right_non_equi_state.build_row_ids.push(build_row_id);
                        let build_chunk = &build_side[build_row_id.chunk_id()];
                        if let Some(spilled) = Self::append_one_row(
                            &mut chunk_builder,
                            &probe_chunk,
                            probe_row_id,
                            build_chunk,
                            build_row_id.row_id(),
                        ) {
                            // The current probe row's window continues into the next
                            // output chunk iff more build rows share this key.
                            left_non_equi_state.has_more_output_rows =
                                build_row_id_iter.peek().is_some();
                            yield Self::process_full_outer_join_non_equi_condition(
                                spilled,
                                cond.as_ref(),
                                &mut left_non_equi_state,
                                &mut right_non_equi_state,
                            )
                            .await?
                        }
                    }
                } else {
                    shutdown_rx.check()?;
                    // No hash match: the condition cannot apply, emit directly.
                    let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
                    if let Some(spilled) = Self::append_one_row_with_null_build_side(
                        &mut remaining_chunk_builder,
                        probe_row,
                        build_data_types.len(),
                    ) {
                        yield spilled
                    }
                }
            }
        }
        // Flush the last partial chunk through the condition post-processing.
        left_non_equi_state.has_more_output_rows = false;
        if let Some(spilled) = chunk_builder.consume_all() {
            yield Self::process_full_outer_join_non_equi_condition(
                spilled,
                cond.as_ref(),
                &mut left_non_equi_state,
                &mut right_non_equi_state,
            )
            .await?
        }
        // Emit build rows that never passed the condition, NULL-padded probe side.
        #[for_await]
        for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
            &mut remaining_chunk_builder,
            &build_side,
            &right_non_equi_state.build_row_matched,
            probe_data_types.len(),
        ) {
            yield spilled?
        }
    }
1648
    /// Process output chunk for left outer join when non-equi condition is presented.
    ///
    /// # Arguments
    /// * `chunk` - Output chunk from `do_left_outer_join_with_non_equi_condition`, containing:
    ///     - Concatenation of probe row and its corresponding build row according to the hash map.
    ///     - Concatenation of probe row and `NULL` build row, if there is no matched build row
    ///       found for the probe row.
    /// * `cond` - Non-equi join condition.
    /// * `probe_column_count` - The number of columns in the probe side.
    /// * `first_output_row_id` - The offset of the first output row in `chunk` for each probe side
    ///   row that has been processed.
    /// * `has_more_output_rows` - Whether the probe row being processed currently has output rows
    ///   in next output chunk.
    /// * `found_matched` - Whether the probe row being processed currently has matched non-NULL
    ///   build rows in last output chunk.
    ///
    /// # Examples
    /// Assume we have two tables `t1` and `t2` as probe side and build side, respectively.
    /// ```sql
    /// CREATE TABLE t1 (v1 int, v2 int);
    /// CREATE TABLE t2 (v3 int);
    /// ```
    ///
    /// Now we do a left outer join on `t1` and `t2`, as the following query shows:
    /// ```sql
    /// SELECT * FROM t1 LEFT JOIN t2 ON t1.v1 = t2.v3 AND t1.v2 <> t2.v3;
    /// ```
    ///
    /// Assume the chunk builder in `do_left_outer_join_with_non_equi_condition` has buffer size 5,
    /// and we have the following chunk as the first output ('-' represents NULL).
    ///
    /// | offset | v1 | v2 | v3 |
    /// |---|---|---|---|
    /// | 0 | 1 | 2 | 1 |
    /// | 1 | 1 | 1 | 1 |
    /// | 2 | 2 | 3 | - |
    /// | 3 | 3 | 3 | 3 |
    /// | 4 | 3 | 3 | 3 |
    ///
    /// We have the following precondition:
    /// ```ignore
    /// assert_eq!(probe_column_count, 2);
    /// assert_eq!(first_out_row_id, vec![0, 1, 2, 3]);
    /// assert!(has_more_output_rows);
    /// assert!(!found_matched);
    /// ```
    ///
    /// In `process_left_outer_join_non_equi_condition`, we transform the chunk in following steps.
    ///
    /// 1. Evaluate the non-equi condition on the chunk. Here the condition is `t1.v2 <> t2.v3`.
    ///
    /// We get the result array:
    ///
    /// | offset | value |
    /// | --- | --- |
    /// | 0 | true |
    /// | 1 | false |
    /// | 2 | false |
    /// | 3 | false |
    /// | 4 | false |
    ///
    /// 2. Set the build side columns to NULL if the corresponding result value is false.
    ///
    /// The chunk is changed to:
    ///
    /// | offset | v1 | v2 | v3 |
    /// |---|---|---|---|
    /// | 0 | 1 | 2 | 1 |
    /// | 1 | 1 | 1 | - |
    /// | 2 | 2 | 3 | - |
    /// | 3 | 3 | 3 | - |
    /// | 4 | 3 | 3 | - |
    ///
    /// 3. Remove duplicate rows with NULL build side. This is done by setting the visibility bitmap
    ///    of the chunk.
    ///
    /// | offset | v1 | v2 | v3 |
    /// |---|---|---|---|
    /// | 0 | 1 | 2 | 1 |
    /// | 1 | 1 | 1 | - |
    /// | 2 | 2 | 3 | - |
    /// | 3 | ~~3~~ | ~~3~~ | ~~-~~ |
    /// | 4 | ~~3~~ | ~~3~~ | ~~-~~ |
    ///
    /// For the probe row being processed currently (`(3, 3)` here), we don't have output rows with
    /// non-NULL build side, so we set `found_matched` to false.
    ///
    /// In `do_left_outer_join_with_non_equi_condition`, we have next output chunk as follows:
    ///
    /// | offset | v1 | v2 | v3 |
    /// |---|---|---|---|
    /// | 0 | 3 | 3 | 3 |
    /// | 1 | 3 | 3 | 3 |
    /// | 2 | 5 | 5 | - |
    /// | 3 | 5 | 3 | - |
    /// | 4 | 5 | 3 | - |
    ///
    /// This time we have the following precondition:
    /// ```ignore
    /// assert_eq!(probe_column_count, 2);
    /// assert_eq!(first_out_row_id, vec![2, 3]);
    /// assert!(!has_more_output_rows);
    /// assert!(!found_matched);
    /// ```
    ///
    /// The transformed chunk is as follows after the same steps.
    ///
    /// | offset | v1 | v2 | v3 |
    /// |---|---|---|---|
    /// | 0 | ~~3~~ | ~~3~~ | ~~3~~ |
    /// | 1 | 3 | 3 | - |
    /// | 2 | 5 | 5 | - |
    /// | 3 | 5 | 3 | - |
    /// | 4 | ~~5~~ | ~~3~~ | ~~-~~ |
    ///
    /// After we add these chunks to output chunk builder in `do_execute`, we get the final output:
    ///
    /// Chunk 1
    ///
    /// | offset | v1 | v2 | v3 |
    /// |---|---|---|---|
    /// | 0 | 1 | 2 | 1 |
    /// | 1 | 1 | 1 | - |
    /// | 2 | 2 | 3 | - |
    /// | 3 | 3 | 3 | - |
    /// | 4 | 5 | 5 | - |
    ///
    /// Chunk 2
    ///
    /// | offset | v1 | v2 | v3 |
    /// |---|---|---|---|
    /// | 0 | 5 | 3 | - |
    ///
    ///
    /// For more information about how `process_*_join_non_equi_condition` work, see their unit
    /// tests.
    async fn process_left_outer_join_non_equi_condition(
        chunk: DataChunk,
        cond: &dyn Expression,
        LeftNonEquiJoinState {
            probe_column_count,
            first_output_row_id,
            has_more_output_rows,
            found_matched,
        }: &mut LeftNonEquiJoinState,
    ) -> Result<DataChunk> {
        // Evaluate the condition once, then nullify failing build sides and
        // collapse duplicate NULL-build rows per probe-row window.
        let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
        Ok(DataChunkMutator(chunk)
            .nullify_build_side_for_non_equi_condition(&filter, *probe_column_count)
            .remove_duplicate_rows_for_left_outer_join(
                &filter,
                first_output_row_id,
                *has_more_output_rows,
                found_matched,
            )
            .take())
    }
1806
1807    /// Filters for candidate rows which satisfy `non_equi` predicate.
1808    /// Removes duplicate rows.
1809    async fn process_left_semi_anti_join_non_equi_condition<const ANTI_JOIN: bool>(
1810        chunk: DataChunk,
1811        cond: &dyn Expression,
1812        LeftNonEquiJoinState {
1813            first_output_row_id,
1814            found_matched,
1815            has_more_output_rows,
1816            ..
1817        }: &mut LeftNonEquiJoinState,
1818    ) -> Result<DataChunk> {
1819        let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1820        Ok(DataChunkMutator(chunk)
1821            .remove_duplicate_rows_for_left_semi_anti_join::<ANTI_JOIN>(
1822                &filter,
1823                first_output_row_id,
1824                *has_more_output_rows,
1825                found_matched,
1826            )
1827            .take())
1828    }
1829
1830    async fn process_right_outer_join_non_equi_condition(
1831        chunk: DataChunk,
1832        cond: &dyn Expression,
1833        RightNonEquiJoinState {
1834            build_row_ids,
1835            build_row_matched,
1836        }: &mut RightNonEquiJoinState,
1837    ) -> Result<DataChunk> {
1838        let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1839        Ok(DataChunkMutator(chunk)
1840            .remove_duplicate_rows_for_right_outer_join(&filter, build_row_ids, build_row_matched)
1841            .take())
1842    }
1843
1844    async fn process_right_semi_anti_join_non_equi_condition(
1845        chunk: DataChunk,
1846        cond: &dyn Expression,
1847        RightNonEquiJoinState {
1848            build_row_ids,
1849            build_row_matched,
1850        }: &mut RightNonEquiJoinState,
1851    ) -> Result<()> {
1852        let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1853        DataChunkMutator(chunk).remove_duplicate_rows_for_right_semi_anti_join(
1854            &filter,
1855            build_row_ids,
1856            build_row_matched,
1857        );
1858        Ok(())
1859    }
1860
1861    async fn process_full_outer_join_non_equi_condition(
1862        chunk: DataChunk,
1863        cond: &dyn Expression,
1864        left_non_equi_state: &mut LeftNonEquiJoinState,
1865        right_non_equi_state: &mut RightNonEquiJoinState,
1866    ) -> Result<DataChunk> {
1867        let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1868        Ok(DataChunkMutator(chunk)
1869            .nullify_build_side_for_non_equi_condition(
1870                &filter,
1871                left_non_equi_state.probe_column_count,
1872            )
1873            .remove_duplicate_rows_for_full_outer_join(
1874                &filter,
1875                left_non_equi_state,
1876                right_non_equi_state,
1877            )
1878            .take())
1879    }
1880
1881    #[try_stream(ok = DataChunk, error = BatchError)]
1882    async fn handle_remaining_build_rows_for_right_outer_join<'a>(
1883        chunk_builder: &'a mut DataChunkBuilder,
1884        build_side: &'a [DataChunk],
1885        build_row_matched: &'a ChunkedData<bool>,
1886        probe_column_count: usize,
1887    ) {
1888        for build_row_id in build_row_matched
1889            .all_row_ids()
1890            .filter(|build_row_id| !build_row_matched[*build_row_id])
1891        {
1892            let build_row =
1893                build_side[build_row_id.chunk_id()].row_at_unchecked_vis(build_row_id.row_id());
1894            if let Some(spilled) = Self::append_one_row_with_null_probe_side(
1895                chunk_builder,
1896                build_row,
1897                probe_column_count,
1898            ) {
1899                yield spilled
1900            }
1901        }
1902        if let Some(spilled) = chunk_builder.consume_all() {
1903            yield spilled
1904        }
1905    }
1906
1907    #[try_stream(ok = DataChunk, error = BatchError)]
1908    async fn handle_remaining_build_rows_for_right_semi_anti_join<'a, const ANTI_JOIN: bool>(
1909        chunk_builder: &'a mut DataChunkBuilder,
1910        build_side: &'a [DataChunk],
1911        build_row_matched: &'a ChunkedData<bool>,
1912    ) {
1913        for build_row_id in build_row_matched.all_row_ids().filter(|build_row_id| {
1914            if !ANTI_JOIN {
1915                build_row_matched[*build_row_id]
1916            } else {
1917                !build_row_matched[*build_row_id]
1918            }
1919        }) {
1920            if let Some(spilled) = Self::append_one_build_row(
1921                chunk_builder,
1922                &build_side[build_row_id.chunk_id()],
1923                build_row_id.row_id(),
1924            ) {
1925                yield spilled
1926            }
1927        }
1928        if let Some(spilled) = chunk_builder.consume_all() {
1929            yield spilled
1930        }
1931    }
1932
1933    fn append_one_row(
1934        chunk_builder: &mut DataChunkBuilder,
1935        probe_chunk: &DataChunk,
1936        probe_row_id: usize,
1937        build_chunk: &DataChunk,
1938        build_row_id: usize,
1939    ) -> Option<DataChunk> {
1940        chunk_builder.append_one_row_from_array_elements(
1941            probe_chunk.columns().iter().map(|c| c.as_ref()),
1942            probe_row_id,
1943            build_chunk.columns().iter().map(|c| c.as_ref()),
1944            build_row_id,
1945        )
1946    }
1947
1948    fn append_one_probe_row(
1949        chunk_builder: &mut DataChunkBuilder,
1950        probe_chunk: &DataChunk,
1951        probe_row_id: usize,
1952    ) -> Option<DataChunk> {
1953        chunk_builder.append_one_row_from_array_elements(
1954            probe_chunk.columns().iter().map(|c| c.as_ref()),
1955            probe_row_id,
1956            empty(),
1957            0,
1958        )
1959    }
1960
1961    fn append_one_build_row(
1962        chunk_builder: &mut DataChunkBuilder,
1963        build_chunk: &DataChunk,
1964        build_row_id: usize,
1965    ) -> Option<DataChunk> {
1966        chunk_builder.append_one_row_from_array_elements(
1967            empty(),
1968            0,
1969            build_chunk.columns().iter().map(|c| c.as_ref()),
1970            build_row_id,
1971        )
1972    }
1973
1974    fn append_one_row_with_null_build_side(
1975        chunk_builder: &mut DataChunkBuilder,
1976        probe_row_ref: RowRef<'_>,
1977        build_column_count: usize,
1978    ) -> Option<DataChunk> {
1979        chunk_builder.append_one_row(probe_row_ref.chain(repeat_n(Datum::None, build_column_count)))
1980    }
1981
1982    fn append_one_row_with_null_probe_side(
1983        chunk_builder: &mut DataChunkBuilder,
1984        build_row_ref: RowRef<'_>,
1985        probe_column_count: usize,
1986    ) -> Option<DataChunk> {
1987        chunk_builder.append_one_row(repeat_n(Datum::None, probe_column_count).chain(build_row_ref))
1988    }
1989
    /// Scans all build rows sharing the probe row's equi-key and returns the id
    /// of the build row best satisfying the AS OF inequality, or `None` when no
    /// build row qualifies or the probe's inequality column is NULL.
    ///
    /// "Best" means closest to the probe value among qualifying rows: for
    /// `Lt`/`Le` the smallest qualifying build value is kept, for `Gt`/`Ge`
    /// the largest (see the `build vs. current-best` comparison below).
    fn find_asof_matched_rows(
        probe_row_ref: RowRef<'_>,
        build_side: &[DataChunk],
        build_side_row_iter: RowIdIter<'_>,
        asof_join_condition: &AsOfDesc,
    ) -> Option<RowId> {
        let probe_inequality_value = probe_row_ref.datum_at(asof_join_condition.left_idx);
        if let Some(probe_inequality_scalar) = probe_inequality_value {
            let mut result_row_id: Option<RowId> = None;
            let mut build_row_ref;

            for build_row_id in build_side_row_iter {
                build_row_ref =
                    build_side[build_row_id.chunk_id()].row_at_unchecked_vis(build_row_id.row_id());
                let build_inequality_value = build_row_ref.datum_at(asof_join_condition.right_idx);
                // NULL build values never satisfy the inequality and are skipped.
                if let Some(build_inequality_scalar) = build_inequality_value {
                    // Accepts `build_row_id` as the new best candidate when it
                    // satisfies `probe <op> build` and, if a candidate already
                    // exists, also `build <op> current_best` (i.e. it is closer
                    // to the probe value).
                    let mut pick_result = |compare: fn(Ordering) -> bool| {
                        if let Some(result_row_id_inner) = result_row_id {
                            let result_row_ref = build_side[result_row_id_inner.chunk_id()]
                                .row_at_unchecked_vis(result_row_id_inner.row_id());
                            let result_inequality_scalar = result_row_ref
                                .datum_at(asof_join_condition.right_idx)
                                .unwrap();
                            if compare(
                                probe_inequality_scalar.default_cmp(&build_inequality_scalar),
                            ) && compare(
                                build_inequality_scalar.default_cmp(&result_inequality_scalar),
                            ) {
                                result_row_id = Some(build_row_id);
                            }
                        } else if compare(
                            probe_inequality_scalar.default_cmp(&build_inequality_scalar),
                        ) {
                            result_row_id = Some(build_row_id);
                        }
                    };
                    match asof_join_condition.inequality_type {
                        AsOfInequalityType::Lt => {
                            pick_result(Ordering::is_lt);
                        }
                        AsOfInequalityType::Le => {
                            pick_result(Ordering::is_le);
                        }
                        AsOfInequalityType::Gt => {
                            pick_result(Ordering::is_gt);
                        }
                        AsOfInequalityType::Ge => {
                            pick_result(Ordering::is_ge);
                        }
                    }
                }
            }
            result_row_id
        } else {
            None
        }
    }
2047}
2048
/// `DataChunkMutator` transforms the given data chunk for non-equi join.
///
/// A `repr(transparent)` wrapper over [`DataChunk`] whose methods can be
/// chained and finally unwrapped with `take`.
#[repr(transparent)]
struct DataChunkMutator(DataChunk);
2052
impl DataChunkMutator {
    /// Sets build-side values to NULL on rows where `filter` is unset, by
    /// AND-ing each build column's null bitmap with `filter`. The first
    /// `probe_column_count` columns (the probe side) are left untouched.
    fn nullify_build_side_for_non_equi_condition(
        self,
        filter: &Bitmap,
        probe_column_count: usize,
    ) -> Self {
        let (mut columns, vis) = self.0.into_parts();

        for build_column in columns.split_off(probe_column_count) {
            // NOTE(review): `Arc::try_unwrap` panics if any other reference to
            // this column array is alive; this assumes the chunk uniquely owns
            // its build columns at this point — confirm upstream guarantees.
            let mut array = Arc::try_unwrap(build_column).unwrap();
            array.set_bitmap(array.null_bitmap() & filter);
            columns.push(array.into());
        }

        Self(DataChunk::new(columns, vis))
    }

    /// Rewrites the chunk's visibility for left outer join output: rows that
    /// pass `filter` stay visible; for a probe-row window containing no
    /// passing row, only the window's first row stays visible (its build side
    /// was already nullified). `found_non_null` carries the match status of
    /// the trailing, possibly unfinished window into the next chunk when
    /// `has_more_output_rows` is true.
    fn remove_duplicate_rows_for_left_outer_join(
        mut self,
        filter: &Bitmap,
        first_output_row_ids: &mut Vec<usize>,
        has_more_output_rows: bool,
        found_non_null: &mut bool,
    ) -> Self {
        let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());

        // Each (start, end) pair delimits the output window of one probe row
        // that is fully contained in this chunk.
        for (&start_row_id, &end_row_id) in iter::once(&0)
            .chain(first_output_row_ids.iter())
            .tuple_windows()
            .filter(|(start_row_id, end_row_id)| start_row_id < end_row_id)
        {
            for row_id in start_row_id..end_row_id {
                if filter.is_set(row_id) {
                    *found_non_null = true;
                    new_visibility.set(row_id, true);
                }
            }
            // No passing row in this window: keep the first (NULL-build) row.
            if !*found_non_null {
                new_visibility.set(start_row_id, true);
            }
            *found_non_null = false;
        }

        // The trailing window may continue into the next output chunk, so only
        // finalize it when no more output rows for this probe row are coming.
        let start_row_id = first_output_row_ids.last().copied().unwrap_or_default();
        for row_id in start_row_id..filter.len() {
            if filter.is_set(row_id) {
                *found_non_null = true;
                new_visibility.set(row_id, true);
            }
        }
        if !has_more_output_rows {
            if !*found_non_null {
                new_visibility.set(start_row_id, true);
            }
            *found_non_null = false;
        }

        first_output_row_ids.clear();

        self.0
            .set_visibility(new_visibility.finish() & self.0.visibility());
        self
    }

    /// Removes duplicate rows using `filter`
    /// and only returns the first match for each window.
    /// Windows are indicated by `first_output_row_ids`.
    ///
    /// For semi join (`ANTI_JOIN == false`) the first passing row of each
    /// window becomes visible; for anti join, the window's first row becomes
    /// visible only when nothing in the window passed. `found_matched` carries
    /// the trailing window's status across chunks (see `has_more_output_rows`).
    fn remove_duplicate_rows_for_left_semi_anti_join<const ANTI_JOIN: bool>(
        mut self,
        filter: &Bitmap,
        first_output_row_ids: &mut Vec<usize>,
        has_more_output_rows: bool,
        found_matched: &mut bool,
    ) -> Self {
        let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());

        for (&start_row_id, &end_row_id) in iter::once(&0)
            .chain(first_output_row_ids.iter())
            .tuple_windows()
            .filter(|(start_row_id, end_row_id)| start_row_id < end_row_id)
        {
            for row_id in start_row_id..end_row_id {
                if filter.is_set(row_id) {
                    if !ANTI_JOIN && !*found_matched {
                        new_visibility.set(row_id, true);
                    }
                    *found_matched = true;
                    // One passing row decides the whole window.
                    break;
                }
            }
            if ANTI_JOIN && !*found_matched {
                new_visibility.set(start_row_id, true);
            }
            *found_matched = false;
        }

        // Trailing, possibly unfinished window.
        let start_row_id = first_output_row_ids.last().copied().unwrap_or_default();
        for row_id in start_row_id..filter.len() {
            if filter.is_set(row_id) {
                if !ANTI_JOIN && !*found_matched {
                    new_visibility.set(row_id, true);
                }
                *found_matched = true;
                break;
            }
        }
        if !has_more_output_rows && ANTI_JOIN {
            if !*found_matched {
                new_visibility.set(start_row_id, true);
            }
            *found_matched = false;
        }

        first_output_row_ids.clear();

        self.0
            .set_visibility(new_visibility.finish() & self.0.visibility());
        self
    }

    /// Keeps only rows passing `filter` and marks their originating build rows
    /// as matched. `build_row_ids` maps each output row to its build row and
    /// is drained by this call.
    fn remove_duplicate_rows_for_right_outer_join(
        mut self,
        filter: &Bitmap,
        build_row_ids: &mut Vec<RowId>,
        build_row_matched: &mut ChunkedData<bool>,
    ) -> Self {
        let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());
        for (output_row_id, (output_row_non_null, &build_row_id)) in
            filter.iter().zip_eq_fast(build_row_ids.iter()).enumerate()
        {
            if output_row_non_null {
                build_row_matched[build_row_id] = true;
                new_visibility.set(output_row_id, true);
            }
        }

        build_row_ids.clear();

        self.0
            .set_visibility(new_visibility.finish() & self.0.visibility());
        self
    }

    /// Marks build rows whose output row passes `filter` as matched; produces
    /// no visible output (right semi/anti output is built later from the
    /// match flags). `build_row_ids` is drained by this call.
    fn remove_duplicate_rows_for_right_semi_anti_join(
        self,
        filter: &Bitmap,
        build_row_ids: &mut Vec<RowId>,
        build_row_matched: &mut ChunkedData<bool>,
    ) {
        for (output_row_non_null, &build_row_id) in filter.iter().zip_eq_fast(build_row_ids.iter())
        {
            if output_row_non_null {
                build_row_matched[build_row_id] = true;
            }
        }

        build_row_ids.clear();
    }

    /// Combines the left-outer de-duplication (first part) with the
    /// right-outer match bookkeeping (second part) for full outer join.
    fn remove_duplicate_rows_for_full_outer_join(
        mut self,
        filter: &Bitmap,
        LeftNonEquiJoinState {
            first_output_row_id,
            has_more_output_rows,
            found_matched,
            ..
        }: &mut LeftNonEquiJoinState,
        RightNonEquiJoinState {
            build_row_ids,
            build_row_matched,
        }: &mut RightNonEquiJoinState,
    ) -> Self {
        let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());

        // Left-outer part: per probe-row window, keep passing rows, or the
        // first (NULL-build) row when nothing passed.
        for (&start_row_id, &end_row_id) in iter::once(&0)
            .chain(first_output_row_id.iter())
            .tuple_windows()
            .filter(|(start_row_id, end_row_id)| start_row_id < end_row_id)
        {
            for row_id in start_row_id..end_row_id {
                if filter.is_set(row_id) {
                    *found_matched = true;
                    new_visibility.set(row_id, true);
                }
            }
            if !*found_matched {
                new_visibility.set(start_row_id, true);
            }
            *found_matched = false;
        }

        // Trailing, possibly unfinished window.
        let start_row_id = first_output_row_id.last().copied().unwrap_or_default();
        for row_id in start_row_id..filter.len() {
            if filter.is_set(row_id) {
                *found_matched = true;
                new_visibility.set(row_id, true);
            }
        }
        if !*has_more_output_rows && !*found_matched {
            new_visibility.set(start_row_id, true);
        }

        first_output_row_id.clear();

        // Right-outer part: record which build rows produced a passing row.
        for (output_row_id, (output_row_non_null, &build_row_id)) in
            filter.iter().zip_eq_fast(build_row_ids.iter()).enumerate()
        {
            if output_row_non_null {
                build_row_matched[build_row_id] = true;
                new_visibility.set(output_row_id, true);
            }
        }

        build_row_ids.clear();

        self.0
            .set_visibility(new_visibility.finish() & self.0.visibility());
        self
    }

    /// Unwraps the transformed chunk.
    fn take(self) -> DataChunk {
        self.0
    }
}
2279
2280impl BoxedExecutorBuilder for HashJoinExecutor<()> {
2281    async fn new_boxed_executor(
2282        context: &ExecutorBuilder<'_>,
2283        inputs: Vec<BoxedExecutor>,
2284    ) -> Result<BoxedExecutor> {
2285        let [left_child, right_child]: [_; 2] = inputs.try_into().unwrap();
2286
2287        let hash_join_node = try_match_expand!(
2288            context.plan_node().get_node_body().unwrap(),
2289            NodeBody::HashJoin
2290        )?;
2291
2292        let join_type = JoinType::from_prost(hash_join_node.get_join_type()?);
2293
2294        let cond = match hash_join_node.get_condition() {
2295            Ok(cond_prost) => Some(build_from_prost(cond_prost)?),
2296            Err(_) => None,
2297        };
2298
2299        let left_key_idxs = hash_join_node
2300            .get_left_key()
2301            .iter()
2302            .map(|&idx| idx as usize)
2303            .collect_vec();
2304        let right_key_idxs = hash_join_node
2305            .get_right_key()
2306            .iter()
2307            .map(|&idx| idx as usize)
2308            .collect_vec();
2309
2310        ensure!(left_key_idxs.len() == right_key_idxs.len());
2311
2312        let right_data_types = right_child.schema().data_types();
2313        let right_key_types = right_key_idxs
2314            .iter()
2315            .map(|&idx| right_data_types[idx].clone())
2316            .collect_vec();
2317
2318        let output_indices: Vec<usize> = hash_join_node
2319            .get_output_indices()
2320            .iter()
2321            .map(|&x| x as usize)
2322            .collect();
2323
2324        let identity = context.plan_node().get_identity().clone();
2325
2326        let asof_desc = hash_join_node
2327            .asof_desc
2328            .map(|desc| AsOfDesc::from_protobuf(&desc))
2329            .transpose()?;
2330
2331        Ok(HashJoinExecutorArgs {
2332            join_type,
2333            output_indices,
2334            probe_side_source: left_child,
2335            build_side_source: right_child,
2336            probe_key_idxs: left_key_idxs,
2337            build_key_idxs: right_key_idxs,
2338            null_matched: hash_join_node.get_null_safe().clone(),
2339            cond,
2340            identity: identity.clone(),
2341            right_key_types,
2342            chunk_size: context.context().get_config().developer.chunk_size,
2343            asof_desc,
2344            spill_backend: if context.context().get_config().enable_spill {
2345                Some(Disk)
2346            } else {
2347                None
2348            },
2349            spill_metrics: context.context().spill_metrics(),
2350            shutdown_rx: context.shutdown_rx().clone(),
2351            mem_ctx: context.context().create_executor_mem_context(&identity),
2352        }
2353        .dispatch())
2354    }
2355}
2356
/// Owned bundle of everything needed to construct a `HashJoinExecutor`,
/// deferred until `HashKeyDispatcher` has chosen the concrete hash-key type.
struct HashJoinExecutorArgs {
    join_type: JoinType,
    // Projection over the concatenated (probe ++ build) output columns.
    output_indices: Vec<usize>,
    probe_side_source: BoxedExecutor,
    build_side_source: BoxedExecutor,
    // Equi-join key column indices; the two lists are equal-length.
    probe_key_idxs: Vec<usize>,
    build_key_idxs: Vec<usize>,
    // Per-key flag: whether NULL keys match each other (IS NOT DISTINCT FROM).
    null_matched: Vec<bool>,
    // Optional non-equi join condition evaluated after the hash lookup.
    cond: Option<BoxedExpression>,
    identity: String,
    // Build-side key types; used by `data_types()` for hash-key dispatch.
    right_key_types: Vec<DataType>,
    chunk_size: usize,
    // Present only for AS OF joins.
    asof_desc: Option<AsOfDesc>,
    // `None` disables spilling.
    spill_backend: Option<SpillBackend>,
    spill_metrics: Arc<BatchSpillMetrics>,
    shutdown_rx: ShutdownToken,
    mem_ctx: MemoryContext,
}
2375
2376impl HashKeyDispatcher for HashJoinExecutorArgs {
2377    type Output = BoxedExecutor;
2378
2379    fn dispatch_impl<K: HashKey>(self) -> Self::Output {
2380        Box::new(HashJoinExecutor::<K>::new(
2381            self.join_type,
2382            self.output_indices,
2383            self.probe_side_source,
2384            self.build_side_source,
2385            self.probe_key_idxs,
2386            self.build_key_idxs,
2387            self.null_matched,
2388            self.cond.map(Arc::new),
2389            self.identity,
2390            self.chunk_size,
2391            self.asof_desc,
2392            self.spill_backend,
2393            self.spill_metrics,
2394            self.shutdown_rx,
2395            self.mem_ctx,
2396        ))
2397    }
2398
2399    fn data_types(&self) -> &[DataType] {
2400        &self.right_key_types
2401    }
2402}
2403
2404impl<K> HashJoinExecutor<K> {
2405    #[allow(clippy::too_many_arguments)]
2406    pub fn new(
2407        join_type: JoinType,
2408        output_indices: Vec<usize>,
2409        probe_side_source: BoxedExecutor,
2410        build_side_source: BoxedExecutor,
2411        probe_key_idxs: Vec<usize>,
2412        build_key_idxs: Vec<usize>,
2413        null_matched: Vec<bool>,
2414        cond: Option<Arc<BoxedExpression>>,
2415        identity: String,
2416        chunk_size: usize,
2417        asof_desc: Option<AsOfDesc>,
2418        spill_backend: Option<SpillBackend>,
2419        spill_metrics: Arc<BatchSpillMetrics>,
2420        shutdown_rx: ShutdownToken,
2421        mem_ctx: MemoryContext,
2422    ) -> Self {
2423        Self::new_inner(
2424            join_type,
2425            output_indices,
2426            probe_side_source,
2427            build_side_source,
2428            probe_key_idxs,
2429            build_key_idxs,
2430            null_matched,
2431            cond,
2432            identity,
2433            chunk_size,
2434            asof_desc,
2435            spill_backend,
2436            spill_metrics,
2437            None,
2438            shutdown_rx,
2439            mem_ctx,
2440        )
2441    }
2442
2443    #[allow(clippy::too_many_arguments)]
2444    fn new_inner(
2445        join_type: JoinType,
2446        output_indices: Vec<usize>,
2447        probe_side_source: BoxedExecutor,
2448        build_side_source: BoxedExecutor,
2449        probe_key_idxs: Vec<usize>,
2450        build_key_idxs: Vec<usize>,
2451        null_matched: Vec<bool>,
2452        cond: Option<Arc<BoxedExpression>>,
2453        identity: String,
2454        chunk_size: usize,
2455        asof_desc: Option<AsOfDesc>,
2456        spill_backend: Option<SpillBackend>,
2457        spill_metrics: Arc<BatchSpillMetrics>,
2458        memory_upper_bound: Option<u64>,
2459        shutdown_rx: ShutdownToken,
2460        mem_ctx: MemoryContext,
2461    ) -> Self {
2462        assert_eq!(probe_key_idxs.len(), build_key_idxs.len());
2463        assert_eq!(probe_key_idxs.len(), null_matched.len());
2464        let original_schema = match join_type {
2465            JoinType::LeftSemi | JoinType::LeftAnti => probe_side_source.schema().clone(),
2466            JoinType::RightSemi | JoinType::RightAnti => build_side_source.schema().clone(),
2467            _ => Schema::from_iter(
2468                probe_side_source
2469                    .schema()
2470                    .fields()
2471                    .iter()
2472                    .chain(build_side_source.schema().fields().iter())
2473                    .cloned(),
2474            ),
2475        };
2476        let schema = Schema::from_iter(
2477            output_indices
2478                .iter()
2479                .map(|&idx| original_schema[idx].clone()),
2480        );
2481        Self {
2482            join_type,
2483            original_schema,
2484            schema,
2485            output_indices,
2486            probe_side_source,
2487            build_side_source,
2488            probe_key_idxs,
2489            build_key_idxs,
2490            null_matched,
2491            cond,
2492            identity,
2493            chunk_size,
2494            asof_desc,
2495            shutdown_rx,
2496            spill_backend,
2497            spill_metrics,
2498            memory_upper_bound,
2499            mem_ctx,
2500            _phantom: PhantomData,
2501        }
2502    }
2503}
2504
2505#[cfg(test)]
2506mod tests {
2507    use futures::StreamExt;
2508    use futures_async_stream::for_await;
2509    use itertools::Itertools;
2510    use risingwave_common::array::{ArrayBuilderImpl, DataChunk};
2511    use risingwave_common::catalog::{Field, Schema};
2512    use risingwave_common::hash::Key32;
2513    use risingwave_common::memory::MemoryContext;
2514    use risingwave_common::metrics::LabelGuardedIntGauge;
2515    use risingwave_common::test_prelude::DataChunkTestExt;
2516    use risingwave_common::types::DataType;
2517    use risingwave_common::util::iter_util::ZipEqDebug;
2518    use risingwave_common::util::memcmp_encoding::encode_chunk;
2519    use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
2520    use risingwave_expr::expr::{BoxedExpression, build_from_pretty};
2521
2522    use super::{
2523        AsOfDesc, AsOfInequalityType, ChunkedData, HashJoinExecutor, JoinType,
2524        LeftNonEquiJoinState, RightNonEquiJoinState, RowId,
2525    };
2526    use crate::error::Result;
2527    use crate::executor::test_utils::MockExecutor;
2528    use crate::executor::{BoxedExecutor, Executor};
2529    use crate::monitor::BatchSpillMetrics;
2530    use crate::spill::spill_op::SpillBackend;
2531    use crate::task::ShutdownToken;
2532
    /// Chunk size used for the array builders and the join executors in tests.
    const CHUNK_SIZE: usize = 1024;
2534
    /// Accumulates the columns of many `DataChunk`s into a single chunk so the
    /// whole join output can be compared at once.
    struct DataChunkMerger {
        // One builder per output column.
        array_builders: Vec<ArrayBuilderImpl>,
        // Sum of `capacity()` of all chunks appended so far.
        array_len: usize,
    }
2539
2540    impl DataChunkMerger {
2541        fn new(data_types: Vec<DataType>) -> Result<Self> {
2542            let array_builders = data_types
2543                .iter()
2544                .map(|data_type| data_type.create_array_builder(CHUNK_SIZE))
2545                .collect();
2546
2547            Ok(Self {
2548                array_builders,
2549                array_len: 0,
2550            })
2551        }
2552
2553        fn append(&mut self, data_chunk: &DataChunk) -> Result<()> {
2554            ensure!(self.array_builders.len() == data_chunk.dimension());
2555            for idx in 0..self.array_builders.len() {
2556                self.array_builders[idx].append_array(data_chunk.column_at(idx));
2557            }
2558            self.array_len += data_chunk.capacity();
2559
2560            Ok(())
2561        }
2562
2563        fn finish(self) -> Result<DataChunk> {
2564            let columns = self
2565                .array_builders
2566                .into_iter()
2567                .map(|b| b.finish().into())
2568                .collect();
2569
2570            Ok(DataChunk::new(columns, self.array_len))
2571        }
2572    }
2573
2574    /// Sort each row in the data chunk and compare with the rows in the data chunk.
2575    fn compare_data_chunk_with_rowsort(left: &DataChunk, right: &DataChunk) -> bool {
2576        assert!(left.is_vis_compacted());
2577        assert!(right.is_vis_compacted());
2578
2579        if left.cardinality() != right.cardinality() {
2580            return false;
2581        }
2582
2583        // Sort and compare
2584        let column_orders = (0..left.columns().len())
2585            .map(|i| ColumnOrder::new(i, OrderType::ascending()))
2586            .collect_vec();
2587        let left_encoded_chunk = encode_chunk(left, &column_orders).unwrap();
2588        let mut sorted_left = left_encoded_chunk
2589            .into_iter()
2590            .enumerate()
2591            .map(|(row_id, row)| (left.row_at_unchecked_vis(row_id), row))
2592            .collect_vec();
2593        sorted_left.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
2594
2595        let right_encoded_chunk = encode_chunk(right, &column_orders).unwrap();
2596        let mut sorted_right = right_encoded_chunk
2597            .into_iter()
2598            .enumerate()
2599            .map(|(row_id, row)| (right.row_at_unchecked_vis(row_id), row))
2600            .collect_vec();
2601        sorted_right.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
2602
2603        sorted_left
2604            .into_iter()
2605            .map(|(row, _)| row)
2606            .zip_eq_debug(sorted_right.into_iter().map(|(row, _)| row))
2607            .all(|(row1, row2)| row1 == row2)
2608    }
2609
    /// Shared setup for the hash-join tests: the column types of both inputs
    /// plus the join type under test.
    struct TestFixture {
        // Left (probe) side: (int, float4).
        left_types: Vec<DataType>,
        // Right (build) side: (int, float8).
        right_types: Vec<DataType>,
        join_type: JoinType,
    }
2615
2616    /// Sql for creating test data:
2617    /// ```sql
2618    /// drop table t1 if exists;
2619    /// create table t1(v1 int, v2 float);
2620    /// insert into t1 values
2621    /// (1, 6.1::FLOAT), (2, null), (null, 8.4::FLOAT), (3, 3.9::FLOAT), (null, null),
2622    /// (4, 6.6::FLOAT), (3, null), (null, 0.7::FLOAT), (5, null), (null, 5.5::FLOAT);
2623    ///
2624    /// drop table t2 if exists;
2625    /// create table t2(v1 int, v2 real);
2626    /// insert into t2 values
2627    /// (8, 6.1::REAL), (2, null), (null, 8.9::REAL), (3, null), (null, 3.5::REAL),
2628    /// (6, null), (4, 7.5::REAL), (6, null), (null, 8::REAL), (7, null),
2629    /// (null, 9.1::REAL), (9, null), (3, 3.7::REAL), (9, null), (null, 9.6::REAL),
2630    /// (100, null), (null, 8.18::REAL), (200, null);
2631    /// ```
    impl TestFixture {
        /// Creates a fixture joining an (int, float4) left table with an
        /// (int, float8) right table using the given join type.
        fn with_join_type(join_type: JoinType) -> Self {
            Self {
                left_types: vec![DataType::Int32, DataType::Float32],
                right_types: vec![DataType::Int32, DataType::Float64],
                join_type,
            }
        }

        /// Mock probe-side (left) input: the 10 rows of `t1` from the SQL
        /// above, split into two chunks of 5 rows.
        fn create_left_executor(&self) -> BoxedExecutor {
            let schema = Schema {
                fields: vec![
                    Field::unnamed(DataType::Int32),
                    Field::unnamed(DataType::Float32),
                ],
            };
            let mut executor = MockExecutor::new(schema);

            executor.add(DataChunk::from_pretty(
                "i f
                 1 6.1
                 2 .
                 . 8.4
                 3 3.9
                 . .  ",
            ));

            executor.add(DataChunk::from_pretty(
                "i f
                 4 6.6
                 3 .
                 . 0.7
                 5 .
                 . 5.5",
            ));

            Box::new(executor)
        }

        /// Mock build-side (right) input: the 18 rows of `t2` from the SQL
        /// above, split into three chunks of 6 rows.
        fn create_right_executor(&self) -> BoxedExecutor {
            let schema = Schema {
                fields: vec![
                    Field::unnamed(DataType::Int32),
                    Field::unnamed(DataType::Float64),
                ],
            };
            let mut executor = MockExecutor::new(schema);

            executor.add(DataChunk::from_pretty(
                "i F
                 8 6.1
                 2 .
                 . 8.9
                 3 .
                 . 3.5
                 6 .  ",
            ));

            executor.add(DataChunk::from_pretty(
                "i F
                 4 7.5
                 6 .
                 . 8
                 7 .
                 . 9.1
                 9 .  ",
            ));

            executor.add(DataChunk::from_pretty(
                "  i F
                   3 3.7
                   9 .
                   . 9.6
                 100 .
                   . 8.18
                 200 .   ",
            ));

            Box::new(executor)
        }

        /// Expected output column types, depending on which side(s) the join
        /// type keeps.
        fn output_data_types(&self) -> Vec<DataType> {
            let join_type = self.join_type;
            if join_type.keep_all() {
                [self.left_types.clone(), self.right_types.clone()].concat()
            } else if join_type.keep_left() {
                self.left_types.clone()
            } else if join_type.keep_right() {
                self.right_types.clone()
            } else {
                unreachable!()
            }
        }

        /// Non-equi condition `t1.v2 < t2.v2` ($1 = left v2, $3 = right v2 in
        /// the concatenated output layout).
        fn create_cond() -> BoxedExpression {
            build_from_pretty("(less_than:boolean $1:float4 $3:float8)")
        }

        /// Builds the join executor under test, joining on column 0 of each
        /// side. With `test_spill`, the executor's memory limit is 0 and an
        /// in-memory spill backend is used (NOTE(review): a limit of 0
        /// presumably forces the spill path on any allocation — confirm
        /// against the executor's spill trigger).
        fn create_join_executor_with_chunk_size_and_executors(
            &self,
            has_non_equi_cond: bool,
            null_safe: bool,
            chunk_size: usize,
            left_child: BoxedExecutor,
            right_child: BoxedExecutor,
            shutdown_rx: ShutdownToken,
            parent_mem_ctx: Option<MemoryContext>,
            test_spill: bool,
        ) -> BoxedExecutor {
            let join_type = self.join_type;

            // Identity projection over however many columns the join type emits.
            let output_indices = (0..match join_type {
                JoinType::LeftSemi | JoinType::LeftAnti => left_child.schema().fields().len(),
                JoinType::RightSemi | JoinType::RightAnti => right_child.schema().fields().len(),
                _ => left_child.schema().fields().len() + right_child.schema().fields().len(),
            })
                .collect();

            let cond = if has_non_equi_cond {
                Some(Self::create_cond().into())
            } else {
                None
            };

            let mem_ctx = if test_spill {
                MemoryContext::new_with_mem_limit(
                    parent_mem_ctx,
                    LabelGuardedIntGauge::test_int_gauge::<4>(),
                    0,
                )
            } else {
                MemoryContext::new(parent_mem_ctx, LabelGuardedIntGauge::test_int_gauge::<4>())
            };
            Box::new(HashJoinExecutor::<Key32>::new(
                join_type,
                output_indices,
                left_child,
                right_child,
                vec![0],
                vec![0],
                vec![null_safe],
                cond,
                "HashJoinExecutor".to_owned(),
                chunk_size,
                None,
                if test_spill {
                    Some(SpillBackend::Memory)
                } else {
                    None
                },
                BatchSpillMetrics::for_test(),
                shutdown_rx,
                mem_ctx,
            ))
        }

        /// Runs the join twice — once in-memory and once with spilling — and
        /// checks both results against `expected`.
        async fn do_test(&self, expected: DataChunk, has_non_equi_cond: bool, null_safe: bool) {
            let left_executor = self.create_left_executor();
            let right_executor = self.create_right_executor();
            self.do_test_with_chunk_size_and_executors(
                expected.clone(),
                has_non_equi_cond,
                null_safe,
                self::CHUNK_SIZE,
                left_executor,
                right_executor,
                false,
            )
            .await;

            // Test spill
            let left_executor = self.create_left_executor();
            let right_executor = self.create_right_executor();
            self.do_test_with_chunk_size_and_executors(
                expected,
                has_non_equi_cond,
                null_safe,
                self::CHUNK_SIZE,
                left_executor,
                right_executor,
                true,
            )
            .await;
        }

        /// Drives the executor to completion, merges its output chunks, and
        /// compares them (order-insensitively) with `expected`. Also verifies
        /// all tracked memory is released afterwards.
        async fn do_test_with_chunk_size_and_executors(
            &self,
            expected: DataChunk,
            has_non_equi_cond: bool,
            null_safe: bool,
            chunk_size: usize,
            left_executor: BoxedExecutor,
            right_executor: BoxedExecutor,
            test_spill: bool,
        ) {
            let parent_mem_context =
                MemoryContext::root(LabelGuardedIntGauge::test_int_gauge::<4>(), u64::MAX);

            // Scope ensures the executor (and its memory context) is dropped
            // before the final accounting check below.
            {
                let join_executor = self.create_join_executor_with_chunk_size_and_executors(
                    has_non_equi_cond,
                    null_safe,
                    chunk_size,
                    left_executor,
                    right_executor,
                    ShutdownToken::empty(),
                    Some(parent_mem_context.clone()),
                    test_spill,
                );

                let mut data_chunk_merger = DataChunkMerger::new(self.output_data_types()).unwrap();

                let fields = &join_executor.schema().fields;

                // Sanity-check the executor's schema against the join type.
                if self.join_type.keep_all() {
                    assert_eq!(fields[1].data_type, DataType::Float32);
                    assert_eq!(fields[3].data_type, DataType::Float64);
                } else if self.join_type.keep_left() {
                    assert_eq!(fields[1].data_type, DataType::Float32);
                } else if self.join_type.keep_right() {
                    assert_eq!(fields[1].data_type, DataType::Float64)
                } else {
                    unreachable!()
                }

                let mut stream = join_executor.execute();

                while let Some(data_chunk) = stream.next().await {
                    let data_chunk = data_chunk.unwrap();
                    let data_chunk = data_chunk.compact_vis();
                    data_chunk_merger.append(&data_chunk).unwrap();
                }

                let result_chunk = data_chunk_merger.finish().unwrap();
                println!("expected: {:?}", expected);
                println!("result: {:?}", result_chunk);

                // TODO: Replace this with unsorted comparison
                // assert_eq!(expected, result_chunk);
                assert!(compare_data_chunk_with_rowsort(&expected, &result_chunk));
            }

            // All executor memory must have been returned to the parent context.
            assert_eq!(0, parent_mem_context.get_bytes_used());
        }

        /// Verifies that both `cancel` and `abort` shutdown signals surface as
        /// an error from the executor's stream.
        async fn do_test_shutdown(&self, has_non_equi_cond: bool) {
            // Test `ShutdownMsg::Cancel`
            let left_executor = self.create_left_executor();
            let right_executor = self.create_right_executor();
            let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
            let join_executor = self.create_join_executor_with_chunk_size_and_executors(
                has_non_equi_cond,
                false,
                self::CHUNK_SIZE,
                left_executor,
                right_executor,
                shutdown_rx,
                None,
                false,
            );
            shutdown_tx.cancel();
            #[for_await]
            for chunk in join_executor.execute() {
                assert!(chunk.is_err());
                break;
            }

            // Test `ShutdownMsg::Abort`
            let left_executor = self.create_left_executor();
            let right_executor = self.create_right_executor();
            let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
            let join_executor = self.create_join_executor_with_chunk_size_and_executors(
                has_non_equi_cond,
                false,
                self::CHUNK_SIZE,
                left_executor,
                right_executor,
                shutdown_rx,
                None,
                false,
            );
            shutdown_tx.abort("test");
            #[for_await]
            for chunk in join_executor.execute() {
                assert!(chunk.is_err());
                break;
            }
        }
    }
2921
2922    /// Sql:
2923    /// ```sql
2924    /// select * from t1 join t2 on t1.v1 = t2.v1;
2925    /// ```
2926    #[tokio::test]
2927    async fn test_inner_join() {
2928        let test_fixture = TestFixture::with_join_type(JoinType::Inner);
2929
2930        let expected_chunk = DataChunk::from_pretty(
2931            "i   f   i   F
2932             2   .   2   .
2933             3   3.9 3   3.7
2934             3   3.9 3   .
2935             4   6.6 4   7.5
2936             3   .   3   3.7
2937             3   .   3   .",
2938        );
2939
2940        test_fixture.do_test(expected_chunk, false, false).await;
2941    }
2942
2943    /// Sql:
2944    /// ```sql
2945    /// select * from t1 join t2 on t1.v1 is not distinct from t2.v1;
2946    /// ```
2947    #[tokio::test]
2948    async fn test_null_safe_inner_join() {
2949        let test_fixture = TestFixture::with_join_type(JoinType::Inner);
2950
2951        let expected_chunk = DataChunk::from_pretty(
2952            "i   f   i   F
2953             2    .  2     .
2954             .  8.4  .  8.18
2955             .  8.4  .  9.6
2956             .  8.4  .  9.1
2957             .  8.4  .  8
2958             .  8.4  .  3.5
2959             .  8.4  .  8.9
2960             3  3.9  3  3.7
2961             3  3.9  3     .
2962             .    .  .  8.18
2963             .    .  .  9.6
2964             .    .  .  9.1
2965             .    .  .  8
2966             .    .  .  3.5
2967             .    .  .  8.9
2968             4  6.6  4  7.5
2969             3    .  3  3.7
2970             3    .  3     .
2971             .  0.7  .  8.18
2972             .  0.7  .  9.6
2973             .  0.7  .  9.1
2974             .  0.7  .  8
2975             .  0.7  .  3.5
2976             .  0.7  .  8.9
2977             .  5.5  .  8.18
2978             .  5.5  .  9.6
2979             .  5.5  .  9.1
2980             .  5.5  .  8
2981             .  5.5  .  3.5
2982             .  5.5  .  8.9",
2983        );
2984
2985        test_fixture.do_test(expected_chunk, false, true).await;
2986    }
2987
2988    /// Sql:
2989    /// ```sql
2990    /// select * from t1 join t2 on t1.v1 = t2.v1 and t1.v2 < t2.v2;
2991    /// ```
2992    #[tokio::test]
2993    async fn test_inner_join_with_non_equi_condition() {
2994        let test_fixture = TestFixture::with_join_type(JoinType::Inner);
2995
2996        let expected_chunk = DataChunk::from_pretty(
2997            "i   f   i   F
2998             4   6.6 4   7.5",
2999        );
3000
3001        test_fixture.do_test(expected_chunk, true, false).await;
3002    }
3003
3004    /// Sql:
3005    /// ```sql
3006    /// select t1.v2 as t1_v2, t2.v2 as t2_v2 from t1 left outer join t2 on t1.v1 = t2.v1;
3007    /// ```
3008    #[tokio::test]
3009    async fn test_left_outer_join() {
3010        let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);
3011
3012        let expected_chunk = DataChunk::from_pretty(
3013            "i   f   i   F
3014             1   6.1 .   .
3015             2   .   2   .
3016             .   8.4 .   .
3017             3   3.9 3   3.7
3018             3   3.9 3   .
3019             .   .   .   .
3020             4   6.6 4   7.5
3021             3   .   3   3.7
3022             3   .   3   .
3023             .   0.7 .   .
3024             5   .   .   .
3025             .   5.5 .   .",
3026        );
3027
3028        test_fixture.do_test(expected_chunk, false, false).await;
3029    }
3030
3031    /// Sql:
3032    /// ```sql
3033    /// select * from t1 left outer join t2 on t1.v1 = t2.v1 and t1.v2 < t2.v2;
3034    /// ```
3035    #[tokio::test]
3036    async fn test_left_outer_join_with_non_equi_condition() {
3037        let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);
3038
3039        let expected_chunk = DataChunk::from_pretty(
3040            "i   f   i   F
3041             2   .   .   .
3042             3   3.9 .   .
3043             4   6.6 4   7.5
3044             3   .   .   .
3045             1   6.1 .   .
3046             .   8.4 .   .
3047             .   .   .   .
3048             .   0.7 .   .
3049             5   .   .   .
3050             .   5.5 .   .",
3051        );
3052
3053        test_fixture.do_test(expected_chunk, true, false).await;
3054    }
3055
3056    /// Sql:
3057    /// ```sql
3058    /// select * from t1 right outer join t2 on t1.v1 = t2.v1;
3059    /// ```
3060    #[tokio::test]
3061    async fn test_right_outer_join() {
3062        let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);
3063
3064        let expected_chunk = DataChunk::from_pretty(
3065            "i   f   i   F
3066             2   .   2   .
3067             3   3.9 3   3.7
3068             3   3.9 3   .
3069             4   6.6 4   7.5
3070             3   .   3   3.7
3071             3   .   3   .
3072             .   .   8   6.1
3073             .   .   .   8.9
3074             .   .   .   3.5
3075             .   .   6   .
3076             .   .   6   .
3077             .   .   .   8
3078             .   .   7   .
3079             .   .   .   9.1
3080             .   .   9   .
3081             .   .   9   .
3082             .   .   .   9.6
3083             .   .   100 .
3084             .   .   .   8.18
3085             .   .   200 .",
3086        );
3087
3088        test_fixture.do_test(expected_chunk, false, false).await;
3089    }
3090
3091    /// Sql:
3092    /// ```sql
3093    /// select * from t1 left outer join t2 on t1.v1 = t2.v1 and t1.v2 < t2.v2;
3094    /// ```
3095    #[tokio::test]
3096    async fn test_right_outer_join_with_non_equi_condition() {
3097        let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);
3098
3099        let expected_chunk = DataChunk::from_pretty(
3100            "i   f   i   F
3101             4   6.6 4   7.5
3102             .   .   8   6.1
3103             .   .   2   .
3104             .   .   .   8.9
3105             .   .   3   .
3106             .   .   .   3.5
3107             .   .   6   .
3108             .   .   6   .
3109             .   .   .   8
3110             .   .   7   .
3111             .   .   .   9.1
3112             .   .   9   .
3113             .   .   3   3.7
3114             .   .   9   .
3115             .   .   .   9.6
3116             .   .   100 .
3117             .   .   .   8.18
3118             .   .   200 .",
3119        );
3120
3121        test_fixture.do_test(expected_chunk, true, false).await;
3122    }
3123
3124    /// ```sql
3125    /// select * from t1 full outer join t2 on t1.v1 = t2.v1;
3126    /// ```
3127    #[tokio::test]
3128    async fn test_full_outer_join() {
3129        let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);
3130
3131        let expected_chunk = DataChunk::from_pretty(
3132            "i   f   i   F
3133             1   6.1 .   .
3134             2   .   2   .
3135             .   8.4 .   .
3136             3   3.9 3   3.7
3137             3   3.9 3   .
3138             .   .   .   .
3139             4   6.6 4   7.5
3140             3   .   3   3.7
3141             3   .   3   .
3142             .   0.7 .   .
3143             5   .   .   .
3144             .   5.5 .   .
3145             .   .   8   6.1
3146             .   .   .   8.9
3147             .   .   .   3.5
3148             .   .   6   .
3149             .   .   6   .
3150             .   .   .   8
3151             .   .   7   .
3152             .   .   .   9.1
3153             .   .   9   .
3154             .   .   9   .
3155             .   .   .   9.6
3156             .   .   100 .
3157             .   .   .   8.18
3158             .   .   200 .",
3159        );
3160
3161        test_fixture.do_test(expected_chunk, false, false).await;
3162    }
3163
3164    /// ```sql
3165    /// select * from t1 full outer join t2 on t1.v1 = t2.v1 and t1.v2 < t2.v2;
3166    /// ```
3167    #[tokio::test]
3168    async fn test_full_outer_join_with_non_equi_condition() {
3169        let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);
3170
3171        let expected_chunk = DataChunk::from_pretty(
3172            "i   f   i   F
3173             2   .   .   .
3174             3   3.9 .   .
3175             4   6.6 4   7.5
3176             3   .   .   .
3177             1   6.1 .   .
3178             .   8.4 .   .
3179             .   .   .   .
3180             .   0.7 .   .
3181             5   .   .   .
3182             .   5.5 .   .
3183             .   .   8   6.1
3184             .   .   2   .
3185             .   .   .   8.9
3186             .   .   3   .
3187             .   .   .   3.5
3188             .   .   6   .
3189             .   .   6   .
3190             .   .   .   8
3191             .   .   7   .
3192             .   .   .   9.1
3193             .   .   9   .
3194             .   .   3   3.7
3195             .   .   9   .
3196             .   .   .   9.6
3197             .   .   100 .
3198             .   .   .   8.18
3199             .   .   200 .",
3200        );
3201
3202        test_fixture.do_test(expected_chunk, true, false).await;
3203    }
3204
3205    #[tokio::test]
3206    async fn test_left_anti_join() {
3207        let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);
3208
3209        let expected_chunk = DataChunk::from_pretty(
3210            "i   f
3211             1   6.1
3212             .   8.4
3213             .   .
3214             .   0.7
3215             5   .
3216             .   5.5",
3217        );
3218
3219        test_fixture.do_test(expected_chunk, false, false).await;
3220    }
3221
3222    #[tokio::test]
3223    async fn test_left_anti_join_with_non_equi_condition() {
3224        let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);
3225
3226        let expected_chunk = DataChunk::from_pretty(
3227            "i   f
3228             2   .
3229             3   3.9
3230             3   .
3231             1   6.1
3232             .   8.4
3233             .   .
3234             .   0.7
3235             5   .
3236             .   5.5",
3237        );
3238
3239        test_fixture.do_test(expected_chunk, true, false).await;
3240    }
3241
3242    #[tokio::test]
3243    async fn test_left_semi_join() {
3244        let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
3245
3246        let expected_chunk = DataChunk::from_pretty(
3247            "i   f
3248             2   .
3249             3   3.9
3250             4   6.6
3251             3   .",
3252        );
3253
3254        test_fixture.do_test(expected_chunk, false, false).await;
3255    }
3256
3257    #[tokio::test]
3258    async fn test_left_semi_join_with_non_equi_condition() {
3259        let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
3260
3261        let expected_chunk = DataChunk::from_pretty(
3262            "i   f
3263             4   6.6",
3264        );
3265
3266        test_fixture.do_test(expected_chunk, true, false).await;
3267    }
3268
3269    /// Tests handling of edge case:
3270    /// Match is found for a `probe_row`,
3271    /// but there are still candidate rows in the iterator for that `probe_row`.
3272    /// These should not be buffered or we will have duplicate rows in output.
3273    #[tokio::test]
3274    async fn test_left_semi_join_with_non_equi_condition_duplicates() {
3275        let schema = Schema {
3276            fields: vec![
3277                Field::unnamed(DataType::Int32),
3278                Field::unnamed(DataType::Float32),
3279            ],
3280        };
3281
3282        // Build side
3283        let mut left_executor = MockExecutor::new(schema);
3284        left_executor.add(DataChunk::from_pretty(
3285            "i f
3286                 1 1.0
3287                 1 1.0
3288                 1 1.0
3289                 1 1.0
3290                 2 1.0",
3291        ));
3292
3293        // Probe side
3294        let schema = Schema {
3295            fields: vec![
3296                Field::unnamed(DataType::Int32),
3297                Field::unnamed(DataType::Float64),
3298            ],
3299        };
3300        let mut right_executor = MockExecutor::new(schema);
3301        right_executor.add(DataChunk::from_pretty(
3302            "i F
3303                 1 2.0
3304                 1 2.0
3305                 1 2.0
3306                 1 2.0
3307                 2 2.0",
3308        ));
3309
3310        let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
3311        let expected_chunk = DataChunk::from_pretty(
3312            "i f
3313            1 1.0
3314            1 1.0
3315            1 1.0
3316            1 1.0
3317            2 1.0",
3318        );
3319
3320        test_fixture
3321            .do_test_with_chunk_size_and_executors(
3322                expected_chunk,
3323                true,
3324                false,
3325                3,
3326                Box::new(left_executor),
3327                Box::new(right_executor),
3328                false,
3329            )
3330            .await;
3331    }
3332
3333    #[tokio::test]
3334    async fn test_right_anti_join() {
3335        let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);
3336
3337        let expected_chunk = DataChunk::from_pretty(
3338            "i   F
3339             8   6.1
3340             .   8.9
3341             .   3.5
3342             6   .
3343             6   .
3344             .   8.0
3345             7   .
3346             .   9.1
3347             9   .
3348             9   .
3349             .   9.6
3350             100 .
3351             .   8.18
3352             200 .",
3353        );
3354
3355        test_fixture.do_test(expected_chunk, false, false).await;
3356    }
3357
3358    #[tokio::test]
3359    async fn test_right_anti_join_with_non_equi_condition() {
3360        let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);
3361
3362        let expected_chunk = DataChunk::from_pretty(
3363            "i   F
3364             8   6.1
3365             2   .
3366             .   8.9
3367             3   .
3368             .   3.5
3369             6   .
3370             6   .
3371             .   8
3372             7   .
3373             .   9.1
3374             9   .
3375             3   3.7
3376             9   .
3377             .   9.6
3378             100 .
3379             .   8.18
3380             200 .",
3381        );
3382
3383        test_fixture.do_test(expected_chunk, true, false).await;
3384    }
3385
3386    #[tokio::test]
3387    async fn test_right_semi_join() {
3388        let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);
3389
3390        let expected_chunk = DataChunk::from_pretty(
3391            "i   F
3392             2   .
3393             3   .
3394             4   7.5
3395             3   3.7",
3396        );
3397
3398        test_fixture.do_test(expected_chunk, false, false).await;
3399    }
3400
3401    #[tokio::test]
3402    async fn test_right_semi_join_with_non_equi_condition() {
3403        let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);
3404
3405        let expected_chunk = DataChunk::from_pretty(
3406            "i   F
3407             4   7.5",
3408        );
3409
3410        test_fixture.do_test(expected_chunk, true, false).await;
3411    }
3412
    #[tokio::test]
    async fn test_process_left_outer_join_non_equi_condition() {
        // Input layout: probe columns (i, f) | build columns (i, F).
        // `first_output_row_id` lists, per probe row, the offset of its first
        // candidate row in `chunk`. The condition keeps a joined row only when
        // f < F (see `TestFixture::create_cond`; consistent with the expected
        // outputs below).
        let chunk = DataChunk::from_pretty(
            "i   f   i   F
             1   3.5 1   5.5
             1   3.5 1   2.5
             2   4.0 .   .
             3   5.0 3   4.0
             3   5.0 3   3.0
             3   5.0 3   4.0
             3   5.0 3   3.0
             4   1.0 4   0
             4   1.0 4   9.0",
        );
        // Left outer: a probe row whose candidates all fail the condition is
        // kept once with NULL-padded build columns.
        let expect = DataChunk::from_pretty(
            "i   f   i   F
             1   3.5 1   5.5
             2   4.0 .   .
             3   5.0 .   .
             3   5.0 .   .
             4   1.0 4   9.0",
        );
        let cond = TestFixture::create_cond();
        let mut state = LeftNonEquiJoinState {
            probe_column_count: 2,
            first_output_row_id: vec![0, 2, 3, 5, 7],
            // The last probe row's candidates continue into the next chunk.
            has_more_output_rows: true,
            found_matched: false,
        };
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_left_outer_join_non_equi_condition(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        // All pending offsets consumed; the trailing probe row (4, 1.0)
        // already matched, so its continuation must not NULL-pad later.
        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
        assert!(state.found_matched);

        // Second chunk: note that build values of NULL fail the condition.
        let chunk = DataChunk::from_pretty(
            "i   f   i   F
             4   1.0 4   0.6
             4   1.0 4   2.0
             5   4.0 5   .
             6   7.0 6   .
             6   7.0 6   5.0",
        );
        let expect = DataChunk::from_pretty(
            "i   f   i   F
             4   1.0 4   2.0
             5   4.0 .   .
             6   7.0 .   .",
        );
        state.first_output_row_id = vec![2, 3];
        state.has_more_output_rows = false;
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_left_outer_join_non_equi_condition(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
        assert!(!state.found_matched);

        // Third chunk: none of (4, 1.0)'s candidates pass, so it falls back to
        // a NULL-padded output row.
        let chunk = DataChunk::from_pretty(
            "i   f   i   F
             4   1.0 4   0.6
             4   1.0 4   1.0
             5   4.0 5   .
             6   7.0 6   .
             6   7.0 6   8.0",
        );
        let expect = DataChunk::from_pretty(
            "i   f   i   F
             4   1.0 .   .
             5   4.0 .   .
             6   7.0 6   8.0",
        );
        state.first_output_row_id = vec![2, 3];
        state.has_more_output_rows = false;
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_left_outer_join_non_equi_condition(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
        assert!(!state.found_matched);
    }
3516
    #[tokio::test]
    async fn test_process_left_semi_join_non_equi_condition() {
        // Input layout: probe columns (i, f) | build columns (i, F); the
        // condition keeps a row only when f < F. Semi join: each probe row is
        // emitted at most once, on its first passing candidate.
        let chunk = DataChunk::from_pretty(
            "i   f   i   F
             1   3.5 1   5.5
             1   3.5 1   2.5
             2   4.0 .   .
             3   5.0 3   4.0
             3   5.0 3   3.0
             3   5.0 3   4.0
             3   5.0 3   3.0
             4   1.0 4   0
             4   1.0 4   0.5",
        );
        let expect = DataChunk::from_pretty(
            "i   f   i   F
             1   3.5 1   5.5",
        );
        let cond = TestFixture::create_cond();
        let mut state = LeftNonEquiJoinState {
            probe_column_count: 2,
            first_output_row_id: vec![0, 2, 3, 5, 7],
            found_matched: false,
            // Remaining fields take their defaults.
            ..Default::default()
        };
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<false>(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
        assert!(!state.found_matched);

        // Second chunk: only (4, 1.0) with build value 2.0 passes (1.0 < 2.0);
        // NULL build values fail the condition.
        let chunk = DataChunk::from_pretty(
            "i   f   i   F
             4   1.0 4   0.6
             4   1.0 4   2.0
             5   4.0 5   .
             6   7.0 6   .
             6   7.0 6   5.0",
        );
        let expect = DataChunk::from_pretty(
            "i   f   i   F
             4   1.0 4   2.0",
        );
        state.first_output_row_id = vec![2, 3];
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<false>(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
        assert!(!state.found_matched);

        // Third chunk: only the last row (6, 7.0)/(6, 8.0) passes.
        let chunk = DataChunk::from_pretty(
            "i   f   i   F
             4   1.0 4   0.6
             4   1.0 4   1.0
             5   4.0 5   .
             6   7.0 6   .
             6   7.0 6   8.0",
        );
        let expect = DataChunk::from_pretty(
            "i   f   i   F
             6   7.0 6   8.0",
        );
        state.first_output_row_id = vec![2, 3];
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<false>(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
    }
3609
    #[tokio::test]
    async fn test_process_left_anti_join_non_equi_condition() {
        // Same helper as the semi-join test above but with ANTI_JOIN = true:
        // a probe row survives only if none of its candidates pass the
        // condition (f < F). Layout: probe columns (i, f) | build columns (i, F).
        let chunk = DataChunk::from_pretty(
            "i   f   i   F
             1   3.5 1   5.5
             1   3.5 1   2.5
             2   4.0 .   .
             3   5.0 3   4.0
             3   5.0 3   3.0
             3   5.0 3   4.0
             3   5.0 3   3.0
             4   1.0 4   0
             4   1.0 4   0.5",
        );
        let expect = DataChunk::from_pretty(
            "i   f   i   F
             2   4.0 .   .
             3   5.0 3   4.0
             3   5.0 3   4.0",
        );
        let cond = TestFixture::create_cond();
        let mut state = LeftNonEquiJoinState {
            probe_column_count: 2,
            first_output_row_id: vec![0, 2, 3, 5, 7],
            // The last probe row's candidates continue into the next chunk.
            has_more_output_rows: true,
            found_matched: false,
        };
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<true>(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
        assert!(!state.found_matched);

        // Second chunk: (4, 1.0) matches build value 2.0, so it is excluded.
        let chunk = DataChunk::from_pretty(
            "i   f   i   F
             4   1.0 4   0.6
             4   1.0 4   2.0
             5   4.0 5   .
             6   7.0 6   .
             6   7.0 6   5.0",
        );
        let expect = DataChunk::from_pretty(
            "i   f   i   F
             5   4.0 5   .
             6   7.0 6   .",
        );
        state.first_output_row_id = vec![2, 3];
        state.has_more_output_rows = false;
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<true>(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
        assert!(!state.found_matched);

        // Third chunk: (6, 7.0) matches build value 8.0 and drops out, while
        // (4, 1.0) and (5, 4.0) never match and are kept.
        let chunk = DataChunk::from_pretty(
            "i   f   i   F
             4   1.0 4   0.6
             4   1.0 4   1.0
             5   4.0 5   .
             6   7.0 6   .
             6   7.0 6   8.0",
        );
        let expect = DataChunk::from_pretty(
            "i   f   i   F
             4   1.0 4   0.6
             5   4.0 5   .",
        );
        state.first_output_row_id = vec![2, 3];
        state.has_more_output_rows = false;
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<true>(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
    }
3708
    #[tokio::test]
    async fn test_process_right_outer_join_non_equi_condition() {
        // Layout: probe columns (i, f) | build columns (i, F); the condition
        // keeps a row only when f < F. `build_row_ids` maps each row of `chunk`
        // to its origin build-side RowId; `build_row_matched` accumulates which
        // build rows have matched so far (used to NULL-pad leftovers later).
        let chunk = DataChunk::from_pretty(
            "i   f   i   F
             1   3.5 1   5.5
             1   3.5 1   2.5
             3   5.0 3   4.0
             3   5.0 3   3.0
             3   5.0 3   4.0
             3   5.0 3   3.0
             4   1.0 4   0
             4   1.0 4   0.5",
        );
        let expect = DataChunk::from_pretty(
            "i   f   i   F
             1   3.5 1   5.5",
        );
        let cond = TestFixture::create_cond();
        // For simplicity, all rows are in one chunk.
        // Build side table
        // 0  - (1, 5.5)
        // 1  - (1, 2.5)
        // 2  - ?
        // 3  - (3, 4.0)
        // 4  - (3, 3.0)
        // 5  - (4, 0)
        // 6  - ?
        // 7  - (4, 0.5)
        // 8  - (4, 0.6)
        // 9  - (4, 2.0)
        // 10 - (5, .)
        // 11 - ?
        // 12 - (6, .)
        // 13 - (6, 5.0)
        // Rows with '?' are never matched here.
        let build_row_matched = ChunkedData::with_chunk_sizes([14].into_iter()).unwrap();
        let mut state = RightNonEquiJoinState {
            build_row_ids: vec![
                RowId::new(0, 0),
                RowId::new(0, 1),
                RowId::new(0, 3),
                RowId::new(0, 4),
                RowId::new(0, 3),
                RowId::new(0, 4),
                RowId::new(0, 5),
                RowId::new(0, 7),
            ],
            build_row_matched,
        };
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_right_outer_join_non_equi_condition(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        // Only build row 0 (1, 5.5) passed the condition (3.5 < 5.5).
        assert_eq!(state.build_row_ids, Vec::new());
        assert_eq!(
            state.build_row_matched,
            ChunkedData::try_from(vec![{
                let mut v = vec![false; 14];
                v[0] = true;
                v
            }])
            .unwrap()
        );

        // Second chunk: only build row 9 (4, 2.0) newly matches; NULL build
        // values fail the condition.
        let chunk = DataChunk::from_pretty(
            "i   f   i   F
             4   1.0 4   0.6
             4   1.0 4   2.0
             5   4.0 5   .
             6   7.0 6   .
             6   7.0 6   5.0",
        );
        let expect = DataChunk::from_pretty(
            "i   f   i   F
             4   1.0 4   2.0",
        );
        state.build_row_ids = vec![
            RowId::new(0, 8),
            RowId::new(0, 9),
            RowId::new(0, 10),
            RowId::new(0, 12),
            RowId::new(0, 13),
        ];
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_right_outer_join_non_equi_condition(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        // Matched flags accumulate across chunks: rows 0 and 9 are now set.
        assert_eq!(state.build_row_ids, Vec::new());
        assert_eq!(
            state.build_row_matched,
            ChunkedData::try_from(vec![{
                let mut v = vec![false; 14];
                v[0] = true;
                v[9] = true;
                v
            }])
            .unwrap()
        );
    }
3822
    #[tokio::test]
    async fn test_process_right_semi_anti_join_non_equi_condition() {
        // Right semi/anti processing produces no inspectable output chunk here
        // (only `is_ok` is asserted); the observable effect is which build
        // rows get flagged in `build_row_matched`. Same data layout and build
        // table as `test_process_right_outer_join_non_equi_condition`.
        let chunk = DataChunk::from_pretty(
            "i   f   i   F
             1   3.5 1   5.5
             1   3.5 1   2.5
             3   5.0 3   4.0
             3   5.0 3   3.0
             3   5.0 3   4.0
             3   5.0 3   3.0
             4   1.0 4   0
             4   1.0 4   0.5",
        );
        let cond = TestFixture::create_cond();
        let build_row_matched = ChunkedData::with_chunk_sizes([14].into_iter()).unwrap();
        let mut state = RightNonEquiJoinState {
            build_row_ids: vec![
                RowId::new(0, 0),
                RowId::new(0, 1),
                RowId::new(0, 3),
                RowId::new(0, 4),
                RowId::new(0, 3),
                RowId::new(0, 4),
                RowId::new(0, 5),
                RowId::new(0, 7),
            ],
            build_row_matched,
        };

        assert!(
            HashJoinExecutor::<Key32>::process_right_semi_anti_join_non_equi_condition(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .is_ok()
        );
        // Only build row 0 (1, 5.5) passed the condition (3.5 < 5.5).
        assert_eq!(state.build_row_ids, Vec::new());
        assert_eq!(
            state.build_row_matched,
            ChunkedData::try_from(vec![{
                let mut v = vec![false; 14];
                v[0] = true;
                v
            }])
            .unwrap()
        );

        // Second chunk: only build row 9 (4, 2.0) newly matches.
        let chunk = DataChunk::from_pretty(
            "i   f   i   F
             4   1.0 4   0.6
             4   1.0 4   2.0
             5   4.0 5   .
             6   7.0 6   .
             6   7.0 6   5.0",
        );
        state.build_row_ids = vec![
            RowId::new(0, 8),
            RowId::new(0, 9),
            RowId::new(0, 10),
            RowId::new(0, 12),
            RowId::new(0, 13),
        ];
        assert!(
            HashJoinExecutor::<Key32>::process_right_semi_anti_join_non_equi_condition(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .is_ok()
        );
        // Matched flags accumulate across chunks.
        assert_eq!(state.build_row_ids, Vec::new());
        assert_eq!(
            state.build_row_matched,
            ChunkedData::try_from(vec![{
                let mut v = vec![false; 14];
                v[0] = true;
                v[9] = true;
                v
            }])
            .unwrap()
        );
    }
3908
    #[tokio::test]
    async fn test_process_full_outer_join_non_equi_condition() {
        // Full outer combines both side-states: the left state NULL-pads probe
        // rows whose candidates all fail the condition (f < F), while the
        // right state records which build rows matched, for later NULL-padded
        // emission of the leftovers. Layout: probe (i, f) | build (i, F).
        let chunk = DataChunk::from_pretty(
            "i   f   i   F
             1   3.5 1   5.5
             1   3.5 1   2.5
             3   5.0 3   4.0
             3   5.0 3   3.0
             3   5.0 3   4.0
             3   5.0 3   3.0
             4   1.0 4   0
             4   1.0 4   0.5",
        );
        let expect = DataChunk::from_pretty(
            "i   f   i   F
             1   3.5 1   5.5
             3   5.0 .   .
             3   5.0 .   .",
        );
        let cond = TestFixture::create_cond();
        let mut left_state = LeftNonEquiJoinState {
            probe_column_count: 2,
            first_output_row_id: vec![0, 2, 4, 6],
            // The last probe row's candidates continue into the next chunk.
            has_more_output_rows: true,
            found_matched: false,
        };
        let mut right_state = RightNonEquiJoinState {
            // Origin build-side RowId of each row in `chunk`.
            build_row_ids: vec![
                RowId::new(0, 0),
                RowId::new(0, 1),
                RowId::new(0, 3),
                RowId::new(0, 4),
                RowId::new(0, 3),
                RowId::new(0, 4),
                RowId::new(0, 5),
                RowId::new(0, 7),
            ],
            build_row_matched: ChunkedData::with_chunk_sizes([14].into_iter()).unwrap(),
        };
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_full_outer_join_non_equi_condition(
                chunk,
                cond.as_ref(),
                &mut left_state,
                &mut right_state,
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        // Probe side: offsets flushed, trailing probe row not yet matched.
        assert_eq!(left_state.first_output_row_id, Vec::<usize>::new());
        assert!(!left_state.found_matched);
        // Build side: only build row 0 (1, 5.5) matched.
        assert_eq!(right_state.build_row_ids, Vec::new());
        assert_eq!(
            right_state.build_row_matched,
            ChunkedData::try_from(vec![{
                let mut v = vec![false; 14];
                v[0] = true;
                v
            }])
            .unwrap()
        );

        // Second chunk: (4, 1.0) now matches 2.0, (6, 7.0) matches 8.0, and
        // (5, 4.0) never matches so it is NULL-padded on the probe side.
        let chunk = DataChunk::from_pretty(
            "i   f   i   F
             4   1.0 4   0.6
             4   1.0 4   2.0
             5   4.0 5   .
             6   7.0 6   .
             6   7.0 6   8.0",
        );
        let expect = DataChunk::from_pretty(
            "i   f   i   F
             4   1.0 4   2.0
             5   4.0 .   .
             6   7.0 6   8.0",
        );
        left_state.first_output_row_id = vec![2, 3];
        left_state.has_more_output_rows = false;
        right_state.build_row_ids = vec![
            RowId::new(0, 8),
            RowId::new(0, 9),
            RowId::new(0, 10),
            RowId::new(0, 12),
            RowId::new(0, 13),
        ];
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_full_outer_join_non_equi_condition(
                chunk,
                cond.as_ref(),
                &mut left_state,
                &mut right_state,
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        assert_eq!(left_state.first_output_row_id, Vec::<usize>::new());
        assert!(left_state.found_matched);
        // Build rows 9 (4, 2.0) and 13 (6, 8.0) are newly flagged as matched.
        assert_eq!(right_state.build_row_ids, Vec::new());
        assert_eq!(
            right_state.build_row_matched,
            ChunkedData::try_from(vec![{
                let mut v = vec![false; 14];
                v[0] = true;
                v[9] = true;
                v[13] = true;
                v
            }])
            .unwrap()
        );
    }
4023
4024    #[test]
4025    fn test_find_asof_matched_rows_prefers_closest_le_match() {
4026        // Le: left <= right, so probe <= build.
4027        // probe=5, builds=[10, 20]. Both satisfy 5<=10 and 5<=20.
4028        // Closest (minimum build) = 10 at row 0.
4029        let probe_chunk = DataChunk::from_pretty(
4030            "i
4031             5",
4032        );
4033        let build_chunk = DataChunk::from_pretty(
4034            "i
4035             10
4036             20",
4037        );
4038        let build_side = vec![build_chunk];
4039
4040        let mut next_row_id = ChunkedData::with_chunk_sizes([2]).unwrap();
4041        next_row_id[RowId::new(0, 0)] = Some(RowId::new(0, 1));
4042        next_row_id[RowId::new(0, 1)] = None;
4043
4044        let asof_desc = AsOfDesc {
4045            left_idx: 0,
4046            right_idx: 0,
4047            inequality_type: AsOfInequalityType::Le,
4048        };
4049
4050        let matched = HashJoinExecutor::<Key32>::find_asof_matched_rows(
4051            probe_chunk.row_at_unchecked_vis(0),
4052            &build_side,
4053            next_row_id.row_id_iter(Some(RowId::new(0, 0))),
4054            &asof_desc,
4055        );
4056
4057        assert_eq!(matched, Some(RowId::new(0, 0)));
4058    }
4059
4060    #[test]
4061    fn test_find_asof_matched_rows_prefers_closest_ge_match() {
4062        // Ge: left >= right, so probe >= build.
4063        // probe=12, builds=[5, 10]. Both satisfy 12>=5 and 12>=10.
4064        // Closest (maximum build) = 10 at row 1.
4065        let probe_chunk = DataChunk::from_pretty(
4066            "i
4067             12",
4068        );
4069        let build_chunk = DataChunk::from_pretty(
4070            "i
4071             5
4072             10",
4073        );
4074        let build_side = vec![build_chunk];
4075
4076        let mut next_row_id = ChunkedData::with_chunk_sizes([2]).unwrap();
4077        next_row_id[RowId::new(0, 0)] = Some(RowId::new(0, 1));
4078        next_row_id[RowId::new(0, 1)] = None;
4079
4080        let asof_desc = AsOfDesc {
4081            left_idx: 0,
4082            right_idx: 0,
4083            inequality_type: AsOfInequalityType::Ge,
4084        };
4085
4086        let matched = HashJoinExecutor::<Key32>::find_asof_matched_rows(
4087            probe_chunk.row_at_unchecked_vis(0),
4088            &build_side,
4089            next_row_id.row_id_iter(Some(RowId::new(0, 0))),
4090            &asof_desc,
4091        );
4092
4093        assert_eq!(matched, Some(RowId::new(0, 1)));
4094    }
4095
    #[tokio::test]
    async fn test_batch_hash_join_asof_ge_returns_closest_match() {
        // End-to-end ASOF join through the executor: left row (3, 12) joins
        // right rows (3, 5) and (3, 10) on the equi key in column 0. With
        // `Ge` on column 1 (left >= right), both right rows qualify and the
        // closest, (3, 10), must win — mirroring the unit test above.
        let left_schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Int32),
            ],
        };
        let right_schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Int32),
            ],
        };

        let mut left_executor = MockExecutor::new(left_schema);
        left_executor.add(DataChunk::from_pretty(
            "i i
             3 12",
        ));

        let mut right_executor = MockExecutor::new(right_schema);
        right_executor.add(DataChunk::from_pretty(
            "i i
             3 5
             3 10",
        ));

        let join_executor = Box::new(HashJoinExecutor::<Key32>::new(
            JoinType::Inner,
            vec![0, 1, 2, 3], // output all four columns (left then right)
            Box::new(left_executor),
            Box::new(right_executor),
            vec![0], // equi-key column on one side
            vec![0], // equi-key column on the other side
            vec![false], // NOTE(review): presumably per-key null-safe flags — confirm against the constructor
            None, // no extra non-equi condition
            "HashJoinExecutor".to_owned(),
            CHUNK_SIZE,
            // ASOF inequality: compare column 1 of each side with `Ge`.
            Some(AsOfDesc {
                left_idx: 1,
                right_idx: 1,
                inequality_type: AsOfInequalityType::Ge,
            }),
            None,
            BatchSpillMetrics::for_test(),
            ShutdownToken::empty(),
            MemoryContext::new(None, LabelGuardedIntGauge::test_int_gauge::<4>()),
        ));

        let mut stream = join_executor.execute();
        let chunk = stream.next().await.unwrap().unwrap().compact_vis();
        let expected = DataChunk::from_pretty(
            "i i i i
             3 12 3 10",
        );

        assert!(compare_data_chunk_with_rowsort(&expected, &chunk));
        // Exactly one output chunk: the stream must be exhausted afterwards.
        assert!(stream.next().await.is_none());
    }
4156
4157    #[tokio::test]
4158    async fn test_shutdown() {
4159        let test_fixture = TestFixture::with_join_type(JoinType::Inner);
4160        test_fixture.do_test_shutdown(false).await;
4161        test_fixture.do_test_shutdown(true).await;
4162
4163        let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);
4164        test_fixture.do_test_shutdown(false).await;
4165        test_fixture.do_test_shutdown(true).await;
4166
4167        let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);
4168        test_fixture.do_test_shutdown(false).await;
4169        test_fixture.do_test_shutdown(true).await;
4170
4171        let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);
4172        test_fixture.do_test_shutdown(false).await;
4173        test_fixture.do_test_shutdown(true).await;
4174
4175        let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
4176        test_fixture.do_test_shutdown(false).await;
4177        test_fixture.do_test_shutdown(true).await;
4178
4179        let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);
4180        test_fixture.do_test_shutdown(false).await;
4181        test_fixture.do_test_shutdown(true).await;
4182
4183        let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);
4184        test_fixture.do_test_shutdown(false).await;
4185        test_fixture.do_test_shutdown(true).await;
4186
4187        let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);
4188        test_fixture.do_test_shutdown(false).await;
4189        test_fixture.do_test_shutdown(true).await;
4190    }
4191}