risingwave_batch_executors/executor/join/
hash_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::iter;
17use std::iter::empty;
18use std::marker::PhantomData;
19use std::sync::Arc;
20
21use bytes::Bytes;
22use futures_async_stream::try_stream;
23use itertools::Itertools;
24use risingwave_common::array::{Array, DataChunk, RowRef};
25use risingwave_common::bitmap::{Bitmap, BitmapBuilder, FilterByBitmap};
26use risingwave_common::catalog::Schema;
27use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher};
28use risingwave_common::memory::{MemoryContext, MonitoredGlobalAlloc};
29use risingwave_common::row::{Row, RowExt, repeat_n};
30use risingwave_common::types::{DataType, Datum, DefaultOrd};
31use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
32use risingwave_common::util::iter_util::ZipEqFast;
33use risingwave_common_estimate_size::EstimateSize;
34use risingwave_expr::expr::{BoxedExpression, Expression, build_from_prost};
35use risingwave_pb::Message;
36use risingwave_pb::batch_plan::plan_node::NodeBody;
37use risingwave_pb::data::DataChunk as PbDataChunk;
38
39use super::{AsOfDesc, AsOfInequalityType, ChunkedData, JoinType, RowId};
40use crate::error::{BatchError, Result};
41use crate::executor::{
42    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
43    WrapStreamExecutor,
44};
45use crate::monitor::BatchSpillMetrics;
46use crate::risingwave_common::hash::NullBitmap;
47use crate::spill::spill_op::SpillBackend::Disk;
48use crate::spill::spill_op::{
49    DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY, SpillBackend, SpillBuildHasher, SpillOp,
50};
51use crate::task::ShutdownToken;
52
/// Hash Join Executor
///
/// High-level idea:
/// 1. Iterate over the build side (i.e. right table) and build a hash map.
/// 2. Iterate over the probe side (i.e. left table) and compute the hash value of each row.
///    Then find the matched build side row for each probe side row in the hash map.
/// 3. Concatenate the matched pair of probe side row and build side row into a single row and push
///    it into the data chunk builder.
/// 4. Yield chunks from the builder.
///
/// Memory used by the buffered build side and the hash keys is charged to `mem_ctx`. When a
/// charge fails (and memory checking is enabled, see `memory_upper_bound`), the executor either
/// spills, if `spill_backend` is set, or fails with `BatchError::OutOfMemory`. Spilling
/// hash-partitions both inputs into files via `JoinSpillManager` and joins every partition with a
/// sub `HashJoinExecutor`.
pub struct HashJoinExecutor<K> {
    /// Join type e.g. inner, left outer, ...
    join_type: JoinType,
    /// Output schema without applying `output_indices`
    #[expect(dead_code)]
    original_schema: Schema,
    /// Output schema after applying `output_indices`
    schema: Schema,
    /// `output_indices` are the indices of the columns that we needed.
    output_indices: Vec<usize>,
    /// Left child executor (the probe side).
    probe_side_source: BoxedExecutor,
    /// Right child executor (the build side, loaded into the hash map).
    build_side_source: BoxedExecutor,
    /// Column indices of left keys in equi join
    probe_key_idxs: Vec<usize>,
    /// Column indices of right keys in equi join
    build_key_idxs: Vec<usize>,
    /// Non-equi join condition (optional)
    cond: Option<Arc<BoxedExpression>>,
    /// Whether or not to enable 'IS NOT DISTINCT FROM' semantics for a specific probe/build key
    /// column. A build row is only inserted into the hash map if every NULL in its key falls in
    /// a column whose flag is `true`.
    null_matched: Vec<bool>,
    /// Name of this executor; also used to name the spill directory and sub-executors.
    identity: String,
    /// Target row count of the chunks built by this executor (also used as the spill chunk size).
    chunk_size: usize,
    /// As-of join description; `Some` exactly when this is an as-of join.
    asof_desc: Option<AsOfDesc>,

    /// Where to spill when memory runs out; `None` disables spilling.
    spill_backend: Option<SpillBackend>,
    /// Metrics updated while writing/reading spill files.
    spill_metrics: Arc<BatchSpillMetrics>,
    /// The upper bound of memory usage for this executor. When it is `Some(b)` with
    /// `b <= SPILL_AT_LEAST_MEMORY`, memory limits are not enforced. Sub-executors created while
    /// spilling receive the on-disk size of their partition here.
    memory_upper_bound: Option<u64>,

    /// Checked (`check()?`) for every build row while building the hash map and handed to the
    /// probe routines — presumably to abort on query cancellation.
    shutdown_rx: ShutdownToken,

    /// Memory accounting context: buffered build chunks and hash-key heap sizes are charged via
    /// `add`, and the build buffer / hash map allocate through its `global_allocator()`.
    mem_ctx: MemoryContext,
    _phantom: PhantomData<K>,
}
100
101impl<K: HashKey> Executor for HashJoinExecutor<K> {
102    fn schema(&self) -> &Schema {
103        &self.schema
104    }
105
106    fn identity(&self) -> &str {
107        &self.identity
108    }
109
110    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
111        self.do_execute()
112    }
113}
114
115/// In `JoinHashMap`, we only save the row id of the first build row that has the hash key.
116/// In fact, in the build side there may be multiple rows with the same hash key. To handle this
117/// case, we use `ChunkedData` to link them together. For example:
118///
119/// | id | key | row |
120/// | --- | --- | --- |
121/// | 0 | 1 | (1, 2, 3) |
122/// | 1 | 4 | (4, 5, 6) |
123/// | 2 | 1 | (1, 3, 7) |
124/// | 3 | 1 | (1, 3, 2) |
125/// | 4 | 3 | (3, 2, 1) |
126///
127/// The corresponding join hash map is:
128///
129/// | key | value |
130/// | --- | --- |
131/// | 1 | 0 |
132/// | 4 | 1 |
133/// | 3 | 4 |
134///
135/// And we save build rows with the same key like this:
136///
137/// | id | value |
138/// | --- | --- |
139/// | 0 | 2 |
140/// | 1 | None |
141/// | 2 | 3 |
142/// | 3 | None |
143/// | 4 | None |
144///
145/// This can be seen as an implicit linked list. For convenience, we use `RowIdIter` to iterate all
146/// build side row ids with the given key.
147pub type JoinHashMap<K> =
148    hashbrown::HashMap<K, RowId, PrecomputedBuildHasher, MonitoredGlobalAlloc>;
149
150struct RowIdIter<'a> {
151    current_row_id: Option<RowId>,
152    next_row_id: &'a ChunkedData<Option<RowId>>,
153}
154
155impl ChunkedData<Option<RowId>> {
156    fn row_id_iter(&self, begin: Option<RowId>) -> RowIdIter<'_> {
157        RowIdIter {
158            current_row_id: begin,
159            next_row_id: self,
160        }
161    }
162}
163
164impl Iterator for RowIdIter<'_> {
165    type Item = RowId;
166
167    fn next(&mut self) -> Option<Self::Item> {
168        self.current_row_id.inspect(|row_id| {
169            self.current_row_id = self.next_row_id[*row_id];
170        })
171    }
172}
173
/// Everything an equi-join probe routine (e.g. `do_inner_join`) needs once the build side has been
/// buffered and indexed by `HashJoinExecutor::do_execute`.
pub struct EquiJoinParams<K> {
    /// Probe side (left) input, consumed by the probe routine.
    probe_side: BoxedExecutor,
    /// Column types of the probe side.
    probe_data_types: Vec<DataType>,
    /// Column indices of the join keys in probe side rows.
    probe_key_idxs: Vec<usize>,
    /// Buffered build side chunks; the hash map and the chain below refer to them by `RowId`.
    build_side: Vec<DataChunk, MonitoredGlobalAlloc>,
    /// Column types of the build side.
    build_data_types: Vec<DataType>,
    /// Probe side column types followed by build side column types (output before projection).
    full_data_types: Vec<DataType>,
    /// Maps each build key to the head of its chain of build rows.
    hash_map: JoinHashMap<K>,
    /// For each build row, the next build row with the same key (implicit linked list).
    next_build_row_with_same_key: ChunkedData<Option<RowId>>,
    /// Target row count of the chunks built by the probe routine.
    chunk_size: usize,
    /// Shutdown token forwarded from the executor.
    shutdown_rx: ShutdownToken,
    /// `Some` for as-of joins.
    asof_desc: Option<AsOfDesc>,
}
187
188impl<K> EquiJoinParams<K> {
189    #[allow(clippy::too_many_arguments)]
190    pub(super) fn new(
191        probe_side: BoxedExecutor,
192        probe_data_types: Vec<DataType>,
193        probe_key_idxs: Vec<usize>,
194        build_side: Vec<DataChunk, MonitoredGlobalAlloc>,
195        build_data_types: Vec<DataType>,
196        full_data_types: Vec<DataType>,
197        hash_map: JoinHashMap<K>,
198        next_build_row_with_same_key: ChunkedData<Option<RowId>>,
199        chunk_size: usize,
200        shutdown_rx: ShutdownToken,
201        asof_desc: Option<AsOfDesc>,
202    ) -> Self {
203        Self {
204            probe_side,
205            probe_data_types,
206            probe_key_idxs,
207            build_side,
208            build_data_types,
209            full_data_types,
210            hash_map,
211            next_build_row_with_same_key,
212            chunk_size,
213            shutdown_rx,
214            asof_desc,
215        }
216    }
217
218    pub(crate) fn is_asof_join(&self) -> bool {
219        self.asof_desc.is_some()
220    }
221}
222
/// State variables used in left outer/semi/anti join and full outer join.
#[derive(Default)]
struct LeftNonEquiJoinState {
    /// `true` while the probe row being processed still has output rows destined for the
    /// **next** output chunk.
    has_more_output_rows: bool,
    /// `true` if the probe row being processed matched non-NULL build rows in the **last**
    /// output chunk.
    found_matched: bool,
    /// How many columns the probe side has.
    probe_column_count: usize,
    /// For each probe side row processed so far, the offset of its first output row within the
    /// **current** chunk.
    first_output_row_id: Vec<usize>,
}
237
238/// State variables used in right outer/semi/anti join and full outer join.
239#[derive(Default)]
240struct RightNonEquiJoinState {
241    /// Corresponding build row id for each row in **current** output chunk.
242    build_row_ids: Vec<RowId>,
243    /// Whether a build row has been matched.
244    build_row_matched: ChunkedData<bool>,
245}
246
/// Spill state of a `HashJoinExecutor`: for every hash partition, one probe side and one build
/// side spill file, plus per-partition chunk builders that batch rows before they are written.
pub struct JoinSpillManager {
    /// Operator over the spill directory created in `new`.
    op: SpillOp,
    /// Number of hash partitions.
    partition_num: usize,
    /// Per-partition writers of the probe side files (pushed by `init_writers`, drained and
    /// closed by `close_writers`).
    probe_side_writers: Vec<opendal::Writer>,
    /// Per-partition writers of the build side files (pushed by `init_writers`, drained and
    /// closed by `close_writers`).
    build_side_writers: Vec<opendal::Writer>,
    /// Per-partition builders batching probe side rows into chunks of `spill_chunk_size`.
    probe_side_chunk_builders: Vec<DataChunkBuilder>,
    /// Per-partition builders batching build side rows into chunks of `spill_chunk_size`.
    build_side_chunk_builders: Vec<DataChunkBuilder>,
    /// Hasher used to compute the partitioning hash codes; constructed from the spill
    /// directory's UUID (presumably acting as a per-manager seed).
    spill_build_hasher: SpillBuildHasher,
    /// Column types of the probe side input.
    probe_side_data_types: Vec<DataType>,
    /// Column types of the build side input.
    build_side_data_types: Vec<DataType>,
    /// Row capacity of the per-partition chunk builders.
    spill_chunk_size: usize,
    /// Metrics updated with the number of bytes written/read.
    spill_metrics: Arc<BatchSpillMetrics>,
}
260
261/// `JoinSpillManager` is used to manage how to write spill data file and read them back.
262/// The spill data first need to be partitioned. Each partition contains 2 files: `join_probe_side_file` and `join_build_side_file`.
263/// The spill file consume a data chunk and serialize the chunk into a protobuf bytes.
264/// Finally, spill file content will look like the below.
265/// The file write pattern is append-only and the read pattern is sequential scan.
266/// This can maximize the disk IO performance.
267///
268/// ```text
269/// [proto_len]
270/// [proto_bytes]
271/// ...
272/// [proto_len]
273/// [proto_bytes]
274/// ```
275impl JoinSpillManager {
276    pub fn new(
277        spill_backend: SpillBackend,
278        join_identity: &String,
279        partition_num: usize,
280        probe_side_data_types: Vec<DataType>,
281        build_side_data_types: Vec<DataType>,
282        spill_chunk_size: usize,
283        spill_metrics: Arc<BatchSpillMetrics>,
284    ) -> Result<Self> {
285        let suffix_uuid = uuid::Uuid::new_v4();
286        let dir = format!("/{}-{}/", join_identity, suffix_uuid);
287        let op = SpillOp::create(dir, spill_backend)?;
288        let probe_side_writers = Vec::with_capacity(partition_num);
289        let build_side_writers = Vec::with_capacity(partition_num);
290        let probe_side_chunk_builders = Vec::with_capacity(partition_num);
291        let build_side_chunk_builders = Vec::with_capacity(partition_num);
292        let spill_build_hasher = SpillBuildHasher(suffix_uuid.as_u64_pair().1);
293        Ok(Self {
294            op,
295            partition_num,
296            probe_side_writers,
297            build_side_writers,
298            probe_side_chunk_builders,
299            build_side_chunk_builders,
300            spill_build_hasher,
301            probe_side_data_types,
302            build_side_data_types,
303            spill_chunk_size,
304            spill_metrics,
305        })
306    }
307
308    pub async fn init_writers(&mut self) -> Result<()> {
309        for i in 0..self.partition_num {
310            let join_probe_side_partition_file_name = format!("join-probe-side-p{}", i);
311            let w = self
312                .op
313                .writer_with(&join_probe_side_partition_file_name)
314                .await?;
315            self.probe_side_writers.push(w);
316
317            let join_build_side_partition_file_name = format!("join-build-side-p{}", i);
318            let w = self
319                .op
320                .writer_with(&join_build_side_partition_file_name)
321                .await?;
322            self.build_side_writers.push(w);
323            self.probe_side_chunk_builders.push(DataChunkBuilder::new(
324                self.probe_side_data_types.clone(),
325                self.spill_chunk_size,
326            ));
327            self.build_side_chunk_builders.push(DataChunkBuilder::new(
328                self.build_side_data_types.clone(),
329                self.spill_chunk_size,
330            ));
331        }
332        Ok(())
333    }
334
335    pub async fn write_probe_side_chunk(
336        &mut self,
337        chunk: DataChunk,
338        hash_codes: Vec<u64>,
339    ) -> Result<()> {
340        let (columns, vis) = chunk.into_parts_v2();
341        for partition in 0..self.partition_num {
342            let new_vis = vis.clone()
343                & Bitmap::from_iter(
344                    hash_codes
345                        .iter()
346                        .map(|hash_code| (*hash_code as usize % self.partition_num) == partition),
347                );
348            let new_chunk = DataChunk::from_parts(columns.clone(), new_vis);
349            for output_chunk in self.probe_side_chunk_builders[partition].append_chunk(new_chunk) {
350                let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
351                let buf = Message::encode_to_vec(&chunk_pb);
352                let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
353                self.spill_metrics
354                    .batch_spill_write_bytes
355                    .inc_by((buf.len() + len_bytes.len()) as u64);
356                self.probe_side_writers[partition].write(len_bytes).await?;
357                self.probe_side_writers[partition].write(buf).await?;
358            }
359        }
360        Ok(())
361    }
362
363    pub async fn write_build_side_chunk(
364        &mut self,
365        chunk: DataChunk,
366        hash_codes: Vec<u64>,
367    ) -> Result<()> {
368        let (columns, vis) = chunk.into_parts_v2();
369        for partition in 0..self.partition_num {
370            let new_vis = vis.clone()
371                & Bitmap::from_iter(
372                    hash_codes
373                        .iter()
374                        .map(|hash_code| (*hash_code as usize % self.partition_num) == partition),
375                );
376            let new_chunk = DataChunk::from_parts(columns.clone(), new_vis);
377            for output_chunk in self.build_side_chunk_builders[partition].append_chunk(new_chunk) {
378                let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
379                let buf = Message::encode_to_vec(&chunk_pb);
380                let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
381                self.spill_metrics
382                    .batch_spill_write_bytes
383                    .inc_by((buf.len() + len_bytes.len()) as u64);
384                self.build_side_writers[partition].write(len_bytes).await?;
385                self.build_side_writers[partition].write(buf).await?;
386            }
387        }
388        Ok(())
389    }
390
391    pub async fn close_writers(&mut self) -> Result<()> {
392        for partition in 0..self.partition_num {
393            if let Some(output_chunk) = self.probe_side_chunk_builders[partition].consume_all() {
394                let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
395                let buf = Message::encode_to_vec(&chunk_pb);
396                let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
397                self.spill_metrics
398                    .batch_spill_write_bytes
399                    .inc_by((buf.len() + len_bytes.len()) as u64);
400                self.probe_side_writers[partition].write(len_bytes).await?;
401                self.probe_side_writers[partition].write(buf).await?;
402            }
403
404            if let Some(output_chunk) = self.build_side_chunk_builders[partition].consume_all() {
405                let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
406                let buf = Message::encode_to_vec(&chunk_pb);
407                let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
408                self.spill_metrics
409                    .batch_spill_write_bytes
410                    .inc_by((buf.len() + len_bytes.len()) as u64);
411                self.build_side_writers[partition].write(len_bytes).await?;
412                self.build_side_writers[partition].write(buf).await?;
413            }
414        }
415
416        for mut w in self.probe_side_writers.drain(..) {
417            w.close().await?;
418        }
419        for mut w in self.build_side_writers.drain(..) {
420            w.close().await?;
421        }
422        Ok(())
423    }
424
425    async fn read_probe_side_partition(
426        &mut self,
427        partition: usize,
428    ) -> Result<BoxedDataChunkStream> {
429        let join_probe_side_partition_file_name = format!("join-probe-side-p{}", partition);
430        let r = self
431            .op
432            .reader_with(&join_probe_side_partition_file_name)
433            .await?;
434        Ok(SpillOp::read_stream(r, self.spill_metrics.clone()))
435    }
436
437    async fn read_build_side_partition(
438        &mut self,
439        partition: usize,
440    ) -> Result<BoxedDataChunkStream> {
441        let join_build_side_partition_file_name = format!("join-build-side-p{}", partition);
442        let r = self
443            .op
444            .reader_with(&join_build_side_partition_file_name)
445            .await?;
446        Ok(SpillOp::read_stream(r, self.spill_metrics.clone()))
447    }
448
449    pub async fn estimate_partition_size(&self, partition: usize) -> Result<u64> {
450        let join_probe_side_partition_file_name = format!("join-probe-side-p{}", partition);
451        let probe_size = self
452            .op
453            .stat(&join_probe_side_partition_file_name)
454            .await?
455            .content_length();
456        let join_build_side_partition_file_name = format!("join-build-side-p{}", partition);
457        let build_size = self
458            .op
459            .stat(&join_build_side_partition_file_name)
460            .await?
461            .content_length();
462        Ok(probe_size + build_size)
463    }
464
465    async fn clear_partition(&mut self, partition: usize) -> Result<()> {
466        let join_probe_side_partition_file_name = format!("join-probe-side-p{}", partition);
467        self.op.delete(&join_probe_side_partition_file_name).await?;
468        let join_build_side_partition_file_name = format!("join-build-side-p{}", partition);
469        self.op.delete(&join_build_side_partition_file_name).await?;
470        Ok(())
471    }
472}
473
474impl<K: HashKey> HashJoinExecutor<K> {
475    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
476    async fn do_execute(self: Box<Self>) {
477        let mut need_to_spill = false;
478        // If the memory upper bound is less than 1MB, we don't need to check memory usage.
479        let check_memory = match self.memory_upper_bound {
480            Some(upper_bound) => upper_bound > SPILL_AT_LEAST_MEMORY,
481            None => true,
482        };
483
484        let probe_schema = self.probe_side_source.schema().clone();
485        let build_schema = self.build_side_source.schema().clone();
486        let probe_data_types = self.probe_side_source.schema().data_types();
487        let build_data_types = self.build_side_source.schema().data_types();
488        let full_data_types = [probe_data_types.clone(), build_data_types.clone()].concat();
489
490        let mut build_side = Vec::new_in(self.mem_ctx.global_allocator());
491        let mut build_row_count = 0;
492        let mut build_side_stream = self.build_side_source.execute();
493        #[for_await]
494        for build_chunk in &mut build_side_stream {
495            let build_chunk = build_chunk?;
496            if build_chunk.cardinality() > 0 {
497                build_row_count += build_chunk.cardinality();
498                let chunk_estimated_heap_size = build_chunk.estimated_heap_size();
499                // push build_chunk to build_side before checking memory limit, otherwise we will lose that chunk when spilling.
500                build_side.push(build_chunk);
501                if !self.mem_ctx.add(chunk_estimated_heap_size as i64) && check_memory {
502                    if self.spill_backend.is_some() {
503                        need_to_spill = true;
504                        break;
505                    } else {
506                        Err(BatchError::OutOfMemory(self.mem_ctx.mem_limit()))?;
507                    }
508                }
509            }
510        }
511        let mut hash_map = JoinHashMap::with_capacity_and_hasher_in(
512            build_row_count,
513            PrecomputedBuildHasher,
514            self.mem_ctx.global_allocator(),
515        );
516        let mut next_build_row_with_same_key =
517            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
518
519        let null_matched = K::Bitmap::from_bool_vec(self.null_matched.clone());
520
521        let mut mem_added_by_hash_table = 0;
522        if !need_to_spill {
523            // Build hash map
524            for (build_chunk_id, build_chunk) in build_side.iter().enumerate() {
525                let build_keys = K::build_many(&self.build_key_idxs, build_chunk);
526
527                for (build_row_id, build_key) in build_keys
528                    .into_iter()
529                    .enumerate()
530                    .filter_by_bitmap(build_chunk.visibility())
531                {
532                    self.shutdown_rx.check()?;
533                    // Only insert key to hash map if it is consistent with the null safe restriction.
534                    if build_key.null_bitmap().is_subset(&null_matched) {
535                        let row_id = RowId::new(build_chunk_id, build_row_id);
536                        let build_key_size = build_key.estimated_heap_size() as i64;
537                        mem_added_by_hash_table += build_key_size;
538                        if !self.mem_ctx.add(build_key_size) && check_memory {
539                            if self.spill_backend.is_some() {
540                                need_to_spill = true;
541                                break;
542                            } else {
543                                Err(BatchError::OutOfMemory(self.mem_ctx.mem_limit()))?;
544                            }
545                        }
546                        next_build_row_with_same_key[row_id] = hash_map.insert(build_key, row_id);
547                    }
548                }
549            }
550        }
551
552        if need_to_spill {
553            // A spilling version of hash join based on the RFC: Spill Hash Join https://github.com/risingwavelabs/rfcs/pull/91
554            // When HashJoinExecutor told memory is insufficient, JoinSpillManager will start to partition the hash table and spill to disk.
555            // After spilling the hash table, JoinSpillManager will consume all chunks from its build side input executor and probe side input executor.
556            // Finally, we would get e.g. 20 partitions. Each partition should contain a portion of the original build side input and probr side input data.
557            // A sub HashJoinExecutor would be used to consume each partition one by one.
558            // If memory is still not enough in the sub HashJoinExecutor, it will spill its inputs recursively.
559            info!(
560                "batch hash join executor {} starts to spill out",
561                &self.identity
562            );
563            let mut join_spill_manager = JoinSpillManager::new(
564                self.spill_backend.clone().unwrap(),
565                &self.identity,
566                DEFAULT_SPILL_PARTITION_NUM,
567                probe_data_types.clone(),
568                build_data_types.clone(),
569                self.chunk_size,
570                self.spill_metrics.clone(),
571            )?;
572            join_spill_manager.init_writers().await?;
573
574            // Release memory occupied by the hash map
575            self.mem_ctx.add(-mem_added_by_hash_table);
576            drop(hash_map);
577            drop(next_build_row_with_same_key);
578
579            // Spill buffered build side chunks
580            for chunk in build_side {
581                // Release the memory occupied by the buffered chunks
582                self.mem_ctx.add(-(chunk.estimated_heap_size() as i64));
583                let hash_codes = chunk.get_hash_values(
584                    self.build_key_idxs.as_slice(),
585                    join_spill_manager.spill_build_hasher,
586                );
587                join_spill_manager
588                    .write_build_side_chunk(
589                        chunk,
590                        hash_codes
591                            .into_iter()
592                            .map(|hash_code| hash_code.value())
593                            .collect(),
594                    )
595                    .await?;
596            }
597
598            // Spill build side chunks
599            #[for_await]
600            for chunk in build_side_stream {
601                let chunk = chunk?;
602                let hash_codes = chunk.get_hash_values(
603                    self.build_key_idxs.as_slice(),
604                    join_spill_manager.spill_build_hasher,
605                );
606                join_spill_manager
607                    .write_build_side_chunk(
608                        chunk,
609                        hash_codes
610                            .into_iter()
611                            .map(|hash_code| hash_code.value())
612                            .collect(),
613                    )
614                    .await?;
615            }
616
617            // Spill probe side chunks
618            #[for_await]
619            for chunk in self.probe_side_source.execute() {
620                let chunk = chunk?;
621                let hash_codes = chunk.get_hash_values(
622                    self.probe_key_idxs.as_slice(),
623                    join_spill_manager.spill_build_hasher,
624                );
625                join_spill_manager
626                    .write_probe_side_chunk(
627                        chunk,
628                        hash_codes
629                            .into_iter()
630                            .map(|hash_code| hash_code.value())
631                            .collect(),
632                    )
633                    .await?;
634            }
635
636            join_spill_manager.close_writers().await?;
637
638            // Process each partition one by one.
639            for i in 0..join_spill_manager.partition_num {
640                let partition_size = join_spill_manager.estimate_partition_size(i).await?;
641                let probe_side_stream = join_spill_manager.read_probe_side_partition(i).await?;
642                let build_side_stream = join_spill_manager.read_build_side_partition(i).await?;
643
644                let sub_hash_join_executor: HashJoinExecutor<K> = HashJoinExecutor::new_inner(
645                    self.join_type,
646                    self.output_indices.clone(),
647                    Box::new(WrapStreamExecutor::new(
648                        probe_schema.clone(),
649                        probe_side_stream,
650                    )),
651                    Box::new(WrapStreamExecutor::new(
652                        build_schema.clone(),
653                        build_side_stream,
654                    )),
655                    self.probe_key_idxs.clone(),
656                    self.build_key_idxs.clone(),
657                    self.null_matched.clone(),
658                    self.cond.clone(),
659                    format!("{}-sub{}", self.identity.clone(), i),
660                    self.chunk_size,
661                    self.asof_desc.clone(),
662                    self.spill_backend.clone(),
663                    self.spill_metrics.clone(),
664                    Some(partition_size),
665                    self.shutdown_rx.clone(),
666                    self.mem_ctx.clone(),
667                );
668
669                debug!(
670                    "create sub_hash_join {} for hash_join {} to spill",
671                    sub_hash_join_executor.identity, self.identity
672                );
673
674                let sub_hash_join_executor = Box::new(sub_hash_join_executor).execute();
675
676                #[for_await]
677                for chunk in sub_hash_join_executor {
678                    let chunk = chunk?;
679                    yield chunk;
680                }
681
682                // Clear files of the current partition.
683                join_spill_manager.clear_partition(i).await?;
684            }
685        } else {
686            let params = EquiJoinParams::new(
687                self.probe_side_source,
688                probe_data_types,
689                self.probe_key_idxs,
690                build_side,
691                build_data_types,
692                full_data_types,
693                hash_map,
694                next_build_row_with_same_key,
695                self.chunk_size,
696                self.shutdown_rx.clone(),
697                self.asof_desc,
698            );
699
700            if let Some(cond) = self.cond.as_ref()
701                && !params.is_asof_join()
702            {
703                let stream = match self.join_type {
704                    JoinType::Inner => Self::do_inner_join_with_non_equi_condition(params, cond),
705                    JoinType::LeftOuter => {
706                        Self::do_left_outer_join_with_non_equi_condition(params, cond)
707                    }
708                    JoinType::LeftSemi => {
709                        Self::do_left_semi_join_with_non_equi_condition(params, cond)
710                    }
711                    JoinType::LeftAnti => {
712                        Self::do_left_anti_join_with_non_equi_condition(params, cond)
713                    }
714                    JoinType::RightOuter => {
715                        Self::do_right_outer_join_with_non_equi_condition(params, cond)
716                    }
717                    JoinType::RightSemi => {
718                        Self::do_right_semi_anti_join_with_non_equi_condition::<false>(params, cond)
719                    }
720                    JoinType::RightAnti => {
721                        Self::do_right_semi_anti_join_with_non_equi_condition::<true>(params, cond)
722                    }
723                    JoinType::FullOuter => {
724                        Self::do_full_outer_join_with_non_equi_condition(params, cond)
725                    }
726                    JoinType::AsOfInner | JoinType::AsOfLeftOuter => {
727                        unreachable!("AsOf join should not reach here")
728                    }
729                };
730                // For non-equi join, we need an output chunk builder to align the output chunks.
731                let mut output_chunk_builder =
732                    DataChunkBuilder::new(self.schema.data_types(), self.chunk_size);
733                #[for_await]
734                for chunk in stream {
735                    for output_chunk in
736                        output_chunk_builder.append_chunk(chunk?.project(&self.output_indices))
737                    {
738                        yield output_chunk
739                    }
740                }
741                if let Some(output_chunk) = output_chunk_builder.consume_all() {
742                    yield output_chunk
743                }
744            } else {
745                let stream = match self.join_type {
746                    JoinType::Inner | JoinType::AsOfInner => Self::do_inner_join(params),
747                    JoinType::LeftOuter | JoinType::AsOfLeftOuter => {
748                        Self::do_left_outer_join(params)
749                    }
750                    JoinType::LeftSemi => Self::do_left_semi_anti_join::<false>(params),
751                    JoinType::LeftAnti => Self::do_left_semi_anti_join::<true>(params),
752                    JoinType::RightOuter => Self::do_right_outer_join(params),
753                    JoinType::RightSemi => Self::do_right_semi_anti_join::<false>(params),
754                    JoinType::RightAnti => Self::do_right_semi_anti_join::<true>(params),
755                    JoinType::FullOuter => Self::do_full_outer_join(params),
756                };
757                #[for_await]
758                for chunk in stream {
759                    yield chunk?.project(&self.output_indices)
760                }
761            }
762        }
763    }
764
765    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
766    pub async fn do_inner_join(
767        EquiJoinParams {
768            probe_side,
769            probe_key_idxs,
770            build_side,
771            full_data_types,
772            hash_map,
773            next_build_row_with_same_key,
774            chunk_size,
775            shutdown_rx,
776            asof_desc,
777            ..
778        }: EquiJoinParams<K>,
779    ) {
780        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
781        #[for_await]
782        for probe_chunk in probe_side.execute() {
783            let probe_chunk = probe_chunk?;
784            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
785            for (probe_row_id, probe_key) in probe_keys
786                .iter()
787                .enumerate()
788                .filter_by_bitmap(probe_chunk.visibility())
789            {
790                let build_side_row_iter =
791                    next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied());
792                if let Some(asof_desc) = &asof_desc {
793                    if let Some(build_row_id) = Self::find_asof_matched_rows(
794                        probe_chunk.row_at_unchecked_vis(probe_row_id),
795                        &build_side,
796                        build_side_row_iter,
797                        asof_desc,
798                    ) {
799                        shutdown_rx.check()?;
800                        if let Some(spilled) = Self::append_one_row(
801                            &mut chunk_builder,
802                            &probe_chunk,
803                            probe_row_id,
804                            &build_side[build_row_id.chunk_id()],
805                            build_row_id.row_id(),
806                        ) {
807                            yield spilled
808                        }
809                    }
810                } else {
811                    for build_row_id in build_side_row_iter {
812                        shutdown_rx.check()?;
813                        let build_chunk = &build_side[build_row_id.chunk_id()];
814                        if let Some(spilled) = Self::append_one_row(
815                            &mut chunk_builder,
816                            &probe_chunk,
817                            probe_row_id,
818                            build_chunk,
819                            build_row_id.row_id(),
820                        ) {
821                            yield spilled
822                        }
823                    }
824                }
825            }
826        }
827        if let Some(spilled) = chunk_builder.consume_all() {
828            yield spilled
829        }
830    }
831
832    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
833    pub async fn do_inner_join_with_non_equi_condition(
834        params: EquiJoinParams<K>,
835        cond: &BoxedExpression,
836    ) {
837        #[for_await]
838        for chunk in Self::do_inner_join(params) {
839            let mut chunk = chunk?;
840            chunk.set_visibility(cond.eval(&chunk).await?.as_bool().iter().collect());
841            yield chunk
842        }
843    }
844
845    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
846    pub async fn do_left_outer_join(
847        EquiJoinParams {
848            probe_side,
849            probe_key_idxs,
850            build_side,
851            build_data_types,
852            full_data_types,
853            hash_map,
854            next_build_row_with_same_key,
855            chunk_size,
856            shutdown_rx,
857            asof_desc,
858            ..
859        }: EquiJoinParams<K>,
860    ) {
861        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
862        #[for_await]
863        for probe_chunk in probe_side.execute() {
864            let probe_chunk = probe_chunk?;
865            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
866            for (probe_row_id, probe_key) in probe_keys
867                .iter()
868                .enumerate()
869                .filter_by_bitmap(probe_chunk.visibility())
870            {
871                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
872                    let build_side_row_iter =
873                        next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id));
874                    if let Some(asof_desc) = &asof_desc {
875                        if let Some(build_row_id) = Self::find_asof_matched_rows(
876                            probe_chunk.row_at_unchecked_vis(probe_row_id),
877                            &build_side,
878                            build_side_row_iter,
879                            asof_desc,
880                        ) {
881                            shutdown_rx.check()?;
882                            if let Some(spilled) = Self::append_one_row(
883                                &mut chunk_builder,
884                                &probe_chunk,
885                                probe_row_id,
886                                &build_side[build_row_id.chunk_id()],
887                                build_row_id.row_id(),
888                            ) {
889                                yield spilled
890                            }
891                        } else {
892                            shutdown_rx.check()?;
893                            let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
894                            if let Some(spilled) = Self::append_one_row_with_null_build_side(
895                                &mut chunk_builder,
896                                probe_row,
897                                build_data_types.len(),
898                            ) {
899                                yield spilled
900                            }
901                        }
902                    } else {
903                        for build_row_id in build_side_row_iter {
904                            shutdown_rx.check()?;
905                            let build_chunk = &build_side[build_row_id.chunk_id()];
906                            if let Some(spilled) = Self::append_one_row(
907                                &mut chunk_builder,
908                                &probe_chunk,
909                                probe_row_id,
910                                build_chunk,
911                                build_row_id.row_id(),
912                            ) {
913                                yield spilled
914                            }
915                        }
916                    }
917                } else {
918                    shutdown_rx.check()?;
919                    let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
920                    if let Some(spilled) = Self::append_one_row_with_null_build_side(
921                        &mut chunk_builder,
922                        probe_row,
923                        build_data_types.len(),
924                    ) {
925                        yield spilled
926                    }
927                }
928            }
929        }
930        if let Some(spilled) = chunk_builder.consume_all() {
931            yield spilled
932        }
933    }
934
    /// Left outer equi-join with an additional non-equi condition `cond`.
    ///
    /// There are two output paths:
    /// - Probe rows with at least one equi-key match are paired with every
    ///   candidate build row in `chunk_builder`. Each time that buffer spills,
    ///   the chunk is passed, together with `non_equi_state`, to
    ///   `process_left_outer_join_non_equi_condition` and the result is yielded.
    ///   That helper is defined elsewhere in this file. Presumably it evaluates
    ///   `cond` and NULL-pads probe rows whose candidates all fail (TODO confirm).
    /// - Probe rows with no equi-key match are NULL-padded on the build side in
    ///   the separate `remaining_chunk_builder`. That builder also uses full-width
    ///   rows, and its final partial chunk is emitted last.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_left_outer_join_with_non_equi_condition(
        EquiJoinParams {
            probe_side,
            probe_data_types,
            probe_key_idxs,
            build_side,
            build_data_types,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
        cond: &BoxedExpression,
    ) {
        let mut chunk_builder = DataChunkBuilder::new(full_data_types.clone(), chunk_size);
        let mut remaining_chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        let mut non_equi_state = LeftNonEquiJoinState {
            probe_column_count: probe_data_types.len(),
            ..Default::default()
        };

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
                    // Record the offset in the current buffer at which this probe row's
                    // candidate rows begin (pushed before any of them are appended).
                    non_equi_state
                        .first_output_row_id
                        .push(chunk_builder.buffered_count());

                    // Peekable so that, when the buffer spills mid-way, we can tell
                    // whether this probe row still has candidates left for the next buffer.
                    let mut build_row_id_iter = next_build_row_with_same_key
                        .row_id_iter(Some(*first_matched_build_row_id))
                        .peekable();
                    while let Some(build_row_id) = build_row_id_iter.next() {
                        shutdown_rx.check()?;
                        let build_chunk = &build_side[build_row_id.chunk_id()];
                        if let Some(spilled) = Self::append_one_row(
                            &mut chunk_builder,
                            &probe_chunk,
                            probe_row_id,
                            build_chunk,
                            build_row_id.row_id(),
                        ) {
                            // Tell the processor whether the last probe row in `spilled`
                            // continues into the next buffer.
                            non_equi_state.has_more_output_rows =
                                build_row_id_iter.peek().is_some();
                            yield Self::process_left_outer_join_non_equi_condition(
                                spilled,
                                cond.as_ref(),
                                &mut non_equi_state,
                            )
                            .await?
                        }
                    }
                } else {
                    // No equi-key match: the row can only appear NULL-padded.
                    shutdown_rx.check()?;
                    let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
                    if let Some(spilled) = Self::append_one_row_with_null_build_side(
                        &mut remaining_chunk_builder,
                        probe_row,
                        build_data_types.len(),
                    ) {
                        yield spilled
                    }
                }
            }
        }
        // The final buffer is the last one, so no probe row continues past it.
        non_equi_state.has_more_output_rows = false;
        if let Some(spilled) = chunk_builder.consume_all() {
            yield Self::process_left_outer_join_non_equi_condition(
                spilled,
                cond.as_ref(),
                &mut non_equi_state,
            )
            .await?
        }

        if let Some(spilled) = remaining_chunk_builder.consume_all() {
            yield spilled
        }
    }
1023
1024    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1025    pub async fn do_left_semi_anti_join<const ANTI_JOIN: bool>(
1026        EquiJoinParams {
1027            probe_side,
1028            probe_data_types,
1029            probe_key_idxs,
1030            hash_map,
1031            chunk_size,
1032            shutdown_rx,
1033            ..
1034        }: EquiJoinParams<K>,
1035    ) {
1036        let mut chunk_builder = DataChunkBuilder::new(probe_data_types, chunk_size);
1037        #[for_await]
1038        for probe_chunk in probe_side.execute() {
1039            let probe_chunk = probe_chunk?;
1040            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1041            for (probe_row_id, probe_key) in probe_keys
1042                .iter()
1043                .enumerate()
1044                .filter_by_bitmap(probe_chunk.visibility())
1045            {
1046                shutdown_rx.check()?;
1047                if !ANTI_JOIN {
1048                    if hash_map.contains_key(probe_key) {
1049                        if let Some(spilled) = Self::append_one_probe_row(
1050                            &mut chunk_builder,
1051                            &probe_chunk,
1052                            probe_row_id,
1053                        ) {
1054                            yield spilled
1055                        }
1056                    }
1057                } else if hash_map.get(probe_key).is_none() {
1058                    if let Some(spilled) =
1059                        Self::append_one_probe_row(&mut chunk_builder, &probe_chunk, probe_row_id)
1060                    {
1061                        yield spilled
1062                    }
1063                }
1064            }
1065        }
1066        if let Some(spilled) = chunk_builder.consume_all() {
1067            yield spilled
1068        }
1069    }
1070
1071    /// High-level idea:
1072    /// 1. For each probe_row, append candidate rows to buffer.
1073    ///    Candidate rows: Those satisfying equi_predicate (==).
1074    /// 2. If buffer becomes full, process it.
1075    ///    Apply non_equi_join predicates e.g. `>=`, `<=` to filter rows.
1076    ///    Track if probe_row is matched to avoid duplicates.
1077    /// 3. If we matched probe_row in spilled chunk,
1078    ///    stop appending its candidate rows,
1079    ///    to avoid matching it again in next spilled chunk.
1080    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1081    pub async fn do_left_semi_join_with_non_equi_condition<'a>(
1082        EquiJoinParams {
1083            probe_side,
1084            probe_key_idxs,
1085            build_side,
1086            full_data_types,
1087            hash_map,
1088            next_build_row_with_same_key,
1089            chunk_size,
1090            shutdown_rx,
1091            ..
1092        }: EquiJoinParams<K>,
1093        cond: &'a BoxedExpression,
1094    ) {
1095        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
1096        let mut non_equi_state = LeftNonEquiJoinState::default();
1097
1098        #[for_await]
1099        for probe_chunk in probe_side.execute() {
1100            let probe_chunk = probe_chunk?;
1101            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1102            for (probe_row_id, probe_key) in probe_keys
1103                .iter()
1104                .enumerate()
1105                .filter_by_bitmap(probe_chunk.visibility())
1106            {
1107                non_equi_state.found_matched = false;
1108                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
1109                    non_equi_state
1110                        .first_output_row_id
1111                        .push(chunk_builder.buffered_count());
1112
1113                    for build_row_id in
1114                        next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id))
1115                    {
1116                        shutdown_rx.check()?;
1117                        if non_equi_state.found_matched {
1118                            break;
1119                        }
1120                        let build_chunk = &build_side[build_row_id.chunk_id()];
1121                        if let Some(spilled) = Self::append_one_row(
1122                            &mut chunk_builder,
1123                            &probe_chunk,
1124                            probe_row_id,
1125                            build_chunk,
1126                            build_row_id.row_id(),
1127                        ) {
1128                            yield Self::process_left_semi_anti_join_non_equi_condition::<false>(
1129                                spilled,
1130                                cond.as_ref(),
1131                                &mut non_equi_state,
1132                            )
1133                            .await?
1134                        }
1135                    }
1136                }
1137            }
1138        }
1139
1140        // Process remaining rows in buffer
1141        if let Some(spilled) = chunk_builder.consume_all() {
1142            yield Self::process_left_semi_anti_join_non_equi_condition::<false>(
1143                spilled,
1144                cond.as_ref(),
1145                &mut non_equi_state,
1146            )
1147            .await?
1148        }
1149    }
1150
    /// Left anti equi-join with an additional non-equi condition `cond`.
    ///
    /// There are two output paths:
    /// - Probe rows whose key has no entry in `hash_map` are copied, probe
    ///   columns only, into `remaining_chunk_builder` and yielded as they spill.
    ///   The final partial chunk is yielded last.
    /// - Probe rows with an equi-key match are paired with each candidate build
    ///   row in the full-width `chunk_builder`. Every spilled chunk is passed with
    ///   `non_equi_state` to `process_left_semi_anti_join_non_equi_condition::<true>`,
    ///   and its result is yielded. That helper is defined elsewhere in this file
    ///   and presumably decides which probe rows survive the anti join — TODO confirm.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_left_anti_join_with_non_equi_condition(
        EquiJoinParams {
            probe_side,
            probe_data_types,
            probe_key_idxs,
            build_side,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
        cond: &BoxedExpression,
    ) {
        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        // Holds probe-only rows whose key has no equi-match at all.
        let mut remaining_chunk_builder = DataChunkBuilder::new(probe_data_types, chunk_size);
        let mut non_equi_state = LeftNonEquiJoinState::default();

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
                    // Record the offset in the current buffer at which this probe row's
                    // candidate rows begin (pushed before any of them are appended).
                    non_equi_state
                        .first_output_row_id
                        .push(chunk_builder.buffered_count());
                    // Peekable so that, on a spill, we know whether this probe row still
                    // has candidates that will land in the next buffer.
                    let mut build_row_id_iter = next_build_row_with_same_key
                        .row_id_iter(Some(*first_matched_build_row_id))
                        .peekable();
                    while let Some(build_row_id) = build_row_id_iter.next() {
                        shutdown_rx.check()?;
                        let build_chunk = &build_side[build_row_id.chunk_id()];
                        if let Some(spilled) = Self::append_one_row(
                            &mut chunk_builder,
                            &probe_chunk,
                            probe_row_id,
                            build_chunk,
                            build_row_id.row_id(),
                        ) {
                            non_equi_state.has_more_output_rows =
                                build_row_id_iter.peek().is_some();
                            yield Self::process_left_semi_anti_join_non_equi_condition::<true>(
                                spilled,
                                cond.as_ref(),
                                &mut non_equi_state,
                            )
                            .await?
                        }
                    }
                } else if let Some(spilled) = Self::append_one_probe_row(
                    &mut remaining_chunk_builder,
                    &probe_chunk,
                    probe_row_id,
                ) {
                    yield spilled
                }
            }
        }
        // The final buffer is the last one, so no probe row continues past it.
        non_equi_state.has_more_output_rows = false;
        if let Some(spilled) = chunk_builder.consume_all() {
            yield Self::process_left_semi_anti_join_non_equi_condition::<true>(
                spilled,
                cond.as_ref(),
                &mut non_equi_state,
            )
            .await?
        }
        if let Some(spilled) = remaining_chunk_builder.consume_all() {
            yield spilled
        }
    }
1229
1230    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1231    pub async fn do_right_outer_join(
1232        EquiJoinParams {
1233            probe_side,
1234            probe_data_types,
1235            probe_key_idxs,
1236            build_side,
1237            full_data_types,
1238            hash_map,
1239            next_build_row_with_same_key,
1240            chunk_size,
1241            shutdown_rx,
1242            ..
1243        }: EquiJoinParams<K>,
1244    ) {
1245        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
1246        let mut build_row_matched =
1247            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
1248
1249        #[for_await]
1250        for probe_chunk in probe_side.execute() {
1251            let probe_chunk = probe_chunk?;
1252            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1253            for (probe_row_id, probe_key) in probe_keys
1254                .iter()
1255                .enumerate()
1256                .filter_by_bitmap(probe_chunk.visibility())
1257            {
1258                for build_row_id in
1259                    next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
1260                {
1261                    shutdown_rx.check()?;
1262                    build_row_matched[build_row_id] = true;
1263                    let build_chunk = &build_side[build_row_id.chunk_id()];
1264                    if let Some(spilled) = Self::append_one_row(
1265                        &mut chunk_builder,
1266                        &probe_chunk,
1267                        probe_row_id,
1268                        build_chunk,
1269                        build_row_id.row_id(),
1270                    ) {
1271                        yield spilled
1272                    }
1273                }
1274            }
1275        }
1276        #[for_await]
1277        for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
1278            &mut chunk_builder,
1279            &build_side,
1280            &build_row_matched,
1281            probe_data_types.len(),
1282        ) {
1283            yield spilled?
1284        }
1285    }
1286
1287    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1288    pub async fn do_right_outer_join_with_non_equi_condition(
1289        EquiJoinParams {
1290            probe_side,
1291            probe_data_types,
1292            probe_key_idxs,
1293            build_side,
1294            full_data_types,
1295            hash_map,
1296            next_build_row_with_same_key,
1297            chunk_size,
1298            shutdown_rx,
1299            ..
1300        }: EquiJoinParams<K>,
1301        cond: &BoxedExpression,
1302    ) {
1303        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
1304        let build_row_matched =
1305            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
1306        let mut non_equi_state = RightNonEquiJoinState {
1307            build_row_matched,
1308            ..Default::default()
1309        };
1310
1311        #[for_await]
1312        for probe_chunk in probe_side.execute() {
1313            let probe_chunk = probe_chunk?;
1314            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1315            for (probe_row_id, probe_key) in probe_keys
1316                .iter()
1317                .enumerate()
1318                .filter_by_bitmap(probe_chunk.visibility())
1319            {
1320                for build_row_id in
1321                    next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
1322                {
1323                    shutdown_rx.check()?;
1324                    non_equi_state.build_row_ids.push(build_row_id);
1325                    let build_chunk = &build_side[build_row_id.chunk_id()];
1326                    if let Some(spilled) = Self::append_one_row(
1327                        &mut chunk_builder,
1328                        &probe_chunk,
1329                        probe_row_id,
1330                        build_chunk,
1331                        build_row_id.row_id(),
1332                    ) {
1333                        yield Self::process_right_outer_join_non_equi_condition(
1334                            spilled,
1335                            cond.as_ref(),
1336                            &mut non_equi_state,
1337                        )
1338                        .await?
1339                    }
1340                }
1341            }
1342        }
1343        if let Some(spilled) = chunk_builder.consume_all() {
1344            yield Self::process_right_outer_join_non_equi_condition(
1345                spilled,
1346                cond.as_ref(),
1347                &mut non_equi_state,
1348            )
1349            .await?
1350        }
1351        #[for_await]
1352        for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
1353            &mut chunk_builder,
1354            &build_side,
1355            &non_equi_state.build_row_matched,
1356            probe_data_types.len(),
1357        ) {
1358            yield spilled?
1359        }
1360    }
1361
1362    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1363    pub async fn do_right_semi_anti_join<const ANTI_JOIN: bool>(
1364        EquiJoinParams {
1365            probe_side,
1366            probe_key_idxs,
1367            build_side,
1368            build_data_types,
1369            hash_map,
1370            next_build_row_with_same_key,
1371            chunk_size,
1372            shutdown_rx,
1373            ..
1374        }: EquiJoinParams<K>,
1375    ) {
1376        let mut chunk_builder = DataChunkBuilder::new(build_data_types, chunk_size);
1377        let mut build_row_matched =
1378            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
1379
1380        #[for_await]
1381        for probe_chunk in probe_side.execute() {
1382            let probe_chunk = probe_chunk?;
1383            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1384            for probe_key in probe_keys.iter().filter_by_bitmap(probe_chunk.visibility()) {
1385                for build_row_id in
1386                    next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
1387                {
1388                    shutdown_rx.check()?;
1389                    build_row_matched[build_row_id] = true;
1390                }
1391            }
1392        }
1393        #[for_await]
1394        for spilled in Self::handle_remaining_build_rows_for_right_semi_anti_join::<ANTI_JOIN>(
1395            &mut chunk_builder,
1396            &build_side,
1397            &build_row_matched,
1398        ) {
1399            yield spilled?
1400        }
1401    }
1402
1403    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1404    pub async fn do_right_semi_anti_join_with_non_equi_condition<const ANTI_JOIN: bool>(
1405        EquiJoinParams {
1406            probe_side,
1407            probe_key_idxs,
1408            build_side,
1409            build_data_types,
1410            full_data_types,
1411            hash_map,
1412            next_build_row_with_same_key,
1413            chunk_size,
1414            shutdown_rx,
1415            ..
1416        }: EquiJoinParams<K>,
1417        cond: &BoxedExpression,
1418    ) {
1419        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
1420        let mut remaining_chunk_builder = DataChunkBuilder::new(build_data_types, chunk_size);
1421        let build_row_matched =
1422            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
1423        let mut non_equi_state = RightNonEquiJoinState {
1424            build_row_matched,
1425            ..Default::default()
1426        };
1427
1428        #[for_await]
1429        for probe_chunk in probe_side.execute() {
1430            let probe_chunk = probe_chunk?;
1431            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1432            for (probe_row_id, probe_key) in probe_keys
1433                .iter()
1434                .enumerate()
1435                .filter_by_bitmap(probe_chunk.visibility())
1436            {
1437                for build_row_id in
1438                    next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
1439                {
1440                    shutdown_rx.check()?;
1441                    non_equi_state.build_row_ids.push(build_row_id);
1442                    let build_chunk = &build_side[build_row_id.chunk_id()];
1443                    if let Some(spilled) = Self::append_one_row(
1444                        &mut chunk_builder,
1445                        &probe_chunk,
1446                        probe_row_id,
1447                        build_chunk,
1448                        build_row_id.row_id(),
1449                    ) {
1450                        Self::process_right_semi_anti_join_non_equi_condition(
1451                            spilled,
1452                            cond.as_ref(),
1453                            &mut non_equi_state,
1454                        )
1455                        .await?
1456                    }
1457                }
1458            }
1459        }
1460        if let Some(spilled) = chunk_builder.consume_all() {
1461            Self::process_right_semi_anti_join_non_equi_condition(
1462                spilled,
1463                cond.as_ref(),
1464                &mut non_equi_state,
1465            )
1466            .await?
1467        }
1468        #[for_await]
1469        for spilled in Self::handle_remaining_build_rows_for_right_semi_anti_join::<ANTI_JOIN>(
1470            &mut remaining_chunk_builder,
1471            &build_side,
1472            &non_equi_state.build_row_matched,
1473        ) {
1474            yield spilled?
1475        }
1476    }
1477
1478    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1479    pub async fn do_full_outer_join(
1480        EquiJoinParams {
1481            probe_side,
1482            probe_data_types,
1483            probe_key_idxs,
1484            build_side,
1485            build_data_types,
1486            full_data_types,
1487            hash_map,
1488            next_build_row_with_same_key,
1489            chunk_size,
1490            shutdown_rx,
1491            ..
1492        }: EquiJoinParams<K>,
1493    ) {
1494        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
1495        let mut build_row_matched =
1496            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
1497
1498        #[for_await]
1499        for probe_chunk in probe_side.execute() {
1500            let probe_chunk = probe_chunk?;
1501            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1502            for (probe_row_id, probe_key) in probe_keys
1503                .iter()
1504                .enumerate()
1505                .filter_by_bitmap(probe_chunk.visibility())
1506            {
1507                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
1508                    for build_row_id in
1509                        next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id))
1510                    {
1511                        shutdown_rx.check()?;
1512                        build_row_matched[build_row_id] = true;
1513                        let build_chunk = &build_side[build_row_id.chunk_id()];
1514                        if let Some(spilled) = Self::append_one_row(
1515                            &mut chunk_builder,
1516                            &probe_chunk,
1517                            probe_row_id,
1518                            build_chunk,
1519                            build_row_id.row_id(),
1520                        ) {
1521                            yield spilled
1522                        }
1523                    }
1524                } else {
1525                    let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
1526                    if let Some(spilled) = Self::append_one_row_with_null_build_side(
1527                        &mut chunk_builder,
1528                        probe_row,
1529                        build_data_types.len(),
1530                    ) {
1531                        yield spilled
1532                    }
1533                }
1534            }
1535        }
1536        #[for_await]
1537        for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
1538            &mut chunk_builder,
1539            &build_side,
1540            &build_row_matched,
1541            probe_data_types.len(),
1542        ) {
1543            yield spilled?
1544        }
1545    }
1546
    /// Full outer hash join with an additional non-equi condition `cond`.
    ///
    /// Probe/build pairs sharing a hash key are buffered in `chunk_builder`. Every chunk it
    /// produces is passed through `process_full_outer_join_non_equi_condition`, which:
    /// - evaluates `cond`,
    /// - hides pairs that fail it,
    /// - keeps one `NULL`-padded row for a probe row with no passing pair,
    /// - marks the build rows of passing pairs in `build_row_matched`.
    ///
    /// Probe rows whose key is absent from `hash_map` never reach `cond`. They go straight to
    /// `remaining_chunk_builder` with a `NULL` build side. Once the probe side is exhausted,
    /// build rows that never matched are emitted with a `NULL` probe side through the same
    /// `remaining_chunk_builder`, which is then flushed.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_full_outer_join_with_non_equi_condition(
        EquiJoinParams {
            probe_side,
            probe_data_types,
            probe_key_idxs,
            build_side,
            build_data_types,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
        cond: &BoxedExpression,
    ) {
        // Buffers candidate pairs that still have to be filtered by `cond`.
        let mut chunk_builder = DataChunkBuilder::new(full_data_types.clone(), chunk_size);
        // Buffers rows that bypass `cond`: probe rows without any hash match during the probe
        // phase, and never-matched build rows at the end.
        let mut remaining_chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        let mut left_non_equi_state = LeftNonEquiJoinState {
            probe_column_count: probe_data_types.len(),
            ..Default::default()
        };
        // One flag per build row, set by the filtering step when a pair passes `cond`.
        let build_row_matched =
            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
        let mut right_non_equi_state = RightNonEquiJoinState {
            build_row_matched,
            ..Default::default()
        };

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                // A new probe row starts: nothing has matched for it yet.
                left_non_equi_state.found_matched = false;
                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
                    // Record where this probe row's candidate rows begin in `chunk_builder`.
                    left_non_equi_state
                        .first_output_row_id
                        .push(chunk_builder.buffered_count());
                    let mut build_row_id_iter = next_build_row_with_same_key
                        .row_id_iter(Some(*first_matched_build_row_id))
                        .peekable();
                    while let Some(build_row_id) = build_row_id_iter.next() {
                        shutdown_rx.check()?;
                        // One entry per buffered candidate row, so the filtering step can map
                        // each passing row back to its build row.
                        right_non_equi_state.build_row_ids.push(build_row_id);
                        let build_chunk = &build_side[build_row_id.chunk_id()];
                        if let Some(spilled) = Self::append_one_row(
                            &mut chunk_builder,
                            &probe_chunk,
                            probe_row_id,
                            build_chunk,
                            build_row_id.row_id(),
                        ) {
                            // Tell the filtering step whether the current probe row still has
                            // candidate rows that will land in the next chunk.
                            left_non_equi_state.has_more_output_rows =
                                build_row_id_iter.peek().is_some();
                            yield Self::process_full_outer_join_non_equi_condition(
                                spilled,
                                cond.as_ref(),
                                &mut left_non_equi_state,
                                &mut right_non_equi_state,
                            )
                            .await?
                        }
                    }
                } else {
                    shutdown_rx.check()?;
                    // No build row shares this key, so `cond` is irrelevant: emit the probe
                    // row with a NULL build side directly.
                    let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
                    if let Some(spilled) = Self::append_one_row_with_null_build_side(
                        &mut remaining_chunk_builder,
                        probe_row,
                        build_data_types.len(),
                    ) {
                        yield spilled
                    }
                }
            }
        }
        // Nothing is buffered after this flush, so the last probe row is complete.
        left_non_equi_state.has_more_output_rows = false;
        if let Some(spilled) = chunk_builder.consume_all() {
            yield Self::process_full_outer_join_non_equi_condition(
                spilled,
                cond.as_ref(),
                &mut left_non_equi_state,
                &mut right_non_equi_state,
            )
            .await?
        }
        // Emit build rows that never passed `cond`, then flush `remaining_chunk_builder`
        // (which may still hold NULL-padded probe rows).
        #[for_await]
        for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
            &mut remaining_chunk_builder,
            &build_side,
            &right_non_equi_state.build_row_matched,
            probe_data_types.len(),
        ) {
            yield spilled?
        }
    }
1649
1650    /// Process output chunk for left outer join when non-equi condition is presented.
1651    ///
1652    /// # Arguments
1653    /// * `chunk` - Output chunk from `do_left_outer_join_with_non_equi_condition`, containing:
1654    ///     - Concatenation of probe row and its corresponding build row according to the hash map.
1655    ///     - Concatenation of probe row and `NULL` build row, if there is no matched build row
1656    ///       found for the probe row.
1657    /// * `cond` - Non-equi join condition.
1658    /// * `probe_column_count` - The number of columns in the probe side.
1659    /// * `first_output_row_id` - The offset of the first output row in `chunk` for each probe side
1660    ///   row that has been processed.
1661    /// * `has_more_output_rows` - Whether the probe row being processed currently has output rows
1662    ///   in next output chunk.
1663    /// * `found_matched` - Whether the probe row being processed currently has matched non-NULL
1664    ///   build rows in last output chunk.
1665    ///
1666    /// # Examples
1667    /// Assume we have two tables `t1` and `t2` as probe side and build side, respectively.
1668    /// ```sql
1669    /// CREATE TABLE t1 (v1 int, v2 int);
1670    /// CREATE TABLE t2 (v3 int);
1671    /// ```
1672    ///
1673    /// Now we de left outer join on `t1` and `t2`, as the following query shows:
1674    /// ```sql
1675    /// SELECT * FROM t1 LEFT JOIN t2 ON t1.v1 = t2.v3 AND t1.v2 <> t2.v3;
1676    /// ```
1677    ///
1678    /// Assume the chunk builder in `do_left_outer_join_with_non_equi_condition` has buffer size 5,
1679    /// and we have the following chunk as the first output ('-' represents NULL).
1680    ///
1681    /// | offset | v1 | v2 | v3 |
1682    /// |---|---|---|---|
1683    /// | 0 | 1 | 2 | 1 |
1684    /// | 1 | 1 | 1 | 1 |
1685    /// | 2 | 2 | 3 | - |
1686    /// | 3 | 3 | 3 | 3 |
1687    /// | 4 | 3 | 3 | 3 |
1688    ///
1689    /// We have the following precondition:
1690    /// ```ignore
1691    /// assert_eq!(probe_column_count, 2);
1692    /// assert_eq!(first_out_row_id, vec![0, 1, 2, 3]);
1693    /// assert_eq!(has_more_output_rows);
1694    /// assert_eq!(!found_matched);
1695    /// ```
1696    ///
1697    /// In `process_left_outer_join_non_equi_condition`, we transform the chunk in following steps.
1698    ///
1699    /// 1. Evaluate the non-equi condition on the chunk. Here the condition is `t1.v2 <> t2.v3`.
1700    ///
1701    /// We get the result array:
1702    ///
1703    /// | offset | value |
1704    /// | --- | --- |
1705    /// | 0 | true |
1706    /// | 1 | false |
1707    /// | 2 | false |
1708    /// | 3 | false |
1709    /// | 4 | false |
1710    ///
1711    /// 2. Set the build side columns to NULL if the corresponding result value is false.
1712    ///
1713    /// The chunk is changed to:
1714    ///
1715    /// | offset | v1 | v2 | v3 |
1716    /// |---|---|---|---|
1717    /// | 0 | 1 | 2 | 1 |
1718    /// | 1 | 1 | 1 | - |
1719    /// | 2 | 2 | 3 | - |
1720    /// | 3 | 3 | 3 | - |
1721    /// | 4 | 3 | 3 | - |
1722    ///
1723    /// 3. Remove duplicate rows with NULL build side. This is done by setting the visibility bitmap
1724    ///    of the chunk.
1725    ///
1726    /// | offset | v1 | v2 | v3 |
1727    /// |---|---|---|---|
1728    /// | 0 | 1 | 2 | 1 |
1729    /// | 1 | 1 | 1 | - |
1730    /// | 2 | 2 | 3 | - |
1731    /// | 3 | ~~3~~ | ~~3~~ | ~~-~~ |
1732    /// | 4 | ~~3~~ | ~~3~~ | ~~-~~ |
1733    ///
1734    /// For the probe row being processed currently (`(3, 3)` here), we don't have output rows with
1735    /// non-NULL build side, so we set `found_matched` to false.
1736    ///
1737    /// In `do_left_outer_join_with_non_equi_condition`, we have next output chunk as follows:
1738    ///
1739    /// | offset | v1 | v2 | v3 |
1740    /// |---|---|---|---|
1741    /// | 0 | 3 | 3 | 3 |
1742    /// | 1 | 3 | 3 | 3 |
1743    /// | 2 | 5 | 5 | - |
1744    /// | 3 | 5 | 3 | - |
1745    /// | 4 | 5 | 3 | - |
1746    ///
1747    /// This time We have the following precondition:
1748    /// ```ignore
1749    /// assert_eq!(probe_column_count, 2);
1750    /// assert_eq!(first_out_row_id, vec![2, 3]);
1751    /// assert_eq!(!has_more_output_rows);
1752    /// assert_eq!(!found_matched);
1753    /// ```
1754    ///
1755    /// The transformed chunk is as follows after the same steps.
1756    ///
1757    /// | offset | v1 | v2 | v3 |
1758    /// |---|---|---|---|
1759    /// | 0 | ~~3~~ | ~~3~~ | ~~3~~ |
1760    /// | 1 | 3 | 3 | - |
1761    /// | 2 | 5 | 5 | - |
1762    /// | 3 | 5 | 3 | - |
1763    /// | 4 | ~~5~~ | ~~3~~ | ~~-~~ |
1764    ///
1765    /// After we add these chunks to output chunk builder in `do_execute`, we get the final output:
1766    ///
1767    /// Chunk 1
1768    ///
1769    /// | offset | v1 | v2 | v3 |
1770    /// |---|---|---|---|
1771    /// | 0 | 1 | 2 | 1 |
1772    /// | 1 | 1 | 1 | - |
1773    /// | 2 | 2 | 3 | - |
1774    /// | 3 | 3 | 3 | - |
1775    /// | 4 | 5 | 5 | - |
1776    ///
1777    /// Chunk 2
1778    ///
1779    /// | offset | v1 | v2 | v3 |
1780    /// |---|---|---|---|
1781    /// | 0 | 5 | 3 | - |
1782    ///
1783    ///
1784    /// For more information about how `process_*_join_non_equi_condition` work, see their unit
1785    /// tests.
1786    async fn process_left_outer_join_non_equi_condition(
1787        chunk: DataChunk,
1788        cond: &dyn Expression,
1789        LeftNonEquiJoinState {
1790            probe_column_count,
1791            first_output_row_id,
1792            has_more_output_rows,
1793            found_matched,
1794        }: &mut LeftNonEquiJoinState,
1795    ) -> Result<DataChunk> {
1796        let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1797        Ok(DataChunkMutator(chunk)
1798            .nullify_build_side_for_non_equi_condition(&filter, *probe_column_count)
1799            .remove_duplicate_rows_for_left_outer_join(
1800                &filter,
1801                first_output_row_id,
1802                *has_more_output_rows,
1803                found_matched,
1804            )
1805            .take())
1806    }
1807
1808    /// Filters for candidate rows which satisfy `non_equi` predicate.
1809    /// Removes duplicate rows.
1810    async fn process_left_semi_anti_join_non_equi_condition<const ANTI_JOIN: bool>(
1811        chunk: DataChunk,
1812        cond: &dyn Expression,
1813        LeftNonEquiJoinState {
1814            first_output_row_id,
1815            found_matched,
1816            has_more_output_rows,
1817            ..
1818        }: &mut LeftNonEquiJoinState,
1819    ) -> Result<DataChunk> {
1820        let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1821        Ok(DataChunkMutator(chunk)
1822            .remove_duplicate_rows_for_left_semi_anti_join::<ANTI_JOIN>(
1823                &filter,
1824                first_output_row_id,
1825                *has_more_output_rows,
1826                found_matched,
1827            )
1828            .take())
1829    }
1830
1831    async fn process_right_outer_join_non_equi_condition(
1832        chunk: DataChunk,
1833        cond: &dyn Expression,
1834        RightNonEquiJoinState {
1835            build_row_ids,
1836            build_row_matched,
1837        }: &mut RightNonEquiJoinState,
1838    ) -> Result<DataChunk> {
1839        let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1840        Ok(DataChunkMutator(chunk)
1841            .remove_duplicate_rows_for_right_outer_join(&filter, build_row_ids, build_row_matched)
1842            .take())
1843    }
1844
1845    async fn process_right_semi_anti_join_non_equi_condition(
1846        chunk: DataChunk,
1847        cond: &dyn Expression,
1848        RightNonEquiJoinState {
1849            build_row_ids,
1850            build_row_matched,
1851        }: &mut RightNonEquiJoinState,
1852    ) -> Result<()> {
1853        let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1854        DataChunkMutator(chunk).remove_duplicate_rows_for_right_semi_anti_join(
1855            &filter,
1856            build_row_ids,
1857            build_row_matched,
1858        );
1859        Ok(())
1860    }
1861
1862    async fn process_full_outer_join_non_equi_condition(
1863        chunk: DataChunk,
1864        cond: &dyn Expression,
1865        left_non_equi_state: &mut LeftNonEquiJoinState,
1866        right_non_equi_state: &mut RightNonEquiJoinState,
1867    ) -> Result<DataChunk> {
1868        let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1869        Ok(DataChunkMutator(chunk)
1870            .nullify_build_side_for_non_equi_condition(
1871                &filter,
1872                left_non_equi_state.probe_column_count,
1873            )
1874            .remove_duplicate_rows_for_full_outer_join(
1875                &filter,
1876                left_non_equi_state,
1877                right_non_equi_state,
1878            )
1879            .take())
1880    }
1881
1882    #[try_stream(ok = DataChunk, error = BatchError)]
1883    async fn handle_remaining_build_rows_for_right_outer_join<'a>(
1884        chunk_builder: &'a mut DataChunkBuilder,
1885        build_side: &'a [DataChunk],
1886        build_row_matched: &'a ChunkedData<bool>,
1887        probe_column_count: usize,
1888    ) {
1889        for build_row_id in build_row_matched
1890            .all_row_ids()
1891            .filter(|build_row_id| !build_row_matched[*build_row_id])
1892        {
1893            let build_row =
1894                build_side[build_row_id.chunk_id()].row_at_unchecked_vis(build_row_id.row_id());
1895            if let Some(spilled) = Self::append_one_row_with_null_probe_side(
1896                chunk_builder,
1897                build_row,
1898                probe_column_count,
1899            ) {
1900                yield spilled
1901            }
1902        }
1903        if let Some(spilled) = chunk_builder.consume_all() {
1904            yield spilled
1905        }
1906    }
1907
1908    #[try_stream(ok = DataChunk, error = BatchError)]
1909    async fn handle_remaining_build_rows_for_right_semi_anti_join<'a, const ANTI_JOIN: bool>(
1910        chunk_builder: &'a mut DataChunkBuilder,
1911        build_side: &'a [DataChunk],
1912        build_row_matched: &'a ChunkedData<bool>,
1913    ) {
1914        for build_row_id in build_row_matched.all_row_ids().filter(|build_row_id| {
1915            if !ANTI_JOIN {
1916                build_row_matched[*build_row_id]
1917            } else {
1918                !build_row_matched[*build_row_id]
1919            }
1920        }) {
1921            if let Some(spilled) = Self::append_one_build_row(
1922                chunk_builder,
1923                &build_side[build_row_id.chunk_id()],
1924                build_row_id.row_id(),
1925            ) {
1926                yield spilled
1927            }
1928        }
1929        if let Some(spilled) = chunk_builder.consume_all() {
1930            yield spilled
1931        }
1932    }
1933
1934    fn append_one_row(
1935        chunk_builder: &mut DataChunkBuilder,
1936        probe_chunk: &DataChunk,
1937        probe_row_id: usize,
1938        build_chunk: &DataChunk,
1939        build_row_id: usize,
1940    ) -> Option<DataChunk> {
1941        chunk_builder.append_one_row_from_array_elements(
1942            probe_chunk.columns().iter().map(|c| c.as_ref()),
1943            probe_row_id,
1944            build_chunk.columns().iter().map(|c| c.as_ref()),
1945            build_row_id,
1946        )
1947    }
1948
1949    fn append_one_probe_row(
1950        chunk_builder: &mut DataChunkBuilder,
1951        probe_chunk: &DataChunk,
1952        probe_row_id: usize,
1953    ) -> Option<DataChunk> {
1954        chunk_builder.append_one_row_from_array_elements(
1955            probe_chunk.columns().iter().map(|c| c.as_ref()),
1956            probe_row_id,
1957            empty(),
1958            0,
1959        )
1960    }
1961
1962    fn append_one_build_row(
1963        chunk_builder: &mut DataChunkBuilder,
1964        build_chunk: &DataChunk,
1965        build_row_id: usize,
1966    ) -> Option<DataChunk> {
1967        chunk_builder.append_one_row_from_array_elements(
1968            empty(),
1969            0,
1970            build_chunk.columns().iter().map(|c| c.as_ref()),
1971            build_row_id,
1972        )
1973    }
1974
1975    fn append_one_row_with_null_build_side(
1976        chunk_builder: &mut DataChunkBuilder,
1977        probe_row_ref: RowRef<'_>,
1978        build_column_count: usize,
1979    ) -> Option<DataChunk> {
1980        chunk_builder.append_one_row(probe_row_ref.chain(repeat_n(Datum::None, build_column_count)))
1981    }
1982
1983    fn append_one_row_with_null_probe_side(
1984        chunk_builder: &mut DataChunkBuilder,
1985        build_row_ref: RowRef<'_>,
1986        probe_column_count: usize,
1987    ) -> Option<DataChunk> {
1988        chunk_builder.append_one_row(repeat_n(Datum::None, probe_column_count).chain(build_row_ref))
1989    }
1990
1991    fn find_asof_matched_rows(
1992        probe_row_ref: RowRef<'_>,
1993        build_side: &[DataChunk],
1994        build_side_row_iter: RowIdIter<'_>,
1995        asof_join_condition: &AsOfDesc,
1996    ) -> Option<RowId> {
1997        let probe_inequality_value = probe_row_ref.datum_at(asof_join_condition.left_idx);
1998        if let Some(probe_inequality_scalar) = probe_inequality_value {
1999            let mut result_row_id: Option<RowId> = None;
2000            let mut build_row_ref;
2001
2002            for build_row_id in build_side_row_iter {
2003                build_row_ref =
2004                    build_side[build_row_id.chunk_id()].row_at_unchecked_vis(build_row_id.row_id());
2005                let build_inequality_value = build_row_ref.datum_at(asof_join_condition.right_idx);
2006                if let Some(build_inequality_scalar) = build_inequality_value {
2007                    let mut pick_result = |compare: fn(Ordering) -> bool| {
2008                        if let Some(result_row_id_inner) = result_row_id {
2009                            let result_row_ref = build_side[result_row_id_inner.chunk_id()]
2010                                .row_at_unchecked_vis(result_row_id_inner.row_id());
2011                            let result_inequality_scalar = result_row_ref
2012                                .datum_at(asof_join_condition.right_idx)
2013                                .unwrap();
2014                            if compare(
2015                                probe_inequality_scalar.default_cmp(&build_inequality_scalar),
2016                            ) && compare(
2017                                probe_inequality_scalar.default_cmp(&result_inequality_scalar),
2018                            ) {
2019                                result_row_id = Some(build_row_id);
2020                            }
2021                        } else if compare(
2022                            probe_inequality_scalar.default_cmp(&build_inequality_scalar),
2023                        ) {
2024                            result_row_id = Some(build_row_id);
2025                        }
2026                    };
2027                    match asof_join_condition.inequality_type {
2028                        AsOfInequalityType::Lt => {
2029                            pick_result(Ordering::is_lt);
2030                        }
2031                        AsOfInequalityType::Le => {
2032                            pick_result(Ordering::is_le);
2033                        }
2034                        AsOfInequalityType::Gt => {
2035                            pick_result(Ordering::is_gt);
2036                        }
2037                        AsOfInequalityType::Ge => {
2038                            pick_result(Ordering::is_ge);
2039                        }
2040                    }
2041                }
2042            }
2043            result_row_id
2044        } else {
2045            None
2046        }
2047    }
2048}
2049
2050/// `DataChunkMutator` transforms the given data chunk for non-equi join.
2051#[repr(transparent)]
2052struct DataChunkMutator(DataChunk);
2053
2054impl DataChunkMutator {
2055    fn nullify_build_side_for_non_equi_condition(
2056        self,
2057        filter: &Bitmap,
2058        probe_column_count: usize,
2059    ) -> Self {
2060        let (mut columns, vis) = self.0.into_parts();
2061
2062        for build_column in columns.split_off(probe_column_count) {
2063            // Is it really safe to use Arc::try_unwrap here?
2064            let mut array = Arc::try_unwrap(build_column).unwrap();
2065            array.set_bitmap(array.null_bitmap() & filter);
2066            columns.push(array.into());
2067        }
2068
2069        Self(DataChunk::new(columns, vis))
2070    }
2071
2072    fn remove_duplicate_rows_for_left_outer_join(
2073        mut self,
2074        filter: &Bitmap,
2075        first_output_row_ids: &mut Vec<usize>,
2076        has_more_output_rows: bool,
2077        found_non_null: &mut bool,
2078    ) -> Self {
2079        let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());
2080
2081        for (&start_row_id, &end_row_id) in iter::once(&0)
2082            .chain(first_output_row_ids.iter())
2083            .tuple_windows()
2084            .filter(|(start_row_id, end_row_id)| start_row_id < end_row_id)
2085        {
2086            for row_id in start_row_id..end_row_id {
2087                if filter.is_set(row_id) {
2088                    *found_non_null = true;
2089                    new_visibility.set(row_id, true);
2090                }
2091            }
2092            if !*found_non_null {
2093                new_visibility.set(start_row_id, true);
2094            }
2095            *found_non_null = false;
2096        }
2097
2098        let start_row_id = first_output_row_ids.last().copied().unwrap_or_default();
2099        for row_id in start_row_id..filter.len() {
2100            if filter.is_set(row_id) {
2101                *found_non_null = true;
2102                new_visibility.set(row_id, true);
2103            }
2104        }
2105        if !has_more_output_rows {
2106            if !*found_non_null {
2107                new_visibility.set(start_row_id, true);
2108            }
2109            *found_non_null = false;
2110        }
2111
2112        first_output_row_ids.clear();
2113
2114        self.0
2115            .set_visibility(new_visibility.finish() & self.0.visibility());
2116        self
2117    }
2118
2119    /// Removes duplicate rows using `filter`
2120    /// and only returns the first match for each window.
2121    /// Windows are indicated by `first_output_row_ids`.
2122    fn remove_duplicate_rows_for_left_semi_anti_join<const ANTI_JOIN: bool>(
2123        mut self,
2124        filter: &Bitmap,
2125        first_output_row_ids: &mut Vec<usize>,
2126        has_more_output_rows: bool,
2127        found_matched: &mut bool,
2128    ) -> Self {
2129        let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());
2130
2131        for (&start_row_id, &end_row_id) in iter::once(&0)
2132            .chain(first_output_row_ids.iter())
2133            .tuple_windows()
2134            .filter(|(start_row_id, end_row_id)| start_row_id < end_row_id)
2135        {
2136            for row_id in start_row_id..end_row_id {
2137                if filter.is_set(row_id) {
2138                    if !ANTI_JOIN && !*found_matched {
2139                        new_visibility.set(row_id, true);
2140                    }
2141                    *found_matched = true;
2142                    break;
2143                }
2144            }
2145            if ANTI_JOIN && !*found_matched {
2146                new_visibility.set(start_row_id, true);
2147            }
2148            *found_matched = false;
2149        }
2150
2151        let start_row_id = first_output_row_ids.last().copied().unwrap_or_default();
2152        for row_id in start_row_id..filter.len() {
2153            if filter.is_set(row_id) {
2154                if !ANTI_JOIN && !*found_matched {
2155                    new_visibility.set(row_id, true);
2156                }
2157                *found_matched = true;
2158                break;
2159            }
2160        }
2161        if !has_more_output_rows && ANTI_JOIN {
2162            if !*found_matched {
2163                new_visibility.set(start_row_id, true);
2164            }
2165            *found_matched = false;
2166        }
2167
2168        first_output_row_ids.clear();
2169
2170        self.0
2171            .set_visibility(new_visibility.finish() & self.0.visibility());
2172        self
2173    }
2174
2175    fn remove_duplicate_rows_for_right_outer_join(
2176        mut self,
2177        filter: &Bitmap,
2178        build_row_ids: &mut Vec<RowId>,
2179        build_row_matched: &mut ChunkedData<bool>,
2180    ) -> Self {
2181        let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());
2182        for (output_row_id, (output_row_non_null, &build_row_id)) in
2183            filter.iter().zip_eq_fast(build_row_ids.iter()).enumerate()
2184        {
2185            if output_row_non_null {
2186                build_row_matched[build_row_id] = true;
2187                new_visibility.set(output_row_id, true);
2188            }
2189        }
2190
2191        build_row_ids.clear();
2192
2193        self.0
2194            .set_visibility(new_visibility.finish() & self.0.visibility());
2195        self
2196    }
2197
2198    fn remove_duplicate_rows_for_right_semi_anti_join(
2199        self,
2200        filter: &Bitmap,
2201        build_row_ids: &mut Vec<RowId>,
2202        build_row_matched: &mut ChunkedData<bool>,
2203    ) {
2204        for (output_row_non_null, &build_row_id) in filter.iter().zip_eq_fast(build_row_ids.iter())
2205        {
2206            if output_row_non_null {
2207                build_row_matched[build_row_id] = true;
2208            }
2209        }
2210
2211        build_row_ids.clear();
2212    }
2213
2214    fn remove_duplicate_rows_for_full_outer_join(
2215        mut self,
2216        filter: &Bitmap,
2217        LeftNonEquiJoinState {
2218            first_output_row_id,
2219            has_more_output_rows,
2220            found_matched,
2221            ..
2222        }: &mut LeftNonEquiJoinState,
2223        RightNonEquiJoinState {
2224            build_row_ids,
2225            build_row_matched,
2226        }: &mut RightNonEquiJoinState,
2227    ) -> Self {
2228        let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());
2229
2230        for (&start_row_id, &end_row_id) in iter::once(&0)
2231            .chain(first_output_row_id.iter())
2232            .tuple_windows()
2233            .filter(|(start_row_id, end_row_id)| start_row_id < end_row_id)
2234        {
2235            for row_id in start_row_id..end_row_id {
2236                if filter.is_set(row_id) {
2237                    *found_matched = true;
2238                    new_visibility.set(row_id, true);
2239                }
2240            }
2241            if !*found_matched {
2242                new_visibility.set(start_row_id, true);
2243            }
2244            *found_matched = false;
2245        }
2246
2247        let start_row_id = first_output_row_id.last().copied().unwrap_or_default();
2248        for row_id in start_row_id..filter.len() {
2249            if filter.is_set(row_id) {
2250                *found_matched = true;
2251                new_visibility.set(row_id, true);
2252            }
2253        }
2254        if !*has_more_output_rows && !*found_matched {
2255            new_visibility.set(start_row_id, true);
2256        }
2257
2258        first_output_row_id.clear();
2259
2260        for (output_row_id, (output_row_non_null, &build_row_id)) in
2261            filter.iter().zip_eq_fast(build_row_ids.iter()).enumerate()
2262        {
2263            if output_row_non_null {
2264                build_row_matched[build_row_id] = true;
2265                new_visibility.set(output_row_id, true);
2266            }
2267        }
2268
2269        build_row_ids.clear();
2270
2271        self.0
2272            .set_visibility(new_visibility.finish() & self.0.visibility());
2273        self
2274    }
2275
2276    fn take(self) -> DataChunk {
2277        self.0
2278    }
2279}
2280
2281impl BoxedExecutorBuilder for HashJoinExecutor<()> {
2282    async fn new_boxed_executor(
2283        context: &ExecutorBuilder<'_>,
2284        inputs: Vec<BoxedExecutor>,
2285    ) -> Result<BoxedExecutor> {
2286        let [left_child, right_child]: [_; 2] = inputs.try_into().unwrap();
2287
2288        let hash_join_node = try_match_expand!(
2289            context.plan_node().get_node_body().unwrap(),
2290            NodeBody::HashJoin
2291        )?;
2292
2293        let join_type = JoinType::from_prost(hash_join_node.get_join_type()?);
2294
2295        let cond = match hash_join_node.get_condition() {
2296            Ok(cond_prost) => Some(build_from_prost(cond_prost)?),
2297            Err(_) => None,
2298        };
2299
2300        let left_key_idxs = hash_join_node
2301            .get_left_key()
2302            .iter()
2303            .map(|&idx| idx as usize)
2304            .collect_vec();
2305        let right_key_idxs = hash_join_node
2306            .get_right_key()
2307            .iter()
2308            .map(|&idx| idx as usize)
2309            .collect_vec();
2310
2311        ensure!(left_key_idxs.len() == right_key_idxs.len());
2312
2313        let right_data_types = right_child.schema().data_types();
2314        let right_key_types = right_key_idxs
2315            .iter()
2316            .map(|&idx| right_data_types[idx].clone())
2317            .collect_vec();
2318
2319        let output_indices: Vec<usize> = hash_join_node
2320            .get_output_indices()
2321            .iter()
2322            .map(|&x| x as usize)
2323            .collect();
2324
2325        let identity = context.plan_node().get_identity().clone();
2326
2327        let asof_desc = hash_join_node
2328            .asof_desc
2329            .map(|desc| AsOfDesc::from_protobuf(&desc))
2330            .transpose()?;
2331
2332        Ok(HashJoinExecutorArgs {
2333            join_type,
2334            output_indices,
2335            probe_side_source: left_child,
2336            build_side_source: right_child,
2337            probe_key_idxs: left_key_idxs,
2338            build_key_idxs: right_key_idxs,
2339            null_matched: hash_join_node.get_null_safe().clone(),
2340            cond,
2341            identity: identity.clone(),
2342            right_key_types,
2343            chunk_size: context.context().get_config().developer.chunk_size,
2344            asof_desc,
2345            spill_backend: if context.context().get_config().enable_spill {
2346                Some(Disk)
2347            } else {
2348                None
2349            },
2350            spill_metrics: context.context().spill_metrics(),
2351            shutdown_rx: context.shutdown_rx().clone(),
2352            mem_ctx: context.context().create_executor_mem_context(&identity),
2353        }
2354        .dispatch())
2355    }
2356}
2357
/// Arguments for a `HashJoinExecutor`, gathered before the hash-key type is known.
///
/// The `HashKeyDispatcher` impl selects the key type from `right_key_types` and
/// forwards every other field to `HashJoinExecutor::new`.
struct HashJoinExecutorArgs {
    join_type: JoinType,
    /// Indices into the joined schema that select the emitted columns.
    output_indices: Vec<usize>,
    /// Probe side; the builder fills it with the plan's left child.
    probe_side_source: BoxedExecutor,
    /// Build side; the builder fills it with the plan's right child.
    build_side_source: BoxedExecutor,
    probe_key_idxs: Vec<usize>,
    build_key_idxs: Vec<usize>,
    /// Per-key flags taken from the plan node's `null_safe` field.
    null_matched: Vec<bool>,
    /// Optional non-equi join condition.
    cond: Option<BoxedExpression>,
    identity: String,
    /// Build-side join-key types; only read by `data_types` to choose the key type.
    right_key_types: Vec<DataType>,
    chunk_size: usize,
    asof_desc: Option<AsOfDesc>,
    /// `None` when spilling is disabled in the config.
    spill_backend: Option<SpillBackend>,
    spill_metrics: Arc<BatchSpillMetrics>,
    shutdown_rx: ShutdownToken,
    mem_ctx: MemoryContext,
}
2376
2377impl HashKeyDispatcher for HashJoinExecutorArgs {
2378    type Output = BoxedExecutor;
2379
2380    fn dispatch_impl<K: HashKey>(self) -> Self::Output {
2381        Box::new(HashJoinExecutor::<K>::new(
2382            self.join_type,
2383            self.output_indices,
2384            self.probe_side_source,
2385            self.build_side_source,
2386            self.probe_key_idxs,
2387            self.build_key_idxs,
2388            self.null_matched,
2389            self.cond.map(Arc::new),
2390            self.identity,
2391            self.chunk_size,
2392            self.asof_desc,
2393            self.spill_backend,
2394            self.spill_metrics,
2395            self.shutdown_rx,
2396            self.mem_ctx,
2397        ))
2398    }
2399
2400    fn data_types(&self) -> &[DataType] {
2401        &self.right_key_types
2402    }
2403}
2404
2405impl<K> HashJoinExecutor<K> {
2406    #[allow(clippy::too_many_arguments)]
2407    pub fn new(
2408        join_type: JoinType,
2409        output_indices: Vec<usize>,
2410        probe_side_source: BoxedExecutor,
2411        build_side_source: BoxedExecutor,
2412        probe_key_idxs: Vec<usize>,
2413        build_key_idxs: Vec<usize>,
2414        null_matched: Vec<bool>,
2415        cond: Option<Arc<BoxedExpression>>,
2416        identity: String,
2417        chunk_size: usize,
2418        asof_desc: Option<AsOfDesc>,
2419        spill_backend: Option<SpillBackend>,
2420        spill_metrics: Arc<BatchSpillMetrics>,
2421        shutdown_rx: ShutdownToken,
2422        mem_ctx: MemoryContext,
2423    ) -> Self {
2424        Self::new_inner(
2425            join_type,
2426            output_indices,
2427            probe_side_source,
2428            build_side_source,
2429            probe_key_idxs,
2430            build_key_idxs,
2431            null_matched,
2432            cond,
2433            identity,
2434            chunk_size,
2435            asof_desc,
2436            spill_backend,
2437            spill_metrics,
2438            None,
2439            shutdown_rx,
2440            mem_ctx,
2441        )
2442    }
2443
2444    #[allow(clippy::too_many_arguments)]
2445    fn new_inner(
2446        join_type: JoinType,
2447        output_indices: Vec<usize>,
2448        probe_side_source: BoxedExecutor,
2449        build_side_source: BoxedExecutor,
2450        probe_key_idxs: Vec<usize>,
2451        build_key_idxs: Vec<usize>,
2452        null_matched: Vec<bool>,
2453        cond: Option<Arc<BoxedExpression>>,
2454        identity: String,
2455        chunk_size: usize,
2456        asof_desc: Option<AsOfDesc>,
2457        spill_backend: Option<SpillBackend>,
2458        spill_metrics: Arc<BatchSpillMetrics>,
2459        memory_upper_bound: Option<u64>,
2460        shutdown_rx: ShutdownToken,
2461        mem_ctx: MemoryContext,
2462    ) -> Self {
2463        assert_eq!(probe_key_idxs.len(), build_key_idxs.len());
2464        assert_eq!(probe_key_idxs.len(), null_matched.len());
2465        let original_schema = match join_type {
2466            JoinType::LeftSemi | JoinType::LeftAnti => probe_side_source.schema().clone(),
2467            JoinType::RightSemi | JoinType::RightAnti => build_side_source.schema().clone(),
2468            _ => Schema::from_iter(
2469                probe_side_source
2470                    .schema()
2471                    .fields()
2472                    .iter()
2473                    .chain(build_side_source.schema().fields().iter())
2474                    .cloned(),
2475            ),
2476        };
2477        let schema = Schema::from_iter(
2478            output_indices
2479                .iter()
2480                .map(|&idx| original_schema[idx].clone()),
2481        );
2482        Self {
2483            join_type,
2484            original_schema,
2485            schema,
2486            output_indices,
2487            probe_side_source,
2488            build_side_source,
2489            probe_key_idxs,
2490            build_key_idxs,
2491            null_matched,
2492            cond,
2493            identity,
2494            chunk_size,
2495            asof_desc,
2496            shutdown_rx,
2497            spill_backend,
2498            spill_metrics,
2499            memory_upper_bound,
2500            mem_ctx,
2501            _phantom: PhantomData,
2502        }
2503    }
2504}
2505
2506#[cfg(test)]
2507mod tests {
2508    use futures::StreamExt;
2509    use futures_async_stream::for_await;
2510    use itertools::Itertools;
2511    use risingwave_common::array::{ArrayBuilderImpl, DataChunk};
2512    use risingwave_common::catalog::{Field, Schema};
2513    use risingwave_common::hash::Key32;
2514    use risingwave_common::memory::MemoryContext;
2515    use risingwave_common::metrics::LabelGuardedIntGauge;
2516    use risingwave_common::test_prelude::DataChunkTestExt;
2517    use risingwave_common::types::DataType;
2518    use risingwave_common::util::iter_util::ZipEqDebug;
2519    use risingwave_common::util::memcmp_encoding::encode_chunk;
2520    use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
2521    use risingwave_expr::expr::{BoxedExpression, build_from_pretty};
2522
2523    use super::{
2524        ChunkedData, HashJoinExecutor, JoinType, LeftNonEquiJoinState, RightNonEquiJoinState, RowId,
2525    };
2526    use crate::error::Result;
2527    use crate::executor::BoxedExecutor;
2528    use crate::executor::test_utils::MockExecutor;
2529    use crate::monitor::BatchSpillMetrics;
2530    use crate::spill::spill_op::SpillBackend;
2531    use crate::task::ShutdownToken;
2532
    /// Chunk size used by the fixtures: passed to the join executor by `do_test`
    /// and used as the initial capacity of `DataChunkMerger`'s array builders.
    const CHUNK_SIZE: usize = 1024;
2534
    /// Concatenates the chunks emitted by an executor into one `DataChunk`.
    struct DataChunkMerger {
        /// One array builder per output column.
        array_builders: Vec<ArrayBuilderImpl>,
        /// Rows appended so far (sum of the appended chunks' `capacity()`).
        array_len: usize,
    }
2539
2540    impl DataChunkMerger {
2541        fn new(data_types: Vec<DataType>) -> Result<Self> {
2542            let array_builders = data_types
2543                .iter()
2544                .map(|data_type| data_type.create_array_builder(CHUNK_SIZE))
2545                .collect();
2546
2547            Ok(Self {
2548                array_builders,
2549                array_len: 0,
2550            })
2551        }
2552
2553        fn append(&mut self, data_chunk: &DataChunk) -> Result<()> {
2554            ensure!(self.array_builders.len() == data_chunk.dimension());
2555            for idx in 0..self.array_builders.len() {
2556                self.array_builders[idx].append_array(data_chunk.column_at(idx));
2557            }
2558            self.array_len += data_chunk.capacity();
2559
2560            Ok(())
2561        }
2562
2563        fn finish(self) -> Result<DataChunk> {
2564            let columns = self
2565                .array_builders
2566                .into_iter()
2567                .map(|b| b.finish().into())
2568                .collect();
2569
2570            Ok(DataChunk::new(columns, self.array_len))
2571        }
2572    }
2573
2574    /// Sort each row in the data chunk and compare with the rows in the data chunk.
2575    fn compare_data_chunk_with_rowsort(left: &DataChunk, right: &DataChunk) -> bool {
2576        assert!(left.is_compacted());
2577        assert!(right.is_compacted());
2578
2579        if left.cardinality() != right.cardinality() {
2580            return false;
2581        }
2582
2583        // Sort and compare
2584        let column_orders = (0..left.columns().len())
2585            .map(|i| ColumnOrder::new(i, OrderType::ascending()))
2586            .collect_vec();
2587        let left_encoded_chunk = encode_chunk(left, &column_orders).unwrap();
2588        let mut sorted_left = left_encoded_chunk
2589            .into_iter()
2590            .enumerate()
2591            .map(|(row_id, row)| (left.row_at_unchecked_vis(row_id), row))
2592            .collect_vec();
2593        sorted_left.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
2594
2595        let right_encoded_chunk = encode_chunk(right, &column_orders).unwrap();
2596        let mut sorted_right = right_encoded_chunk
2597            .into_iter()
2598            .enumerate()
2599            .map(|(row_id, row)| (right.row_at_unchecked_vis(row_id), row))
2600            .collect_vec();
2601        sorted_right.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
2602
2603        sorted_left
2604            .into_iter()
2605            .map(|(row, _)| row)
2606            .zip_eq_debug(sorted_right.into_iter().map(|(row, _)| row))
2607            .all(|(row1, row2)| row1 == row2)
2608    }
2609
    /// One hash join test case: the column types of each input and the join type.
    struct TestFixture {
        /// Column types of the left input, which is wired in as the probe side.
        left_types: Vec<DataType>,
        /// Column types of the right input, which is wired in as the build side.
        right_types: Vec<DataType>,
        /// Join type under test.
        join_type: JoinType,
    }
2615
    /// Sql for creating test data:
    /// ```sql
    /// drop table if exists t1;
    /// create table t1(v1 int, v2 real);
    /// insert into t1 values
    /// (1, 6.1::REAL), (2, null), (null, 8.4::REAL), (3, 3.9::REAL), (null, null),
    /// (4, 6.6::REAL), (3, null), (null, 0.7::REAL), (5, null), (null, 5.5::REAL);
    ///
    /// drop table if exists t2;
    /// create table t2(v1 int, v2 double precision);
    /// insert into t2 values
    /// (8, 6.1::FLOAT8), (2, null), (null, 8.9::FLOAT8), (3, null), (null, 3.5::FLOAT8),
    /// (6, null), (4, 7.5::FLOAT8), (6, null), (null, 8::FLOAT8), (7, null),
    /// (null, 9.1::FLOAT8), (9, null), (3, 3.7::FLOAT8), (9, null), (null, 9.6::FLOAT8),
    /// (100, null), (null, 8.18::FLOAT8), (200, null);
    /// ```
    ///
    /// `t1` is the left (probe) input with `v2: real` (`Float32`); `t2` is the right
    /// (build) input with `v2: double precision` (`Float64`).
    impl TestFixture {
        /// Creates a fixture for `join_type` with a `(Int32, Float32)` left side and
        /// an `(Int32, Float64)` right side.
        fn with_join_type(join_type: JoinType) -> Self {
            Self {
                left_types: vec![DataType::Int32, DataType::Float32],
                right_types: vec![DataType::Int32, DataType::Float64],
                join_type,
            }
        }

        /// Mock left input holding the ten rows of `t1`, split into two chunks.
        ///
        /// NOTE(review): the schema repeats `self.left_types` rather than deriving
        /// from it; keep the two in sync.
        fn create_left_executor(&self) -> BoxedExecutor {
            let schema = Schema {
                fields: vec![
                    Field::unnamed(DataType::Int32),
                    Field::unnamed(DataType::Float32),
                ],
            };
            let mut executor = MockExecutor::new(schema);

            executor.add(DataChunk::from_pretty(
                "i f
                 1 6.1
                 2 .
                 . 8.4
                 3 3.9
                 . .  ",
            ));

            executor.add(DataChunk::from_pretty(
                "i f
                 4 6.6
                 3 .
                 . 0.7
                 5 .
                 . 5.5",
            ));

            Box::new(executor)
        }

        /// Mock right input holding the eighteen rows of `t2`, split into three chunks.
        ///
        /// NOTE(review): the schema repeats `self.right_types` rather than deriving
        /// from it; keep the two in sync.
        fn create_right_executor(&self) -> BoxedExecutor {
            let schema = Schema {
                fields: vec![
                    Field::unnamed(DataType::Int32),
                    Field::unnamed(DataType::Float64),
                ],
            };
            let mut executor = MockExecutor::new(schema);

            executor.add(DataChunk::from_pretty(
                "i F
                 8 6.1
                 2 .
                 . 8.9
                 3 .
                 . 3.5
                 6 .  ",
            ));

            executor.add(DataChunk::from_pretty(
                "i F
                 4 7.5
                 6 .
                 . 8
                 7 .
                 . 9.1
                 9 .  ",
            ));

            executor.add(DataChunk::from_pretty(
                "  i F
                   3 3.7
                   9 .
                   . 9.6
                 100 .
                   . 8.18
                 200 .   ",
            ));

            Box::new(executor)
        }

        /// Column types the join should emit: both sides when the join type keeps
        /// all columns, otherwise only the kept side.
        fn output_data_types(&self) -> Vec<DataType> {
            let join_type = self.join_type;
            if join_type.keep_all() {
                [self.left_types.clone(), self.right_types.clone()].concat()
            } else if join_type.keep_left() {
                self.left_types.clone()
            } else if join_type.keep_right() {
                self.right_types.clone()
            } else {
                unreachable!()
            }
        }

        /// Non-equi condition `t1.v2 < t2.v2`: column 1 (float4) of the joined row
        /// compared with column 3 (float8).
        fn create_cond() -> BoxedExpression {
            build_from_pretty("(less_than:boolean $1:float4 $3:float8)")
        }

        /// Builds a `HashJoinExecutor<Key32>` that equi-joins column 0 of the left
        /// (probe) input with column 0 of the right (build) input.
        ///
        /// - `has_non_equi_cond` attaches `create_cond` as the extra predicate.
        /// - `null_safe` is passed as the single entry of the null-safe key flags.
        /// - `test_spill` gives the executor a memory context limited to 0 bytes and
        ///   the `SpillBackend::Memory` backend, presumably to force spilling.
        #[allow(clippy::too_many_arguments)]
        fn create_join_executor_with_chunk_size_and_executors(
            &self,
            has_non_equi_cond: bool,
            null_safe: bool,
            chunk_size: usize,
            left_child: BoxedExecutor,
            right_child: BoxedExecutor,
            shutdown_rx: ShutdownToken,
            parent_mem_ctx: Option<MemoryContext>,
            test_spill: bool,
        ) -> BoxedExecutor {
            let join_type = self.join_type;

            // Emit every column of the joined schema: one side for semi/anti joins,
            // both sides otherwise.
            let output_indices = (0..match join_type {
                JoinType::LeftSemi | JoinType::LeftAnti => left_child.schema().fields().len(),
                JoinType::RightSemi | JoinType::RightAnti => right_child.schema().fields().len(),
                _ => left_child.schema().fields().len() + right_child.schema().fields().len(),
            })
                .collect();

            let cond = if has_non_equi_cond {
                Some(Self::create_cond().into())
            } else {
                None
            };

            let mem_ctx = if test_spill {
                MemoryContext::new_with_mem_limit(
                    parent_mem_ctx,
                    LabelGuardedIntGauge::<4>::test_int_gauge(),
                    0,
                )
            } else {
                MemoryContext::new(parent_mem_ctx, LabelGuardedIntGauge::<4>::test_int_gauge())
            };
            Box::new(HashJoinExecutor::<Key32>::new(
                join_type,
                output_indices,
                left_child,
                right_child,
                vec![0],
                vec![0],
                vec![null_safe],
                cond,
                "HashJoinExecutor".to_owned(),
                chunk_size,
                None,
                if test_spill {
                    Some(SpillBackend::Memory)
                } else {
                    None
                },
                BatchSpillMetrics::for_test(),
                shutdown_rx,
                mem_ctx,
            ))
        }

        /// Runs the join twice on fresh inputs, first without and then with
        /// `test_spill`, and checks both results against `expected`.
        async fn do_test(&self, expected: DataChunk, has_non_equi_cond: bool, null_safe: bool) {
            let left_executor = self.create_left_executor();
            let right_executor = self.create_right_executor();
            self.do_test_with_chunk_size_and_executors(
                expected.clone(),
                has_non_equi_cond,
                null_safe,
                self::CHUNK_SIZE,
                left_executor,
                right_executor,
                false,
            )
            .await;

            // Test spill
            let left_executor = self.create_left_executor();
            let right_executor = self.create_right_executor();
            self.do_test_with_chunk_size_and_executors(
                expected,
                has_non_equi_cond,
                null_safe,
                self::CHUNK_SIZE,
                left_executor,
                right_executor,
                true,
            )
            .await;
        }

        /// Executes one join and checks three things: the float column types in the
        /// output schema, the emitted rows against `expected` (order ignored), and
        /// that the parent memory context reports 0 bytes used once the executor
        /// has been dropped.
        #[allow(clippy::too_many_arguments)]
        async fn do_test_with_chunk_size_and_executors(
            &self,
            expected: DataChunk,
            has_non_equi_cond: bool,
            null_safe: bool,
            chunk_size: usize,
            left_executor: BoxedExecutor,
            right_executor: BoxedExecutor,
            test_spill: bool,
        ) {
            let parent_mem_context =
                MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge(), u64::MAX);

            // Scoped so the executor and its stream are dropped before the
            // memory-accounting assertion below.
            {
                let join_executor = self.create_join_executor_with_chunk_size_and_executors(
                    has_non_equi_cond,
                    null_safe,
                    chunk_size,
                    left_executor,
                    right_executor,
                    ShutdownToken::empty(),
                    Some(parent_mem_context.clone()),
                    test_spill,
                );

                let mut data_chunk_merger = DataChunkMerger::new(self.output_data_types()).unwrap();

                let fields = &join_executor.schema().fields;

                if self.join_type.keep_all() {
                    assert_eq!(fields[1].data_type, DataType::Float32);
                    assert_eq!(fields[3].data_type, DataType::Float64);
                } else if self.join_type.keep_left() {
                    assert_eq!(fields[1].data_type, DataType::Float32);
                } else if self.join_type.keep_right() {
                    assert_eq!(fields[1].data_type, DataType::Float64)
                } else {
                    unreachable!()
                }

                let mut stream = join_executor.execute();

                while let Some(data_chunk) = stream.next().await {
                    let data_chunk = data_chunk.unwrap();
                    let data_chunk = data_chunk.compact();
                    data_chunk_merger.append(&data_chunk).unwrap();
                }

                let result_chunk = data_chunk_merger.finish().unwrap();
                println!("expected: {:?}", expected);
                println!("result: {:?}", result_chunk);

                // TODO: Replace this with unsorted comparison
                // assert_eq!(expected, result_chunk);
                assert!(compare_data_chunk_with_rowsort(&expected, &result_chunk));
            }

            assert_eq!(0, parent_mem_context.get_bytes_used());
        }

        /// Cancels, then aborts, the shutdown token before execution, and asserts
        /// that the first item the stream yields is an error.
        ///
        /// NOTE(review): if the stream yields nothing, the loop body never runs and
        /// the check passes vacuously.
        async fn do_test_shutdown(&self, has_non_equi_cond: bool) {
            // Test `ShutdownMsg::Cancel`
            let left_executor = self.create_left_executor();
            let right_executor = self.create_right_executor();
            let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
            let join_executor = self.create_join_executor_with_chunk_size_and_executors(
                has_non_equi_cond,
                false,
                self::CHUNK_SIZE,
                left_executor,
                right_executor,
                shutdown_rx,
                None,
                false,
            );
            shutdown_tx.cancel();
            #[for_await]
            for chunk in join_executor.execute() {
                assert!(chunk.is_err());
                break;
            }

            // Test `ShutdownMsg::Abort`
            let left_executor = self.create_left_executor();
            let right_executor = self.create_right_executor();
            let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
            let join_executor = self.create_join_executor_with_chunk_size_and_executors(
                has_non_equi_cond,
                false,
                self::CHUNK_SIZE,
                left_executor,
                right_executor,
                shutdown_rx,
                None,
                false,
            );
            shutdown_tx.abort("test");
            #[for_await]
            for chunk in join_executor.execute() {
                assert!(chunk.is_err());
                break;
            }
        }
    }
2921
2922    /// Sql:
2923    /// ```sql
2924    /// select * from t1 join t2 on t1.v1 = t2.v1;
2925    /// ```
2926    #[tokio::test]
2927    async fn test_inner_join() {
2928        let test_fixture = TestFixture::with_join_type(JoinType::Inner);
2929
2930        let expected_chunk = DataChunk::from_pretty(
2931            "i   f   i   F
2932             2   .   2   .
2933             3   3.9 3   3.7
2934             3   3.9 3   .
2935             4   6.6 4   7.5
2936             3   .   3   3.7
2937             3   .   3   .",
2938        );
2939
2940        test_fixture.do_test(expected_chunk, false, false).await;
2941    }
2942
2943    /// Sql:
2944    /// ```sql
2945    /// select * from t1 join t2 on t1.v1 is not distinct from t2.v1;
2946    /// ```
2947    #[tokio::test]
2948    async fn test_null_safe_inner_join() {
2949        let test_fixture = TestFixture::with_join_type(JoinType::Inner);
2950
2951        let expected_chunk = DataChunk::from_pretty(
2952            "i   f   i   F
2953             2    .  2     .
2954             .  8.4  .  8.18
2955             .  8.4  .  9.6
2956             .  8.4  .  9.1
2957             .  8.4  .  8
2958             .  8.4  .  3.5
2959             .  8.4  .  8.9
2960             3  3.9  3  3.7
2961             3  3.9  3     .
2962             .    .  .  8.18
2963             .    .  .  9.6
2964             .    .  .  9.1
2965             .    .  .  8
2966             .    .  .  3.5
2967             .    .  .  8.9
2968             4  6.6  4  7.5
2969             3    .  3  3.7
2970             3    .  3     .
2971             .  0.7  .  8.18
2972             .  0.7  .  9.6
2973             .  0.7  .  9.1
2974             .  0.7  .  8
2975             .  0.7  .  3.5
2976             .  0.7  .  8.9
2977             .  5.5  .  8.18
2978             .  5.5  .  9.6
2979             .  5.5  .  9.1
2980             .  5.5  .  8
2981             .  5.5  .  3.5
2982             .  5.5  .  8.9",
2983        );
2984
2985        test_fixture.do_test(expected_chunk, false, true).await;
2986    }
2987
2988    /// Sql:
2989    /// ```sql
2990    /// select * from t1 join t2 on t1.v1 = t2.v1 and t1.v2 < t2.v2;
2991    /// ```
2992    #[tokio::test]
2993    async fn test_inner_join_with_non_equi_condition() {
2994        let test_fixture = TestFixture::with_join_type(JoinType::Inner);
2995
2996        let expected_chunk = DataChunk::from_pretty(
2997            "i   f   i   F
2998             4   6.6 4   7.5",
2999        );
3000
3001        test_fixture.do_test(expected_chunk, true, false).await;
3002    }
3003
3004    /// Sql:
3005    /// ```sql
3006    /// select t1.v2 as t1_v2, t2.v2 as t2_v2 from t1 left outer join t2 on t1.v1 = t2.v1;
3007    /// ```
3008    #[tokio::test]
3009    async fn test_left_outer_join() {
3010        let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);
3011
3012        let expected_chunk = DataChunk::from_pretty(
3013            "i   f   i   F
3014             1   6.1 .   .
3015             2   .   2   .
3016             .   8.4 .   .
3017             3   3.9 3   3.7
3018             3   3.9 3   .
3019             .   .   .   .
3020             4   6.6 4   7.5
3021             3   .   3   3.7
3022             3   .   3   .
3023             .   0.7 .   .
3024             5   .   .   .
3025             .   5.5 .   .",
3026        );
3027
3028        test_fixture.do_test(expected_chunk, false, false).await;
3029    }
3030
3031    /// Sql:
3032    /// ```sql
3033    /// select * from t1 left outer join t2 on t1.v1 = t2.v1 and t1.v2 < t2.v2;
3034    /// ```
3035    #[tokio::test]
3036    async fn test_left_outer_join_with_non_equi_condition() {
3037        let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);
3038
3039        let expected_chunk = DataChunk::from_pretty(
3040            "i   f   i   F
3041             2   .   .   .
3042             3   3.9 .   .
3043             4   6.6 4   7.5
3044             3   .   .   .
3045             1   6.1 .   .
3046             .   8.4 .   .
3047             .   .   .   .
3048             .   0.7 .   .
3049             5   .   .   .
3050             .   5.5 .   .",
3051        );
3052
3053        test_fixture.do_test(expected_chunk, true, false).await;
3054    }
3055
3056    /// Sql:
3057    /// ```sql
3058    /// select * from t1 right outer join t2 on t1.v1 = t2.v1;
3059    /// ```
3060    #[tokio::test]
3061    async fn test_right_outer_join() {
3062        let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);
3063
3064        let expected_chunk = DataChunk::from_pretty(
3065            "i   f   i   F
3066             2   .   2   .
3067             3   3.9 3   3.7
3068             3   3.9 3   .
3069             4   6.6 4   7.5
3070             3   .   3   3.7
3071             3   .   3   .
3072             .   .   8   6.1
3073             .   .   .   8.9
3074             .   .   .   3.5
3075             .   .   6   .
3076             .   .   6   .
3077             .   .   .   8
3078             .   .   7   .
3079             .   .   .   9.1
3080             .   .   9   .
3081             .   .   9   .
3082             .   .   .   9.6
3083             .   .   100 .
3084             .   .   .   8.18
3085             .   .   200 .",
3086        );
3087
3088        test_fixture.do_test(expected_chunk, false, false).await;
3089    }
3090
3091    /// Sql:
3092    /// ```sql
3093    /// select * from t1 left outer join t2 on t1.v1 = t2.v1 and t1.v2 < t2.v2;
3094    /// ```
3095    #[tokio::test]
3096    async fn test_right_outer_join_with_non_equi_condition() {
3097        let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);
3098
3099        let expected_chunk = DataChunk::from_pretty(
3100            "i   f   i   F
3101             4   6.6 4   7.5
3102             .   .   8   6.1
3103             .   .   2   .
3104             .   .   .   8.9
3105             .   .   3   .
3106             .   .   .   3.5
3107             .   .   6   .
3108             .   .   6   .
3109             .   .   .   8
3110             .   .   7   .
3111             .   .   .   9.1
3112             .   .   9   .
3113             .   .   3   3.7
3114             .   .   9   .
3115             .   .   .   9.6
3116             .   .   100 .
3117             .   .   .   8.18
3118             .   .   200 .",
3119        );
3120
3121        test_fixture.do_test(expected_chunk, true, false).await;
3122    }
3123
3124    /// ```sql
3125    /// select * from t1 full outer join t2 on t1.v1 = t2.v1;
3126    /// ```
3127    #[tokio::test]
3128    async fn test_full_outer_join() {
3129        let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);
3130
3131        let expected_chunk = DataChunk::from_pretty(
3132            "i   f   i   F
3133             1   6.1 .   .
3134             2   .   2   .
3135             .   8.4 .   .
3136             3   3.9 3   3.7
3137             3   3.9 3   .
3138             .   .   .   .
3139             4   6.6 4   7.5
3140             3   .   3   3.7
3141             3   .   3   .
3142             .   0.7 .   .
3143             5   .   .   .
3144             .   5.5 .   .
3145             .   .   8   6.1
3146             .   .   .   8.9
3147             .   .   .   3.5
3148             .   .   6   .
3149             .   .   6   .
3150             .   .   .   8
3151             .   .   7   .
3152             .   .   .   9.1
3153             .   .   9   .
3154             .   .   9   .
3155             .   .   .   9.6
3156             .   .   100 .
3157             .   .   .   8.18
3158             .   .   200 .",
3159        );
3160
3161        test_fixture.do_test(expected_chunk, false, false).await;
3162    }
3163
3164    /// ```sql
3165    /// select * from t1 full outer join t2 on t1.v1 = t2.v1 and t1.v2 < t2.v2;
3166    /// ```
3167    #[tokio::test]
3168    async fn test_full_outer_join_with_non_equi_condition() {
3169        let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);
3170
3171        let expected_chunk = DataChunk::from_pretty(
3172            "i   f   i   F
3173             2   .   .   .
3174             3   3.9 .   .
3175             4   6.6 4   7.5
3176             3   .   .   .
3177             1   6.1 .   .
3178             .   8.4 .   .
3179             .   .   .   .
3180             .   0.7 .   .
3181             5   .   .   .
3182             .   5.5 .   .
3183             .   .   8   6.1
3184             .   .   2   .
3185             .   .   .   8.9
3186             .   .   3   .
3187             .   .   .   3.5
3188             .   .   6   .
3189             .   .   6   .
3190             .   .   .   8
3191             .   .   7   .
3192             .   .   .   9.1
3193             .   .   9   .
3194             .   .   3   3.7
3195             .   .   9   .
3196             .   .   .   9.6
3197             .   .   100 .
3198             .   .   .   8.18
3199             .   .   200 .",
3200        );
3201
3202        test_fixture.do_test(expected_chunk, true, false).await;
3203    }
3204
3205    #[tokio::test]
3206    async fn test_left_anti_join() {
3207        let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);
3208
3209        let expected_chunk = DataChunk::from_pretty(
3210            "i   f
3211             1   6.1
3212             .   8.4
3213             .   .
3214             .   0.7
3215             5   .
3216             .   5.5",
3217        );
3218
3219        test_fixture.do_test(expected_chunk, false, false).await;
3220    }
3221
3222    #[tokio::test]
3223    async fn test_left_anti_join_with_non_equi_condition() {
3224        let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);
3225
3226        let expected_chunk = DataChunk::from_pretty(
3227            "i   f
3228             2   .
3229             3   3.9
3230             3   .
3231             1   6.1
3232             .   8.4
3233             .   .
3234             .   0.7
3235             5   .
3236             .   5.5",
3237        );
3238
3239        test_fixture.do_test(expected_chunk, true, false).await;
3240    }
3241
3242    #[tokio::test]
3243    async fn test_left_semi_join() {
3244        let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
3245
3246        let expected_chunk = DataChunk::from_pretty(
3247            "i   f
3248             2   .
3249             3   3.9
3250             4   6.6
3251             3   .",
3252        );
3253
3254        test_fixture.do_test(expected_chunk, false, false).await;
3255    }
3256
3257    #[tokio::test]
3258    async fn test_left_semi_join_with_non_equi_condition() {
3259        let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
3260
3261        let expected_chunk = DataChunk::from_pretty(
3262            "i   f
3263             4   6.6",
3264        );
3265
3266        test_fixture.do_test(expected_chunk, true, false).await;
3267    }
3268
3269    /// Tests handling of edge case:
3270    /// Match is found for a probe_row,
3271    /// but there are still candidate rows in the iterator for that probe_row.
3272    /// These should not be buffered or we will have duplicate rows in output.
3273    #[tokio::test]
3274    async fn test_left_semi_join_with_non_equi_condition_duplicates() {
3275        let schema = Schema {
3276            fields: vec![
3277                Field::unnamed(DataType::Int32),
3278                Field::unnamed(DataType::Float32),
3279            ],
3280        };
3281
3282        // Build side
3283        let mut left_executor = MockExecutor::new(schema);
3284        left_executor.add(DataChunk::from_pretty(
3285            "i f
3286                 1 1.0
3287                 1 1.0
3288                 1 1.0
3289                 1 1.0
3290                 2 1.0",
3291        ));
3292
3293        // Probe side
3294        let schema = Schema {
3295            fields: vec![
3296                Field::unnamed(DataType::Int32),
3297                Field::unnamed(DataType::Float64),
3298            ],
3299        };
3300        let mut right_executor = MockExecutor::new(schema);
3301        right_executor.add(DataChunk::from_pretty(
3302            "i F
3303                 1 2.0
3304                 1 2.0
3305                 1 2.0
3306                 1 2.0
3307                 2 2.0",
3308        ));
3309
3310        let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
3311        let expected_chunk = DataChunk::from_pretty(
3312            "i f
3313            1 1.0
3314            1 1.0
3315            1 1.0
3316            1 1.0
3317            2 1.0",
3318        );
3319
3320        test_fixture
3321            .do_test_with_chunk_size_and_executors(
3322                expected_chunk,
3323                true,
3324                false,
3325                3,
3326                Box::new(left_executor),
3327                Box::new(right_executor),
3328                false,
3329            )
3330            .await;
3331    }
3332
3333    #[tokio::test]
3334    async fn test_right_anti_join() {
3335        let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);
3336
3337        let expected_chunk = DataChunk::from_pretty(
3338            "i   F
3339             8   6.1
3340             .   8.9
3341             .   3.5
3342             6   .
3343             6   .
3344             .   8.0
3345             7   .
3346             .   9.1
3347             9   .
3348             9   .
3349             .   9.6
3350             100 .
3351             .   8.18
3352             200 .",
3353        );
3354
3355        test_fixture.do_test(expected_chunk, false, false).await;
3356    }
3357
3358    #[tokio::test]
3359    async fn test_right_anti_join_with_non_equi_condition() {
3360        let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);
3361
3362        let expected_chunk = DataChunk::from_pretty(
3363            "i   F
3364             8   6.1
3365             2   .
3366             .   8.9
3367             3   .
3368             .   3.5
3369             6   .
3370             6   .
3371             .   8
3372             7   .
3373             .   9.1
3374             9   .
3375             3   3.7
3376             9   .
3377             .   9.6
3378             100 .
3379             .   8.18
3380             200 .",
3381        );
3382
3383        test_fixture.do_test(expected_chunk, true, false).await;
3384    }
3385
3386    #[tokio::test]
3387    async fn test_right_semi_join() {
3388        let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);
3389
3390        let expected_chunk = DataChunk::from_pretty(
3391            "i   F
3392             2   .
3393             3   .
3394             4   7.5
3395             3   3.7",
3396        );
3397
3398        test_fixture.do_test(expected_chunk, false, false).await;
3399    }
3400
3401    #[tokio::test]
3402    async fn test_right_semi_join_with_non_equi_condition() {
3403        let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);
3404
3405        let expected_chunk = DataChunk::from_pretty(
3406            "i   F
3407             4   7.5",
3408        );
3409
3410        test_fixture.do_test(expected_chunk, true, false).await;
3411    }
3412
3413    #[tokio::test]
3414    async fn test_process_left_outer_join_non_equi_condition() {
3415        let chunk = DataChunk::from_pretty(
3416            "i   f   i   F
3417             1   3.5 1   5.5
3418             1   3.5 1   2.5
3419             2   4.0 .   .
3420             3   5.0 3   4.0
3421             3   5.0 3   3.0
3422             3   5.0 3   4.0
3423             3   5.0 3   3.0
3424             4   1.0 4   0
3425             4   1.0 4   9.0",
3426        );
3427        let expect = DataChunk::from_pretty(
3428            "i   f   i   F
3429             1   3.5 1   5.5
3430             2   4.0 .   .
3431             3   5.0 .   .
3432             3   5.0 .   .
3433             4   1.0 4   9.0",
3434        );
3435        let cond = TestFixture::create_cond();
3436        let mut state = LeftNonEquiJoinState {
3437            probe_column_count: 2,
3438            first_output_row_id: vec![0, 2, 3, 5, 7],
3439            has_more_output_rows: true,
3440            found_matched: false,
3441        };
3442        assert!(compare_data_chunk_with_rowsort(
3443            &HashJoinExecutor::<Key32>::process_left_outer_join_non_equi_condition(
3444                chunk,
3445                cond.as_ref(),
3446                &mut state
3447            )
3448            .await
3449            .unwrap()
3450            .compact(),
3451            &expect
3452        ));
3453        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3454        assert!(state.found_matched);
3455
3456        let chunk = DataChunk::from_pretty(
3457            "i   f   i   F
3458             4   1.0 4   0.6
3459             4   1.0 4   2.0
3460             5   4.0 5   .
3461             6   7.0 6   .
3462             6   7.0 6   5.0",
3463        );
3464        let expect = DataChunk::from_pretty(
3465            "i   f   i   F
3466             4   1.0 4   2.0
3467             5   4.0 .   .
3468             6   7.0 .   .",
3469        );
3470        state.first_output_row_id = vec![2, 3];
3471        state.has_more_output_rows = false;
3472        assert!(compare_data_chunk_with_rowsort(
3473            &HashJoinExecutor::<Key32>::process_left_outer_join_non_equi_condition(
3474                chunk,
3475                cond.as_ref(),
3476                &mut state
3477            )
3478            .await
3479            .unwrap()
3480            .compact(),
3481            &expect
3482        ));
3483        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3484        assert!(!state.found_matched);
3485
3486        let chunk = DataChunk::from_pretty(
3487            "i   f   i   F
3488             4   1.0 4   0.6
3489             4   1.0 4   1.0
3490             5   4.0 5   .
3491             6   7.0 6   .
3492             6   7.0 6   8.0",
3493        );
3494        let expect = DataChunk::from_pretty(
3495            "i   f   i   F
3496             4   1.0 .   .
3497             5   4.0 .   .
3498             6   7.0 6   8.0",
3499        );
3500        state.first_output_row_id = vec![2, 3];
3501        state.has_more_output_rows = false;
3502        assert!(compare_data_chunk_with_rowsort(
3503            &HashJoinExecutor::<Key32>::process_left_outer_join_non_equi_condition(
3504                chunk,
3505                cond.as_ref(),
3506                &mut state
3507            )
3508            .await
3509            .unwrap()
3510            .compact(),
3511            &expect
3512        ));
3513        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3514        assert!(!state.found_matched);
3515    }
3516
3517    #[tokio::test]
3518    async fn test_process_left_semi_join_non_equi_condition() {
3519        let chunk = DataChunk::from_pretty(
3520            "i   f   i   F
3521             1   3.5 1   5.5
3522             1   3.5 1   2.5
3523             2   4.0 .   .
3524             3   5.0 3   4.0
3525             3   5.0 3   3.0
3526             3   5.0 3   4.0
3527             3   5.0 3   3.0
3528             4   1.0 4   0
3529             4   1.0 4   0.5",
3530        );
3531        let expect = DataChunk::from_pretty(
3532            "i   f   i   F
3533             1   3.5 1   5.5",
3534        );
3535        let cond = TestFixture::create_cond();
3536        let mut state = LeftNonEquiJoinState {
3537            probe_column_count: 2,
3538            first_output_row_id: vec![0, 2, 3, 5, 7],
3539            found_matched: false,
3540            ..Default::default()
3541        };
3542        assert!(compare_data_chunk_with_rowsort(
3543            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<false>(
3544                chunk,
3545                cond.as_ref(),
3546                &mut state
3547            )
3548            .await
3549            .unwrap()
3550            .compact(),
3551            &expect
3552        ));
3553        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3554        assert!(!state.found_matched);
3555
3556        let chunk = DataChunk::from_pretty(
3557            "i   f   i   F
3558             4   1.0 4   0.6
3559             4   1.0 4   2.0
3560             5   4.0 5   .
3561             6   7.0 6   .
3562             6   7.0 6   5.0",
3563        );
3564        let expect = DataChunk::from_pretty(
3565            "i   f   i   F
3566             4   1.0 4   2.0",
3567        );
3568        state.first_output_row_id = vec![2, 3];
3569        assert!(compare_data_chunk_with_rowsort(
3570            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<false>(
3571                chunk,
3572                cond.as_ref(),
3573                &mut state
3574            )
3575            .await
3576            .unwrap()
3577            .compact(),
3578            &expect
3579        ));
3580        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3581        assert!(!state.found_matched);
3582
3583        let chunk = DataChunk::from_pretty(
3584            "i   f   i   F
3585             4   1.0 4   0.6
3586             4   1.0 4   1.0
3587             5   4.0 5   .
3588             6   7.0 6   .
3589             6   7.0 6   8.0",
3590        );
3591        let expect = DataChunk::from_pretty(
3592            "i   f   i   F
3593             6   7.0 6   8.0",
3594        );
3595        state.first_output_row_id = vec![2, 3];
3596        assert!(compare_data_chunk_with_rowsort(
3597            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<false>(
3598                chunk,
3599                cond.as_ref(),
3600                &mut state
3601            )
3602            .await
3603            .unwrap()
3604            .compact(),
3605            &expect
3606        ));
3607        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3608    }
3609
3610    #[tokio::test]
3611    async fn test_process_left_anti_join_non_equi_condition() {
3612        let chunk = DataChunk::from_pretty(
3613            "i   f   i   F
3614             1   3.5 1   5.5
3615             1   3.5 1   2.5
3616             2   4.0 .   .
3617             3   5.0 3   4.0
3618             3   5.0 3   3.0
3619             3   5.0 3   4.0
3620             3   5.0 3   3.0
3621             4   1.0 4   0
3622             4   1.0 4   0.5",
3623        );
3624        let expect = DataChunk::from_pretty(
3625            "i   f   i   F
3626             2   4.0 .   .
3627             3   5.0 3   4.0
3628             3   5.0 3   4.0",
3629        );
3630        let cond = TestFixture::create_cond();
3631        let mut state = LeftNonEquiJoinState {
3632            probe_column_count: 2,
3633            first_output_row_id: vec![0, 2, 3, 5, 7],
3634            has_more_output_rows: true,
3635            found_matched: false,
3636        };
3637        assert!(compare_data_chunk_with_rowsort(
3638            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<true>(
3639                chunk,
3640                cond.as_ref(),
3641                &mut state
3642            )
3643            .await
3644            .unwrap()
3645            .compact(),
3646            &expect
3647        ));
3648        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3649        assert!(!state.found_matched);
3650
3651        let chunk = DataChunk::from_pretty(
3652            "i   f   i   F
3653             4   1.0 4   0.6
3654             4   1.0 4   2.0
3655             5   4.0 5   .
3656             6   7.0 6   .
3657             6   7.0 6   5.0",
3658        );
3659        let expect = DataChunk::from_pretty(
3660            "i   f   i   F
3661             5   4.0 5   .
3662             6   7.0 6   .",
3663        );
3664        state.first_output_row_id = vec![2, 3];
3665        state.has_more_output_rows = false;
3666        assert!(compare_data_chunk_with_rowsort(
3667            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<true>(
3668                chunk,
3669                cond.as_ref(),
3670                &mut state
3671            )
3672            .await
3673            .unwrap()
3674            .compact(),
3675            &expect
3676        ));
3677        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3678        assert!(!state.found_matched);
3679
3680        let chunk = DataChunk::from_pretty(
3681            "i   f   i   F
3682             4   1.0 4   0.6
3683             4   1.0 4   1.0
3684             5   4.0 5   .
3685             6   7.0 6   .
3686             6   7.0 6   8.0",
3687        );
3688        let expect = DataChunk::from_pretty(
3689            "i   f   i   F
3690             4   1.0 4   0.6
3691             5   4.0 5   .",
3692        );
3693        state.first_output_row_id = vec![2, 3];
3694        state.has_more_output_rows = false;
3695        assert!(compare_data_chunk_with_rowsort(
3696            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<true>(
3697                chunk,
3698                cond.as_ref(),
3699                &mut state
3700            )
3701            .await
3702            .unwrap()
3703            .compact(),
3704            &expect
3705        ));
3706        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3707    }
3708
3709    #[tokio::test]
3710    async fn test_process_right_outer_join_non_equi_condition() {
3711        let chunk = DataChunk::from_pretty(
3712            "i   f   i   F
3713             1   3.5 1   5.5
3714             1   3.5 1   2.5
3715             3   5.0 3   4.0
3716             3   5.0 3   3.0
3717             3   5.0 3   4.0
3718             3   5.0 3   3.0
3719             4   1.0 4   0
3720             4   1.0 4   0.5",
3721        );
3722        let expect = DataChunk::from_pretty(
3723            "i   f   i   F
3724             1   3.5 1   5.5",
3725        );
3726        let cond = TestFixture::create_cond();
3727        // For simplicity, all rows are in one chunk.
3728        // Build side table
3729        // 0  - (1, 5.5)
3730        // 1  - (1, 2.5)
3731        // 2  - ?
3732        // 3  - (3, 4.0)
3733        // 4  - (3, 3.0)
3734        // 5  - (4, 0)
3735        // 6  - ?
3736        // 7  - (4, 0.5)
3737        // 8  - (4, 0.6)
3738        // 9  - (4, 2.0)
3739        // 10 - (5, .)
3740        // 11 - ?
3741        // 12 - (6, .)
3742        // 13 - (6, 5.0)
3743        // Rows with '?' are never matched here.
3744        let build_row_matched = ChunkedData::with_chunk_sizes([14].into_iter()).unwrap();
3745        let mut state = RightNonEquiJoinState {
3746            build_row_ids: vec![
3747                RowId::new(0, 0),
3748                RowId::new(0, 1),
3749                RowId::new(0, 3),
3750                RowId::new(0, 4),
3751                RowId::new(0, 3),
3752                RowId::new(0, 4),
3753                RowId::new(0, 5),
3754                RowId::new(0, 7),
3755            ],
3756            build_row_matched,
3757        };
3758        assert!(compare_data_chunk_with_rowsort(
3759            &HashJoinExecutor::<Key32>::process_right_outer_join_non_equi_condition(
3760                chunk,
3761                cond.as_ref(),
3762                &mut state
3763            )
3764            .await
3765            .unwrap()
3766            .compact(),
3767            &expect
3768        ));
3769        assert_eq!(state.build_row_ids, Vec::new());
3770        assert_eq!(
3771            state.build_row_matched,
3772            ChunkedData::try_from(vec![{
3773                let mut v = vec![false; 14];
3774                v[0] = true;
3775                v
3776            }])
3777            .unwrap()
3778        );
3779
3780        let chunk = DataChunk::from_pretty(
3781            "i   f   i   F
3782             4   1.0 4   0.6
3783             4   1.0 4   2.0
3784             5   4.0 5   .
3785             6   7.0 6   .
3786             6   7.0 6   5.0",
3787        );
3788        let expect = DataChunk::from_pretty(
3789            "i   f   i   F
3790             4   1.0 4   2.0",
3791        );
3792        state.build_row_ids = vec![
3793            RowId::new(0, 8),
3794            RowId::new(0, 9),
3795            RowId::new(0, 10),
3796            RowId::new(0, 12),
3797            RowId::new(0, 13),
3798        ];
3799        assert!(compare_data_chunk_with_rowsort(
3800            &HashJoinExecutor::<Key32>::process_right_outer_join_non_equi_condition(
3801                chunk,
3802                cond.as_ref(),
3803                &mut state
3804            )
3805            .await
3806            .unwrap()
3807            .compact(),
3808            &expect
3809        ));
3810        assert_eq!(state.build_row_ids, Vec::new());
3811        assert_eq!(
3812            state.build_row_matched,
3813            ChunkedData::try_from(vec![{
3814                let mut v = vec![false; 14];
3815                v[0] = true;
3816                v[9] = true;
3817                v
3818            }])
3819            .unwrap()
3820        );
3821    }
3822
3823    #[tokio::test]
3824    async fn test_process_right_semi_anti_join_non_equi_condition() {
3825        let chunk = DataChunk::from_pretty(
3826            "i   f   i   F
3827             1   3.5 1   5.5
3828             1   3.5 1   2.5
3829             3   5.0 3   4.0
3830             3   5.0 3   3.0
3831             3   5.0 3   4.0
3832             3   5.0 3   3.0
3833             4   1.0 4   0
3834             4   1.0 4   0.5",
3835        );
3836        let cond = TestFixture::create_cond();
3837        let build_row_matched = ChunkedData::with_chunk_sizes([14].into_iter()).unwrap();
3838        let mut state = RightNonEquiJoinState {
3839            build_row_ids: vec![
3840                RowId::new(0, 0),
3841                RowId::new(0, 1),
3842                RowId::new(0, 3),
3843                RowId::new(0, 4),
3844                RowId::new(0, 3),
3845                RowId::new(0, 4),
3846                RowId::new(0, 5),
3847                RowId::new(0, 7),
3848            ],
3849            build_row_matched,
3850        };
3851
3852        assert!(
3853            HashJoinExecutor::<Key32>::process_right_semi_anti_join_non_equi_condition(
3854                chunk,
3855                cond.as_ref(),
3856                &mut state
3857            )
3858            .await
3859            .is_ok()
3860        );
3861        assert_eq!(state.build_row_ids, Vec::new());
3862        assert_eq!(
3863            state.build_row_matched,
3864            ChunkedData::try_from(vec![{
3865                let mut v = vec![false; 14];
3866                v[0] = true;
3867                v
3868            }])
3869            .unwrap()
3870        );
3871
3872        let chunk = DataChunk::from_pretty(
3873            "i   f   i   F
3874             4   1.0 4   0.6
3875             4   1.0 4   2.0
3876             5   4.0 5   .
3877             6   7.0 6   .
3878             6   7.0 6   5.0",
3879        );
3880        state.build_row_ids = vec![
3881            RowId::new(0, 8),
3882            RowId::new(0, 9),
3883            RowId::new(0, 10),
3884            RowId::new(0, 12),
3885            RowId::new(0, 13),
3886        ];
3887        assert!(
3888            HashJoinExecutor::<Key32>::process_right_semi_anti_join_non_equi_condition(
3889                chunk,
3890                cond.as_ref(),
3891                &mut state
3892            )
3893            .await
3894            .is_ok()
3895        );
3896        assert_eq!(state.build_row_ids, Vec::new());
3897        assert_eq!(
3898            state.build_row_matched,
3899            ChunkedData::try_from(vec![{
3900                let mut v = vec![false; 14];
3901                v[0] = true;
3902                v[9] = true;
3903                v
3904            }])
3905            .unwrap()
3906        );
3907    }
3908
3909    #[tokio::test]
3910    async fn test_process_full_outer_join_non_equi_condition() {
3911        let chunk = DataChunk::from_pretty(
3912            "i   f   i   F
3913             1   3.5 1   5.5
3914             1   3.5 1   2.5
3915             3   5.0 3   4.0
3916             3   5.0 3   3.0
3917             3   5.0 3   4.0
3918             3   5.0 3   3.0
3919             4   1.0 4   0
3920             4   1.0 4   0.5",
3921        );
3922        let expect = DataChunk::from_pretty(
3923            "i   f   i   F
3924             1   3.5 1   5.5
3925             3   5.0 .   .
3926             3   5.0 .   .",
3927        );
3928        let cond = TestFixture::create_cond();
3929        let mut left_state = LeftNonEquiJoinState {
3930            probe_column_count: 2,
3931            first_output_row_id: vec![0, 2, 4, 6],
3932            has_more_output_rows: true,
3933            found_matched: false,
3934        };
3935        let mut right_state = RightNonEquiJoinState {
3936            build_row_ids: vec![
3937                RowId::new(0, 0),
3938                RowId::new(0, 1),
3939                RowId::new(0, 3),
3940                RowId::new(0, 4),
3941                RowId::new(0, 3),
3942                RowId::new(0, 4),
3943                RowId::new(0, 5),
3944                RowId::new(0, 7),
3945            ],
3946            build_row_matched: ChunkedData::with_chunk_sizes([14].into_iter()).unwrap(),
3947        };
3948        assert!(compare_data_chunk_with_rowsort(
3949            &HashJoinExecutor::<Key32>::process_full_outer_join_non_equi_condition(
3950                chunk,
3951                cond.as_ref(),
3952                &mut left_state,
3953                &mut right_state,
3954            )
3955            .await
3956            .unwrap()
3957            .compact(),
3958            &expect
3959        ));
3960        assert_eq!(left_state.first_output_row_id, Vec::<usize>::new());
3961        assert!(!left_state.found_matched);
3962        assert_eq!(right_state.build_row_ids, Vec::new());
3963        assert_eq!(
3964            right_state.build_row_matched,
3965            ChunkedData::try_from(vec![{
3966                let mut v = vec![false; 14];
3967                v[0] = true;
3968                v
3969            }])
3970            .unwrap()
3971        );
3972
3973        let chunk = DataChunk::from_pretty(
3974            "i   f   i   F
3975             4   1.0 4   0.6
3976             4   1.0 4   2.0
3977             5   4.0 5   .
3978             6   7.0 6   .
3979             6   7.0 6   8.0",
3980        );
3981        let expect = DataChunk::from_pretty(
3982            "i   f   i   F
3983             4   1.0 4   2.0
3984             5   4.0 .   .
3985             6   7.0 6   8.0",
3986        );
3987        left_state.first_output_row_id = vec![2, 3];
3988        left_state.has_more_output_rows = false;
3989        right_state.build_row_ids = vec![
3990            RowId::new(0, 8),
3991            RowId::new(0, 9),
3992            RowId::new(0, 10),
3993            RowId::new(0, 12),
3994            RowId::new(0, 13),
3995        ];
3996        assert!(compare_data_chunk_with_rowsort(
3997            &HashJoinExecutor::<Key32>::process_full_outer_join_non_equi_condition(
3998                chunk,
3999                cond.as_ref(),
4000                &mut left_state,
4001                &mut right_state,
4002            )
4003            .await
4004            .unwrap()
4005            .compact(),
4006            &expect
4007        ));
4008        assert_eq!(left_state.first_output_row_id, Vec::<usize>::new());
4009        assert!(left_state.found_matched);
4010        assert_eq!(right_state.build_row_ids, Vec::new());
4011        assert_eq!(
4012            right_state.build_row_matched,
4013            ChunkedData::try_from(vec![{
4014                let mut v = vec![false; 14];
4015                v[0] = true;
4016                v[9] = true;
4017                v[13] = true;
4018                v
4019            }])
4020            .unwrap()
4021        );
4022    }
4023
4024    #[tokio::test]
4025    async fn test_shutdown() {
4026        let test_fixture = TestFixture::with_join_type(JoinType::Inner);
4027        test_fixture.do_test_shutdown(false).await;
4028        test_fixture.do_test_shutdown(true).await;
4029
4030        let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);
4031        test_fixture.do_test_shutdown(false).await;
4032        test_fixture.do_test_shutdown(true).await;
4033
4034        let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);
4035        test_fixture.do_test_shutdown(false).await;
4036        test_fixture.do_test_shutdown(true).await;
4037
4038        let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);
4039        test_fixture.do_test_shutdown(false).await;
4040        test_fixture.do_test_shutdown(true).await;
4041
4042        let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
4043        test_fixture.do_test_shutdown(false).await;
4044        test_fixture.do_test_shutdown(true).await;
4045
4046        let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);
4047        test_fixture.do_test_shutdown(false).await;
4048        test_fixture.do_test_shutdown(true).await;
4049
4050        let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);
4051        test_fixture.do_test_shutdown(false).await;
4052        test_fixture.do_test_shutdown(true).await;
4053
4054        let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);
4055        test_fixture.do_test_shutdown(false).await;
4056        test_fixture.do_test_shutdown(true).await;
4057    }
4058}