risingwave_batch_executors/executor/join/
hash_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::iter;
17use std::iter::empty;
18use std::marker::PhantomData;
19use std::sync::Arc;
20
21use bytes::Bytes;
22use futures_async_stream::try_stream;
23use itertools::Itertools;
24use risingwave_common::array::{Array, DataChunk, RowRef};
25use risingwave_common::bitmap::{Bitmap, BitmapBuilder, FilterByBitmap};
26use risingwave_common::catalog::Schema;
27use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher};
28use risingwave_common::memory::{MemoryContext, MonitoredGlobalAlloc};
29use risingwave_common::row::{Row, RowExt, repeat_n};
30use risingwave_common::types::{DataType, Datum, DefaultOrd};
31use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
32use risingwave_common::util::iter_util::ZipEqFast;
33use risingwave_common_estimate_size::EstimateSize;
34use risingwave_expr::expr::{BoxedExpression, Expression, build_from_prost};
35use risingwave_pb::Message;
36use risingwave_pb::batch_plan::plan_node::NodeBody;
37use risingwave_pb::data::DataChunk as PbDataChunk;
38
39use super::{AsOfDesc, AsOfInequalityType, ChunkedData, JoinType, RowId};
40use crate::error::{BatchError, Result};
41use crate::executor::{
42    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
43    WrapStreamExecutor,
44};
45use crate::monitor::BatchSpillMetrics;
46use crate::risingwave_common::hash::NullBitmap;
47use crate::spill::spill_op::SpillBackend::Disk;
48use crate::spill::spill_op::{
49    DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY, SpillBackend, SpillBuildHasher, SpillOp,
50};
51use crate::task::ShutdownToken;
52
53/// Hash Join Executor
54///
55/// High-level idea:
56/// 1. Iterate over the build side (i.e. right table) and build a hash map.
57/// 2. Iterate over the probe side (i.e. left table) and compute the hash value of each row.
58///    Then find the matched build side row for each probe side row in the hash map.
59/// 3. Concatenate the matched pair of probe side row and build side row into a single row and push
60///    it into the data chunk builder.
61/// 4. Yield chunks from the builder.
62pub struct HashJoinExecutor<K> {
63    /// Join type e.g. inner, left outer, ...
64    join_type: JoinType,
65    /// Output schema without applying `output_indices`
66    #[expect(dead_code)]
67    original_schema: Schema,
68    /// Output schema after applying `output_indices`
69    schema: Schema,
70    /// `output_indices` are the indices of the columns that we needed.
71    output_indices: Vec<usize>,
72    /// Left child executor
73    probe_side_source: BoxedExecutor,
74    /// Right child executor
75    build_side_source: BoxedExecutor,
76    /// Column indices of left keys in equi join
77    probe_key_idxs: Vec<usize>,
78    /// Column indices of right keys in equi join
79    build_key_idxs: Vec<usize>,
80    /// Non-equi join condition (optional)
81    cond: Option<Arc<BoxedExpression>>,
82    /// Whether or not to enable 'IS NOT DISTINCT FROM' semantics for a specific probe/build key
83    /// column
84    null_matched: Vec<bool>,
85    identity: String,
86    chunk_size: usize,
87    /// Whether the join is an as-of join
88    asof_desc: Option<AsOfDesc>,
89
90    spill_backend: Option<SpillBackend>,
91    spill_metrics: Arc<BatchSpillMetrics>,
92    /// The upper bound of memory usage for this executor.
93    memory_upper_bound: Option<u64>,
94
95    shutdown_rx: ShutdownToken,
96
97    mem_ctx: MemoryContext,
98    _phantom: PhantomData<K>,
99}
100
101impl<K: HashKey> Executor for HashJoinExecutor<K> {
102    fn schema(&self) -> &Schema {
103        &self.schema
104    }
105
106    fn identity(&self) -> &str {
107        &self.identity
108    }
109
110    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
111        self.do_execute()
112    }
113}
114
115/// In `JoinHashMap`, we only save the row id of the first build row that has the hash key.
116/// In fact, in the build side there may be multiple rows with the same hash key. To handle this
117/// case, we use `ChunkedData` to link them together. For example:
118///
119/// | id | key | row |
120/// | --- | --- | --- |
121/// | 0 | 1 | (1, 2, 3) |
122/// | 1 | 4 | (4, 5, 6) |
123/// | 2 | 1 | (1, 3, 7) |
124/// | 3 | 1 | (1, 3, 2) |
125/// | 4 | 3 | (3, 2, 1) |
126///
127/// The corresponding join hash map is:
128///
129/// | key | value |
130/// | --- | --- |
131/// | 1 | 0 |
132/// | 4 | 1 |
133/// | 3 | 4 |
134///
135/// And we save build rows with the same key like this:
136///
137/// | id | value |
138/// | --- | --- |
139/// | 0 | 2 |
140/// | 1 | None |
141/// | 2 | 3 |
142/// | 3 | None |
143/// | 4 | None |
144///
145/// This can be seen as an implicit linked list. For convenience, we use `RowIdIter` to iterate all
146/// build side row ids with the given key.
147pub type JoinHashMap<K> =
148    hashbrown::HashMap<K, RowId, PrecomputedBuildHasher, MonitoredGlobalAlloc>;
149
150struct RowIdIter<'a> {
151    current_row_id: Option<RowId>,
152    next_row_id: &'a ChunkedData<Option<RowId>>,
153}
154
155impl ChunkedData<Option<RowId>> {
156    fn row_id_iter(&self, begin: Option<RowId>) -> RowIdIter<'_> {
157        RowIdIter {
158            current_row_id: begin,
159            next_row_id: self,
160        }
161    }
162}
163
164impl Iterator for RowIdIter<'_> {
165    type Item = RowId;
166
167    fn next(&mut self) -> Option<Self::Item> {
168        self.current_row_id.inspect(|row_id| {
169            self.current_row_id = self.next_row_id[*row_id];
170        })
171    }
172}
173
174pub struct EquiJoinParams<K> {
175    probe_side: BoxedExecutor,
176    probe_data_types: Vec<DataType>,
177    probe_key_idxs: Vec<usize>,
178    build_side: Vec<DataChunk, MonitoredGlobalAlloc>,
179    build_data_types: Vec<DataType>,
180    full_data_types: Vec<DataType>,
181    hash_map: JoinHashMap<K>,
182    next_build_row_with_same_key: ChunkedData<Option<RowId>>,
183    chunk_size: usize,
184    shutdown_rx: ShutdownToken,
185    asof_desc: Option<AsOfDesc>,
186}
187
188impl<K> EquiJoinParams<K> {
189    #[allow(clippy::too_many_arguments)]
190    pub(super) fn new(
191        probe_side: BoxedExecutor,
192        probe_data_types: Vec<DataType>,
193        probe_key_idxs: Vec<usize>,
194        build_side: Vec<DataChunk, MonitoredGlobalAlloc>,
195        build_data_types: Vec<DataType>,
196        full_data_types: Vec<DataType>,
197        hash_map: JoinHashMap<K>,
198        next_build_row_with_same_key: ChunkedData<Option<RowId>>,
199        chunk_size: usize,
200        shutdown_rx: ShutdownToken,
201        asof_desc: Option<AsOfDesc>,
202    ) -> Self {
203        Self {
204            probe_side,
205            probe_data_types,
206            probe_key_idxs,
207            build_side,
208            build_data_types,
209            full_data_types,
210            hash_map,
211            next_build_row_with_same_key,
212            chunk_size,
213            shutdown_rx,
214            asof_desc,
215        }
216    }
217
218    pub(crate) fn is_asof_join(&self) -> bool {
219        self.asof_desc.is_some()
220    }
221}
222
/// Bookkeeping for the probe side, used by left outer/semi/anti join and full outer join when a
/// non-equi condition is present.
#[derive(Default)]
struct LeftNonEquiJoinState {
    /// Number of columns on the probe side.
    probe_column_count: usize,
    /// For each probe side row processed so far, the offset of its first output row in the
    /// **current** chunk.
    first_output_row_id: Vec<usize>,
    /// Whether the probe row currently being processed still has output rows in the **next**
    /// output chunk.
    has_more_output_rows: bool,
    /// Whether the probe row currently being processed matched non-NULL build rows in the **last**
    /// output chunk.
    found_matched: bool,
}
237
238/// State variables used in right outer/semi/anti join and full outer join.
239#[derive(Default)]
240struct RightNonEquiJoinState {
241    /// Corresponding build row id for each row in **current** output chunk.
242    build_row_ids: Vec<RowId>,
243    /// Whether a build row has been matched.
244    build_row_matched: ChunkedData<bool>,
245}
246
247pub struct JoinSpillManager {
248    op: SpillOp,
249    partition_num: usize,
250    probe_side_writers: Vec<opendal::Writer>,
251    build_side_writers: Vec<opendal::Writer>,
252    probe_side_chunk_builders: Vec<DataChunkBuilder>,
253    build_side_chunk_builders: Vec<DataChunkBuilder>,
254    spill_build_hasher: SpillBuildHasher,
255    probe_side_data_types: Vec<DataType>,
256    build_side_data_types: Vec<DataType>,
257    spill_chunk_size: usize,
258    spill_metrics: Arc<BatchSpillMetrics>,
259}
260
261/// `JoinSpillManager` is used to manage how to write spill data file and read them back.
262/// The spill data first need to be partitioned. Each partition contains 2 files: `join_probe_side_file` and `join_build_side_file`.
263/// The spill file consume a data chunk and serialize the chunk into a protobuf bytes.
264/// Finally, spill file content will look like the below.
265/// The file write pattern is append-only and the read pattern is sequential scan.
266/// This can maximize the disk IO performance.
267///
268/// ```text
269/// [proto_len]
270/// [proto_bytes]
271/// ...
272/// [proto_len]
273/// [proto_bytes]
274/// ```
275impl JoinSpillManager {
276    pub fn new(
277        spill_backend: SpillBackend,
278        join_identity: &String,
279        partition_num: usize,
280        probe_side_data_types: Vec<DataType>,
281        build_side_data_types: Vec<DataType>,
282        spill_chunk_size: usize,
283        spill_metrics: Arc<BatchSpillMetrics>,
284    ) -> Result<Self> {
285        let suffix_uuid = uuid::Uuid::new_v4();
286        let dir = format!("/{}-{}/", join_identity, suffix_uuid);
287        let op = SpillOp::create(dir, spill_backend)?;
288        let probe_side_writers = Vec::with_capacity(partition_num);
289        let build_side_writers = Vec::with_capacity(partition_num);
290        let probe_side_chunk_builders = Vec::with_capacity(partition_num);
291        let build_side_chunk_builders = Vec::with_capacity(partition_num);
292        let spill_build_hasher = SpillBuildHasher(suffix_uuid.as_u64_pair().1);
293        Ok(Self {
294            op,
295            partition_num,
296            probe_side_writers,
297            build_side_writers,
298            probe_side_chunk_builders,
299            build_side_chunk_builders,
300            spill_build_hasher,
301            probe_side_data_types,
302            build_side_data_types,
303            spill_chunk_size,
304            spill_metrics,
305        })
306    }
307
308    pub async fn init_writers(&mut self) -> Result<()> {
309        for i in 0..self.partition_num {
310            let join_probe_side_partition_file_name = format!("join-probe-side-p{}", i);
311            let w = self
312                .op
313                .writer_with(&join_probe_side_partition_file_name)
314                .await?;
315            self.probe_side_writers.push(w);
316
317            let join_build_side_partition_file_name = format!("join-build-side-p{}", i);
318            let w = self
319                .op
320                .writer_with(&join_build_side_partition_file_name)
321                .await?;
322            self.build_side_writers.push(w);
323            self.probe_side_chunk_builders.push(DataChunkBuilder::new(
324                self.probe_side_data_types.clone(),
325                self.spill_chunk_size,
326            ));
327            self.build_side_chunk_builders.push(DataChunkBuilder::new(
328                self.build_side_data_types.clone(),
329                self.spill_chunk_size,
330            ));
331        }
332        Ok(())
333    }
334
335    pub async fn write_probe_side_chunk(
336        &mut self,
337        chunk: DataChunk,
338        hash_codes: Vec<u64>,
339    ) -> Result<()> {
340        let (columns, vis) = chunk.into_parts_v2();
341        for partition in 0..self.partition_num {
342            let new_vis = vis.clone()
343                & Bitmap::from_iter(
344                    hash_codes
345                        .iter()
346                        .map(|hash_code| (*hash_code as usize % self.partition_num) == partition),
347                );
348            let new_chunk = DataChunk::from_parts(columns.clone(), new_vis);
349            for output_chunk in self.probe_side_chunk_builders[partition].append_chunk(new_chunk) {
350                let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
351                let buf = Message::encode_to_vec(&chunk_pb);
352                let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
353                self.spill_metrics
354                    .batch_spill_write_bytes
355                    .inc_by((buf.len() + len_bytes.len()) as u64);
356                self.probe_side_writers[partition].write(len_bytes).await?;
357                self.probe_side_writers[partition].write(buf).await?;
358            }
359        }
360        Ok(())
361    }
362
363    pub async fn write_build_side_chunk(
364        &mut self,
365        chunk: DataChunk,
366        hash_codes: Vec<u64>,
367    ) -> Result<()> {
368        let (columns, vis) = chunk.into_parts_v2();
369        for partition in 0..self.partition_num {
370            let new_vis = vis.clone()
371                & Bitmap::from_iter(
372                    hash_codes
373                        .iter()
374                        .map(|hash_code| (*hash_code as usize % self.partition_num) == partition),
375                );
376            let new_chunk = DataChunk::from_parts(columns.clone(), new_vis);
377            for output_chunk in self.build_side_chunk_builders[partition].append_chunk(new_chunk) {
378                let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
379                let buf = Message::encode_to_vec(&chunk_pb);
380                let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
381                self.spill_metrics
382                    .batch_spill_write_bytes
383                    .inc_by((buf.len() + len_bytes.len()) as u64);
384                self.build_side_writers[partition].write(len_bytes).await?;
385                self.build_side_writers[partition].write(buf).await?;
386            }
387        }
388        Ok(())
389    }
390
391    pub async fn close_writers(&mut self) -> Result<()> {
392        for partition in 0..self.partition_num {
393            if let Some(output_chunk) = self.probe_side_chunk_builders[partition].consume_all() {
394                let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
395                let buf = Message::encode_to_vec(&chunk_pb);
396                let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
397                self.spill_metrics
398                    .batch_spill_write_bytes
399                    .inc_by((buf.len() + len_bytes.len()) as u64);
400                self.probe_side_writers[partition].write(len_bytes).await?;
401                self.probe_side_writers[partition].write(buf).await?;
402            }
403
404            if let Some(output_chunk) = self.build_side_chunk_builders[partition].consume_all() {
405                let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
406                let buf = Message::encode_to_vec(&chunk_pb);
407                let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
408                self.spill_metrics
409                    .batch_spill_write_bytes
410                    .inc_by((buf.len() + len_bytes.len()) as u64);
411                self.build_side_writers[partition].write(len_bytes).await?;
412                self.build_side_writers[partition].write(buf).await?;
413            }
414        }
415
416        for mut w in self.probe_side_writers.drain(..) {
417            w.close().await?;
418        }
419        for mut w in self.build_side_writers.drain(..) {
420            w.close().await?;
421        }
422        Ok(())
423    }
424
425    async fn read_probe_side_partition(
426        &mut self,
427        partition: usize,
428    ) -> Result<BoxedDataChunkStream> {
429        let join_probe_side_partition_file_name = format!("join-probe-side-p{}", partition);
430        let r = self
431            .op
432            .reader_with(&join_probe_side_partition_file_name)
433            .await?;
434        Ok(SpillOp::read_stream(r, self.spill_metrics.clone()))
435    }
436
437    async fn read_build_side_partition(
438        &mut self,
439        partition: usize,
440    ) -> Result<BoxedDataChunkStream> {
441        let join_build_side_partition_file_name = format!("join-build-side-p{}", partition);
442        let r = self
443            .op
444            .reader_with(&join_build_side_partition_file_name)
445            .await?;
446        Ok(SpillOp::read_stream(r, self.spill_metrics.clone()))
447    }
448
449    pub async fn estimate_partition_size(&self, partition: usize) -> Result<u64> {
450        let join_probe_side_partition_file_name = format!("join-probe-side-p{}", partition);
451        let probe_size = self
452            .op
453            .stat(&join_probe_side_partition_file_name)
454            .await?
455            .content_length();
456        let join_build_side_partition_file_name = format!("join-build-side-p{}", partition);
457        let build_size = self
458            .op
459            .stat(&join_build_side_partition_file_name)
460            .await?
461            .content_length();
462        Ok(probe_size + build_size)
463    }
464
465    async fn clear_partition(&mut self, partition: usize) -> Result<()> {
466        let join_probe_side_partition_file_name = format!("join-probe-side-p{}", partition);
467        self.op.delete(&join_probe_side_partition_file_name).await?;
468        let join_build_side_partition_file_name = format!("join-build-side-p{}", partition);
469        self.op.delete(&join_build_side_partition_file_name).await?;
470        Ok(())
471    }
472}
473
474impl<K: HashKey> HashJoinExecutor<K> {
475    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
476    async fn do_execute(self: Box<Self>) {
477        let mut need_to_spill = false;
478        // If the memory upper bound is less than 1MB, we don't need to check memory usage.
479        let check_memory = match self.memory_upper_bound {
480            Some(upper_bound) => upper_bound > SPILL_AT_LEAST_MEMORY,
481            None => true,
482        };
483
484        let probe_schema = self.probe_side_source.schema().clone();
485        let build_schema = self.build_side_source.schema().clone();
486        let probe_data_types = self.probe_side_source.schema().data_types();
487        let build_data_types = self.build_side_source.schema().data_types();
488        let full_data_types = [probe_data_types.clone(), build_data_types.clone()].concat();
489
490        let mut build_side = Vec::new_in(self.mem_ctx.global_allocator());
491        let mut build_row_count = 0;
492        let mut build_side_stream = self.build_side_source.execute();
493        #[for_await]
494        for build_chunk in &mut build_side_stream {
495            let build_chunk = build_chunk?;
496            if build_chunk.cardinality() > 0 {
497                build_row_count += build_chunk.cardinality();
498                let chunk_estimated_heap_size = build_chunk.estimated_heap_size();
499                // push build_chunk to build_side before checking memory limit, otherwise we will lose that chunk when spilling.
500                build_side.push(build_chunk);
501                if !self.mem_ctx.add(chunk_estimated_heap_size as i64) && check_memory {
502                    if self.spill_backend.is_some() {
503                        need_to_spill = true;
504                        break;
505                    } else {
506                        Err(BatchError::OutOfMemory(self.mem_ctx.mem_limit()))?;
507                    }
508                }
509            }
510        }
511        let mut hash_map = JoinHashMap::with_capacity_and_hasher_in(
512            build_row_count,
513            PrecomputedBuildHasher,
514            self.mem_ctx.global_allocator(),
515        );
516        let mut next_build_row_with_same_key =
517            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
518
519        let null_matched = K::Bitmap::from_bool_vec(self.null_matched.clone());
520
521        let mut mem_added_by_hash_table = 0;
522        if !need_to_spill {
523            // Build hash map
524            for (build_chunk_id, build_chunk) in build_side.iter().enumerate() {
525                let build_keys = K::build_many(&self.build_key_idxs, build_chunk);
526
527                for (build_row_id, build_key) in build_keys
528                    .into_iter()
529                    .enumerate()
530                    .filter_by_bitmap(build_chunk.visibility())
531                {
532                    self.shutdown_rx.check()?;
533                    // Only insert key to hash map if it is consistent with the null safe restriction.
534                    if build_key.null_bitmap().is_subset(&null_matched) {
535                        let row_id = RowId::new(build_chunk_id, build_row_id);
536                        let build_key_size = build_key.estimated_heap_size() as i64;
537                        mem_added_by_hash_table += build_key_size;
538                        if !self.mem_ctx.add(build_key_size) && check_memory {
539                            if self.spill_backend.is_some() {
540                                need_to_spill = true;
541                                break;
542                            } else {
543                                Err(BatchError::OutOfMemory(self.mem_ctx.mem_limit()))?;
544                            }
545                        }
546                        next_build_row_with_same_key[row_id] = hash_map.insert(build_key, row_id);
547                    }
548                }
549            }
550        }
551
552        if need_to_spill {
553            // A spilling version of hash join based on the RFC: Spill Hash Join https://github.com/risingwavelabs/rfcs/pull/91
554            // When HashJoinExecutor told memory is insufficient, JoinSpillManager will start to partition the hash table and spill to disk.
555            // After spilling the hash table, JoinSpillManager will consume all chunks from its build side input executor and probe side input executor.
556            // Finally, we would get e.g. 20 partitions. Each partition should contain a portion of the original build side input and probr side input data.
557            // A sub HashJoinExecutor would be used to consume each partition one by one.
558            // If memory is still not enough in the sub HashJoinExecutor, it will spill its inputs recursively.
559            info!(
560                "batch hash join executor {} starts to spill out",
561                &self.identity
562            );
563            let mut join_spill_manager = JoinSpillManager::new(
564                self.spill_backend.clone().unwrap(),
565                &self.identity,
566                DEFAULT_SPILL_PARTITION_NUM,
567                probe_data_types.clone(),
568                build_data_types.clone(),
569                self.chunk_size,
570                self.spill_metrics.clone(),
571            )?;
572            join_spill_manager.init_writers().await?;
573
574            // Release memory occupied by the hash map
575            self.mem_ctx.add(-mem_added_by_hash_table);
576            drop(hash_map);
577            drop(next_build_row_with_same_key);
578
579            // Spill buffered build side chunks
580            for chunk in build_side {
581                // Release the memory occupied by the buffered chunks
582                self.mem_ctx.add(-(chunk.estimated_heap_size() as i64));
583                let hash_codes = chunk.get_hash_values(
584                    self.build_key_idxs.as_slice(),
585                    join_spill_manager.spill_build_hasher,
586                );
587                join_spill_manager
588                    .write_build_side_chunk(
589                        chunk,
590                        hash_codes
591                            .into_iter()
592                            .map(|hash_code| hash_code.value())
593                            .collect(),
594                    )
595                    .await?;
596            }
597
598            // Spill build side chunks
599            #[for_await]
600            for chunk in build_side_stream {
601                let chunk = chunk?;
602                let hash_codes = chunk.get_hash_values(
603                    self.build_key_idxs.as_slice(),
604                    join_spill_manager.spill_build_hasher,
605                );
606                join_spill_manager
607                    .write_build_side_chunk(
608                        chunk,
609                        hash_codes
610                            .into_iter()
611                            .map(|hash_code| hash_code.value())
612                            .collect(),
613                    )
614                    .await?;
615            }
616
617            // Spill probe side chunks
618            #[for_await]
619            for chunk in self.probe_side_source.execute() {
620                let chunk = chunk?;
621                let hash_codes = chunk.get_hash_values(
622                    self.probe_key_idxs.as_slice(),
623                    join_spill_manager.spill_build_hasher,
624                );
625                join_spill_manager
626                    .write_probe_side_chunk(
627                        chunk,
628                        hash_codes
629                            .into_iter()
630                            .map(|hash_code| hash_code.value())
631                            .collect(),
632                    )
633                    .await?;
634            }
635
636            join_spill_manager.close_writers().await?;
637
638            // Process each partition one by one.
639            for i in 0..join_spill_manager.partition_num {
640                let partition_size = join_spill_manager.estimate_partition_size(i).await?;
641                let probe_side_stream = join_spill_manager.read_probe_side_partition(i).await?;
642                let build_side_stream = join_spill_manager.read_build_side_partition(i).await?;
643
644                let sub_hash_join_executor: HashJoinExecutor<K> = HashJoinExecutor::new_inner(
645                    self.join_type,
646                    self.output_indices.clone(),
647                    Box::new(WrapStreamExecutor::new(
648                        probe_schema.clone(),
649                        probe_side_stream,
650                    )),
651                    Box::new(WrapStreamExecutor::new(
652                        build_schema.clone(),
653                        build_side_stream,
654                    )),
655                    self.probe_key_idxs.clone(),
656                    self.build_key_idxs.clone(),
657                    self.null_matched.clone(),
658                    self.cond.clone(),
659                    format!("{}-sub{}", self.identity.clone(), i),
660                    self.chunk_size,
661                    self.asof_desc.clone(),
662                    self.spill_backend.clone(),
663                    self.spill_metrics.clone(),
664                    Some(partition_size),
665                    self.shutdown_rx.clone(),
666                    self.mem_ctx.clone(),
667                );
668
669                debug!(
670                    "create sub_hash_join {} for hash_join {} to spill",
671                    sub_hash_join_executor.identity, self.identity
672                );
673
674                let sub_hash_join_executor = Box::new(sub_hash_join_executor).execute();
675
676                #[for_await]
677                for chunk in sub_hash_join_executor {
678                    let chunk = chunk?;
679                    yield chunk;
680                }
681
682                // Clear files of the current partition.
683                join_spill_manager.clear_partition(i).await?;
684            }
685        } else {
686            let params = EquiJoinParams::new(
687                self.probe_side_source,
688                probe_data_types,
689                self.probe_key_idxs,
690                build_side,
691                build_data_types,
692                full_data_types,
693                hash_map,
694                next_build_row_with_same_key,
695                self.chunk_size,
696                self.shutdown_rx.clone(),
697                self.asof_desc,
698            );
699
700            if let Some(cond) = self.cond.as_ref()
701                && !params.is_asof_join()
702            {
703                let stream = match self.join_type {
704                    JoinType::Inner => Self::do_inner_join_with_non_equi_condition(params, cond),
705                    JoinType::LeftOuter => {
706                        Self::do_left_outer_join_with_non_equi_condition(params, cond)
707                    }
708                    JoinType::LeftSemi => {
709                        Self::do_left_semi_join_with_non_equi_condition(params, cond)
710                    }
711                    JoinType::LeftAnti => {
712                        Self::do_left_anti_join_with_non_equi_condition(params, cond)
713                    }
714                    JoinType::RightOuter => {
715                        Self::do_right_outer_join_with_non_equi_condition(params, cond)
716                    }
717                    JoinType::RightSemi => {
718                        Self::do_right_semi_anti_join_with_non_equi_condition::<false>(params, cond)
719                    }
720                    JoinType::RightAnti => {
721                        Self::do_right_semi_anti_join_with_non_equi_condition::<true>(params, cond)
722                    }
723                    JoinType::FullOuter => {
724                        Self::do_full_outer_join_with_non_equi_condition(params, cond)
725                    }
726                    JoinType::AsOfInner | JoinType::AsOfLeftOuter => {
727                        unreachable!("AsOf join should not reach here")
728                    }
729                };
730                // For non-equi join, we need an output chunk builder to align the output chunks.
731                let mut output_chunk_builder =
732                    DataChunkBuilder::new(self.schema.data_types(), self.chunk_size);
733                #[for_await]
734                for chunk in stream {
735                    for output_chunk in
736                        output_chunk_builder.append_chunk(chunk?.project(&self.output_indices))
737                    {
738                        yield output_chunk
739                    }
740                }
741                if let Some(output_chunk) = output_chunk_builder.consume_all() {
742                    yield output_chunk
743                }
744            } else {
745                let stream = match self.join_type {
746                    JoinType::Inner | JoinType::AsOfInner => Self::do_inner_join(params),
747                    JoinType::LeftOuter | JoinType::AsOfLeftOuter => {
748                        Self::do_left_outer_join(params)
749                    }
750                    JoinType::LeftSemi => Self::do_left_semi_anti_join::<false>(params),
751                    JoinType::LeftAnti => Self::do_left_semi_anti_join::<true>(params),
752                    JoinType::RightOuter => Self::do_right_outer_join(params),
753                    JoinType::RightSemi => Self::do_right_semi_anti_join::<false>(params),
754                    JoinType::RightAnti => Self::do_right_semi_anti_join::<true>(params),
755                    JoinType::FullOuter => Self::do_full_outer_join(params),
756                };
757                #[for_await]
758                for chunk in stream {
759                    yield chunk?.project(&self.output_indices)
760                }
761            }
762        }
763    }
764
765    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
766    pub async fn do_inner_join(
767        EquiJoinParams {
768            probe_side,
769            probe_key_idxs,
770            build_side,
771            full_data_types,
772            hash_map,
773            next_build_row_with_same_key,
774            chunk_size,
775            shutdown_rx,
776            asof_desc,
777            ..
778        }: EquiJoinParams<K>,
779    ) {
780        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
781        #[for_await]
782        for probe_chunk in probe_side.execute() {
783            let probe_chunk = probe_chunk?;
784            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
785            for (probe_row_id, probe_key) in probe_keys
786                .iter()
787                .enumerate()
788                .filter_by_bitmap(probe_chunk.visibility())
789            {
790                let build_side_row_iter =
791                    next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied());
792                if let Some(asof_desc) = &asof_desc {
793                    if let Some(build_row_id) = Self::find_asof_matched_rows(
794                        probe_chunk.row_at_unchecked_vis(probe_row_id),
795                        &build_side,
796                        build_side_row_iter,
797                        asof_desc,
798                    ) {
799                        shutdown_rx.check()?;
800                        if let Some(spilled) = Self::append_one_row(
801                            &mut chunk_builder,
802                            &probe_chunk,
803                            probe_row_id,
804                            &build_side[build_row_id.chunk_id()],
805                            build_row_id.row_id(),
806                        ) {
807                            yield spilled
808                        }
809                    }
810                } else {
811                    for build_row_id in build_side_row_iter {
812                        shutdown_rx.check()?;
813                        let build_chunk = &build_side[build_row_id.chunk_id()];
814                        if let Some(spilled) = Self::append_one_row(
815                            &mut chunk_builder,
816                            &probe_chunk,
817                            probe_row_id,
818                            build_chunk,
819                            build_row_id.row_id(),
820                        ) {
821                            yield spilled
822                        }
823                    }
824                }
825            }
826        }
827        if let Some(spilled) = chunk_builder.consume_all() {
828            yield spilled
829        }
830    }
831
832    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
833    pub async fn do_inner_join_with_non_equi_condition(
834        params: EquiJoinParams<K>,
835        cond: &BoxedExpression,
836    ) {
837        #[for_await]
838        for chunk in Self::do_inner_join(params) {
839            let mut chunk = chunk?;
840            chunk.set_visibility(cond.eval(&chunk).await?.as_bool().iter().collect());
841            yield chunk
842        }
843    }
844
845    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
846    pub async fn do_left_outer_join(
847        EquiJoinParams {
848            probe_side,
849            probe_key_idxs,
850            build_side,
851            build_data_types,
852            full_data_types,
853            hash_map,
854            next_build_row_with_same_key,
855            chunk_size,
856            shutdown_rx,
857            asof_desc,
858            ..
859        }: EquiJoinParams<K>,
860    ) {
861        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
862        #[for_await]
863        for probe_chunk in probe_side.execute() {
864            let probe_chunk = probe_chunk?;
865            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
866            for (probe_row_id, probe_key) in probe_keys
867                .iter()
868                .enumerate()
869                .filter_by_bitmap(probe_chunk.visibility())
870            {
871                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
872                    let build_side_row_iter =
873                        next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id));
874                    if let Some(asof_desc) = &asof_desc {
875                        if let Some(build_row_id) = Self::find_asof_matched_rows(
876                            probe_chunk.row_at_unchecked_vis(probe_row_id),
877                            &build_side,
878                            build_side_row_iter,
879                            asof_desc,
880                        ) {
881                            shutdown_rx.check()?;
882                            if let Some(spilled) = Self::append_one_row(
883                                &mut chunk_builder,
884                                &probe_chunk,
885                                probe_row_id,
886                                &build_side[build_row_id.chunk_id()],
887                                build_row_id.row_id(),
888                            ) {
889                                yield spilled
890                            }
891                        } else {
892                            shutdown_rx.check()?;
893                            let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
894                            if let Some(spilled) = Self::append_one_row_with_null_build_side(
895                                &mut chunk_builder,
896                                probe_row,
897                                build_data_types.len(),
898                            ) {
899                                yield spilled
900                            }
901                        }
902                    } else {
903                        for build_row_id in build_side_row_iter {
904                            shutdown_rx.check()?;
905                            let build_chunk = &build_side[build_row_id.chunk_id()];
906                            if let Some(spilled) = Self::append_one_row(
907                                &mut chunk_builder,
908                                &probe_chunk,
909                                probe_row_id,
910                                build_chunk,
911                                build_row_id.row_id(),
912                            ) {
913                                yield spilled
914                            }
915                        }
916                    }
917                } else {
918                    shutdown_rx.check()?;
919                    let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
920                    if let Some(spilled) = Self::append_one_row_with_null_build_side(
921                        &mut chunk_builder,
922                        probe_row,
923                        build_data_types.len(),
924                    ) {
925                        yield spilled
926                    }
927                }
928            }
929        }
930        if let Some(spilled) = chunk_builder.consume_all() {
931            yield spilled
932        }
933    }
934
935    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
936    pub async fn do_left_outer_join_with_non_equi_condition(
937        EquiJoinParams {
938            probe_side,
939            probe_data_types,
940            probe_key_idxs,
941            build_side,
942            build_data_types,
943            full_data_types,
944            hash_map,
945            next_build_row_with_same_key,
946            chunk_size,
947            shutdown_rx,
948            ..
949        }: EquiJoinParams<K>,
950        cond: &BoxedExpression,
951    ) {
952        let mut chunk_builder = DataChunkBuilder::new(full_data_types.clone(), chunk_size);
953        let mut remaining_chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
954        let mut non_equi_state = LeftNonEquiJoinState {
955            probe_column_count: probe_data_types.len(),
956            ..Default::default()
957        };
958
959        #[for_await]
960        for probe_chunk in probe_side.execute() {
961            let probe_chunk = probe_chunk?;
962            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
963            for (probe_row_id, probe_key) in probe_keys
964                .iter()
965                .enumerate()
966                .filter_by_bitmap(probe_chunk.visibility())
967            {
968                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
969                    non_equi_state
970                        .first_output_row_id
971                        .push(chunk_builder.buffered_count());
972
973                    let mut build_row_id_iter = next_build_row_with_same_key
974                        .row_id_iter(Some(*first_matched_build_row_id))
975                        .peekable();
976                    while let Some(build_row_id) = build_row_id_iter.next() {
977                        shutdown_rx.check()?;
978                        let build_chunk = &build_side[build_row_id.chunk_id()];
979                        if let Some(spilled) = Self::append_one_row(
980                            &mut chunk_builder,
981                            &probe_chunk,
982                            probe_row_id,
983                            build_chunk,
984                            build_row_id.row_id(),
985                        ) {
986                            non_equi_state.has_more_output_rows =
987                                build_row_id_iter.peek().is_some();
988                            yield Self::process_left_outer_join_non_equi_condition(
989                                spilled,
990                                cond.as_ref(),
991                                &mut non_equi_state,
992                            )
993                            .await?
994                        }
995                    }
996                } else {
997                    shutdown_rx.check()?;
998                    let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
999                    if let Some(spilled) = Self::append_one_row_with_null_build_side(
1000                        &mut remaining_chunk_builder,
1001                        probe_row,
1002                        build_data_types.len(),
1003                    ) {
1004                        yield spilled
1005                    }
1006                }
1007            }
1008        }
1009        non_equi_state.has_more_output_rows = false;
1010        if let Some(spilled) = chunk_builder.consume_all() {
1011            yield Self::process_left_outer_join_non_equi_condition(
1012                spilled,
1013                cond.as_ref(),
1014                &mut non_equi_state,
1015            )
1016            .await?
1017        }
1018
1019        if let Some(spilled) = remaining_chunk_builder.consume_all() {
1020            yield spilled
1021        }
1022    }
1023
1024    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1025    pub async fn do_left_semi_anti_join<const ANTI_JOIN: bool>(
1026        EquiJoinParams {
1027            probe_side,
1028            probe_data_types,
1029            probe_key_idxs,
1030            hash_map,
1031            chunk_size,
1032            shutdown_rx,
1033            ..
1034        }: EquiJoinParams<K>,
1035    ) {
1036        let mut chunk_builder = DataChunkBuilder::new(probe_data_types, chunk_size);
1037        #[for_await]
1038        for probe_chunk in probe_side.execute() {
1039            let probe_chunk = probe_chunk?;
1040            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1041            for (probe_row_id, probe_key) in probe_keys
1042                .iter()
1043                .enumerate()
1044                .filter_by_bitmap(probe_chunk.visibility())
1045            {
1046                shutdown_rx.check()?;
1047                if !ANTI_JOIN {
1048                    if hash_map.contains_key(probe_key)
1049                        && let Some(spilled) = Self::append_one_probe_row(
1050                            &mut chunk_builder,
1051                            &probe_chunk,
1052                            probe_row_id,
1053                        )
1054                    {
1055                        yield spilled
1056                    }
1057                } else if hash_map.get(probe_key).is_none()
1058                    && let Some(spilled) =
1059                        Self::append_one_probe_row(&mut chunk_builder, &probe_chunk, probe_row_id)
1060                {
1061                    yield spilled
1062                }
1063            }
1064        }
1065        if let Some(spilled) = chunk_builder.consume_all() {
1066            yield spilled
1067        }
1068    }
1069
1070    /// High-level idea:
1071    /// 1. For each `probe_row`, append candidate rows to buffer.
1072    ///    Candidate rows: Those satisfying `equi_predicate` (==).
1073    /// 2. If buffer becomes full, process it.
1074    ///    Apply `non_equi_join` predicates e.g. `>=`, `<=` to filter rows.
1075    ///    Track if `probe_row` is matched to avoid duplicates.
1076    /// 3. If we matched `probe_row` in spilled chunk,
1077    ///    stop appending its candidate rows,
1078    ///    to avoid matching it again in next spilled chunk.
1079    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1080    pub async fn do_left_semi_join_with_non_equi_condition<'a>(
1081        EquiJoinParams {
1082            probe_side,
1083            probe_key_idxs,
1084            build_side,
1085            full_data_types,
1086            hash_map,
1087            next_build_row_with_same_key,
1088            chunk_size,
1089            shutdown_rx,
1090            ..
1091        }: EquiJoinParams<K>,
1092        cond: &'a BoxedExpression,
1093    ) {
1094        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
1095        let mut non_equi_state = LeftNonEquiJoinState::default();
1096
1097        #[for_await]
1098        for probe_chunk in probe_side.execute() {
1099            let probe_chunk = probe_chunk?;
1100            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1101            for (probe_row_id, probe_key) in probe_keys
1102                .iter()
1103                .enumerate()
1104                .filter_by_bitmap(probe_chunk.visibility())
1105            {
1106                non_equi_state.found_matched = false;
1107                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
1108                    non_equi_state
1109                        .first_output_row_id
1110                        .push(chunk_builder.buffered_count());
1111
1112                    for build_row_id in
1113                        next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id))
1114                    {
1115                        shutdown_rx.check()?;
1116                        if non_equi_state.found_matched {
1117                            break;
1118                        }
1119                        let build_chunk = &build_side[build_row_id.chunk_id()];
1120                        if let Some(spilled) = Self::append_one_row(
1121                            &mut chunk_builder,
1122                            &probe_chunk,
1123                            probe_row_id,
1124                            build_chunk,
1125                            build_row_id.row_id(),
1126                        ) {
1127                            yield Self::process_left_semi_anti_join_non_equi_condition::<false>(
1128                                spilled,
1129                                cond.as_ref(),
1130                                &mut non_equi_state,
1131                            )
1132                            .await?
1133                        }
1134                    }
1135                }
1136            }
1137        }
1138
1139        // Process remaining rows in buffer
1140        if let Some(spilled) = chunk_builder.consume_all() {
1141            yield Self::process_left_semi_anti_join_non_equi_condition::<false>(
1142                spilled,
1143                cond.as_ref(),
1144                &mut non_equi_state,
1145            )
1146            .await?
1147        }
1148    }
1149
1150    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1151    pub async fn do_left_anti_join_with_non_equi_condition(
1152        EquiJoinParams {
1153            probe_side,
1154            probe_data_types,
1155            probe_key_idxs,
1156            build_side,
1157            full_data_types,
1158            hash_map,
1159            next_build_row_with_same_key,
1160            chunk_size,
1161            shutdown_rx,
1162            ..
1163        }: EquiJoinParams<K>,
1164        cond: &BoxedExpression,
1165    ) {
1166        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
1167        let mut remaining_chunk_builder = DataChunkBuilder::new(probe_data_types, chunk_size);
1168        let mut non_equi_state = LeftNonEquiJoinState::default();
1169
1170        #[for_await]
1171        for probe_chunk in probe_side.execute() {
1172            let probe_chunk = probe_chunk?;
1173            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1174            for (probe_row_id, probe_key) in probe_keys
1175                .iter()
1176                .enumerate()
1177                .filter_by_bitmap(probe_chunk.visibility())
1178            {
1179                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
1180                    non_equi_state
1181                        .first_output_row_id
1182                        .push(chunk_builder.buffered_count());
1183                    let mut build_row_id_iter = next_build_row_with_same_key
1184                        .row_id_iter(Some(*first_matched_build_row_id))
1185                        .peekable();
1186                    while let Some(build_row_id) = build_row_id_iter.next() {
1187                        shutdown_rx.check()?;
1188                        let build_chunk = &build_side[build_row_id.chunk_id()];
1189                        if let Some(spilled) = Self::append_one_row(
1190                            &mut chunk_builder,
1191                            &probe_chunk,
1192                            probe_row_id,
1193                            build_chunk,
1194                            build_row_id.row_id(),
1195                        ) {
1196                            non_equi_state.has_more_output_rows =
1197                                build_row_id_iter.peek().is_some();
1198                            yield Self::process_left_semi_anti_join_non_equi_condition::<true>(
1199                                spilled,
1200                                cond.as_ref(),
1201                                &mut non_equi_state,
1202                            )
1203                            .await?
1204                        }
1205                    }
1206                } else if let Some(spilled) = Self::append_one_probe_row(
1207                    &mut remaining_chunk_builder,
1208                    &probe_chunk,
1209                    probe_row_id,
1210                ) {
1211                    yield spilled
1212                }
1213            }
1214        }
1215        non_equi_state.has_more_output_rows = false;
1216        if let Some(spilled) = chunk_builder.consume_all() {
1217            yield Self::process_left_semi_anti_join_non_equi_condition::<true>(
1218                spilled,
1219                cond.as_ref(),
1220                &mut non_equi_state,
1221            )
1222            .await?
1223        }
1224        if let Some(spilled) = remaining_chunk_builder.consume_all() {
1225            yield spilled
1226        }
1227    }
1228
1229    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1230    pub async fn do_right_outer_join(
1231        EquiJoinParams {
1232            probe_side,
1233            probe_data_types,
1234            probe_key_idxs,
1235            build_side,
1236            full_data_types,
1237            hash_map,
1238            next_build_row_with_same_key,
1239            chunk_size,
1240            shutdown_rx,
1241            ..
1242        }: EquiJoinParams<K>,
1243    ) {
1244        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
1245        let mut build_row_matched =
1246            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
1247
1248        #[for_await]
1249        for probe_chunk in probe_side.execute() {
1250            let probe_chunk = probe_chunk?;
1251            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1252            for (probe_row_id, probe_key) in probe_keys
1253                .iter()
1254                .enumerate()
1255                .filter_by_bitmap(probe_chunk.visibility())
1256            {
1257                for build_row_id in
1258                    next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
1259                {
1260                    shutdown_rx.check()?;
1261                    build_row_matched[build_row_id] = true;
1262                    let build_chunk = &build_side[build_row_id.chunk_id()];
1263                    if let Some(spilled) = Self::append_one_row(
1264                        &mut chunk_builder,
1265                        &probe_chunk,
1266                        probe_row_id,
1267                        build_chunk,
1268                        build_row_id.row_id(),
1269                    ) {
1270                        yield spilled
1271                    }
1272                }
1273            }
1274        }
1275        #[for_await]
1276        for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
1277            &mut chunk_builder,
1278            &build_side,
1279            &build_row_matched,
1280            probe_data_types.len(),
1281        ) {
1282            yield spilled?
1283        }
1284    }
1285
1286    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1287    pub async fn do_right_outer_join_with_non_equi_condition(
1288        EquiJoinParams {
1289            probe_side,
1290            probe_data_types,
1291            probe_key_idxs,
1292            build_side,
1293            full_data_types,
1294            hash_map,
1295            next_build_row_with_same_key,
1296            chunk_size,
1297            shutdown_rx,
1298            ..
1299        }: EquiJoinParams<K>,
1300        cond: &BoxedExpression,
1301    ) {
1302        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
1303        let build_row_matched =
1304            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
1305        let mut non_equi_state = RightNonEquiJoinState {
1306            build_row_matched,
1307            ..Default::default()
1308        };
1309
1310        #[for_await]
1311        for probe_chunk in probe_side.execute() {
1312            let probe_chunk = probe_chunk?;
1313            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1314            for (probe_row_id, probe_key) in probe_keys
1315                .iter()
1316                .enumerate()
1317                .filter_by_bitmap(probe_chunk.visibility())
1318            {
1319                for build_row_id in
1320                    next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
1321                {
1322                    shutdown_rx.check()?;
1323                    non_equi_state.build_row_ids.push(build_row_id);
1324                    let build_chunk = &build_side[build_row_id.chunk_id()];
1325                    if let Some(spilled) = Self::append_one_row(
1326                        &mut chunk_builder,
1327                        &probe_chunk,
1328                        probe_row_id,
1329                        build_chunk,
1330                        build_row_id.row_id(),
1331                    ) {
1332                        yield Self::process_right_outer_join_non_equi_condition(
1333                            spilled,
1334                            cond.as_ref(),
1335                            &mut non_equi_state,
1336                        )
1337                        .await?
1338                    }
1339                }
1340            }
1341        }
1342        if let Some(spilled) = chunk_builder.consume_all() {
1343            yield Self::process_right_outer_join_non_equi_condition(
1344                spilled,
1345                cond.as_ref(),
1346                &mut non_equi_state,
1347            )
1348            .await?
1349        }
1350        #[for_await]
1351        for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
1352            &mut chunk_builder,
1353            &build_side,
1354            &non_equi_state.build_row_matched,
1355            probe_data_types.len(),
1356        ) {
1357            yield spilled?
1358        }
1359    }
1360
1361    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1362    pub async fn do_right_semi_anti_join<const ANTI_JOIN: bool>(
1363        EquiJoinParams {
1364            probe_side,
1365            probe_key_idxs,
1366            build_side,
1367            build_data_types,
1368            hash_map,
1369            next_build_row_with_same_key,
1370            chunk_size,
1371            shutdown_rx,
1372            ..
1373        }: EquiJoinParams<K>,
1374    ) {
1375        let mut chunk_builder = DataChunkBuilder::new(build_data_types, chunk_size);
1376        let mut build_row_matched =
1377            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
1378
1379        #[for_await]
1380        for probe_chunk in probe_side.execute() {
1381            let probe_chunk = probe_chunk?;
1382            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1383            for probe_key in probe_keys.iter().filter_by_bitmap(probe_chunk.visibility()) {
1384                for build_row_id in
1385                    next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
1386                {
1387                    shutdown_rx.check()?;
1388                    build_row_matched[build_row_id] = true;
1389                }
1390            }
1391        }
1392        #[for_await]
1393        for spilled in Self::handle_remaining_build_rows_for_right_semi_anti_join::<ANTI_JOIN>(
1394            &mut chunk_builder,
1395            &build_side,
1396            &build_row_matched,
1397        ) {
1398            yield spilled?
1399        }
1400    }
1401
1402    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1403    pub async fn do_right_semi_anti_join_with_non_equi_condition<const ANTI_JOIN: bool>(
1404        EquiJoinParams {
1405            probe_side,
1406            probe_key_idxs,
1407            build_side,
1408            build_data_types,
1409            full_data_types,
1410            hash_map,
1411            next_build_row_with_same_key,
1412            chunk_size,
1413            shutdown_rx,
1414            ..
1415        }: EquiJoinParams<K>,
1416        cond: &BoxedExpression,
1417    ) {
1418        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
1419        let mut remaining_chunk_builder = DataChunkBuilder::new(build_data_types, chunk_size);
1420        let build_row_matched =
1421            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
1422        let mut non_equi_state = RightNonEquiJoinState {
1423            build_row_matched,
1424            ..Default::default()
1425        };
1426
1427        #[for_await]
1428        for probe_chunk in probe_side.execute() {
1429            let probe_chunk = probe_chunk?;
1430            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1431            for (probe_row_id, probe_key) in probe_keys
1432                .iter()
1433                .enumerate()
1434                .filter_by_bitmap(probe_chunk.visibility())
1435            {
1436                for build_row_id in
1437                    next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
1438                {
1439                    shutdown_rx.check()?;
1440                    non_equi_state.build_row_ids.push(build_row_id);
1441                    let build_chunk = &build_side[build_row_id.chunk_id()];
1442                    if let Some(spilled) = Self::append_one_row(
1443                        &mut chunk_builder,
1444                        &probe_chunk,
1445                        probe_row_id,
1446                        build_chunk,
1447                        build_row_id.row_id(),
1448                    ) {
1449                        Self::process_right_semi_anti_join_non_equi_condition(
1450                            spilled,
1451                            cond.as_ref(),
1452                            &mut non_equi_state,
1453                        )
1454                        .await?
1455                    }
1456                }
1457            }
1458        }
1459        if let Some(spilled) = chunk_builder.consume_all() {
1460            Self::process_right_semi_anti_join_non_equi_condition(
1461                spilled,
1462                cond.as_ref(),
1463                &mut non_equi_state,
1464            )
1465            .await?
1466        }
1467        #[for_await]
1468        for spilled in Self::handle_remaining_build_rows_for_right_semi_anti_join::<ANTI_JOIN>(
1469            &mut remaining_chunk_builder,
1470            &build_side,
1471            &non_equi_state.build_row_matched,
1472        ) {
1473            yield spilled?
1474        }
1475    }
1476
1477    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1478    pub async fn do_full_outer_join(
1479        EquiJoinParams {
1480            probe_side,
1481            probe_data_types,
1482            probe_key_idxs,
1483            build_side,
1484            build_data_types,
1485            full_data_types,
1486            hash_map,
1487            next_build_row_with_same_key,
1488            chunk_size,
1489            shutdown_rx,
1490            ..
1491        }: EquiJoinParams<K>,
1492    ) {
1493        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
1494        let mut build_row_matched =
1495            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
1496
1497        #[for_await]
1498        for probe_chunk in probe_side.execute() {
1499            let probe_chunk = probe_chunk?;
1500            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1501            for (probe_row_id, probe_key) in probe_keys
1502                .iter()
1503                .enumerate()
1504                .filter_by_bitmap(probe_chunk.visibility())
1505            {
1506                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
1507                    for build_row_id in
1508                        next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id))
1509                    {
1510                        shutdown_rx.check()?;
1511                        build_row_matched[build_row_id] = true;
1512                        let build_chunk = &build_side[build_row_id.chunk_id()];
1513                        if let Some(spilled) = Self::append_one_row(
1514                            &mut chunk_builder,
1515                            &probe_chunk,
1516                            probe_row_id,
1517                            build_chunk,
1518                            build_row_id.row_id(),
1519                        ) {
1520                            yield spilled
1521                        }
1522                    }
1523                } else {
1524                    let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
1525                    if let Some(spilled) = Self::append_one_row_with_null_build_side(
1526                        &mut chunk_builder,
1527                        probe_row,
1528                        build_data_types.len(),
1529                    ) {
1530                        yield spilled
1531                    }
1532                }
1533            }
1534        }
1535        #[for_await]
1536        for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
1537            &mut chunk_builder,
1538            &build_side,
1539            &build_row_matched,
1540            probe_data_types.len(),
1541        ) {
1542            yield spilled?
1543        }
1544    }
1545
1546    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1547    pub async fn do_full_outer_join_with_non_equi_condition(
1548        EquiJoinParams {
1549            probe_side,
1550            probe_data_types,
1551            probe_key_idxs,
1552            build_side,
1553            build_data_types,
1554            full_data_types,
1555            hash_map,
1556            next_build_row_with_same_key,
1557            chunk_size,
1558            shutdown_rx,
1559            ..
1560        }: EquiJoinParams<K>,
1561        cond: &BoxedExpression,
1562    ) {
1563        let mut chunk_builder = DataChunkBuilder::new(full_data_types.clone(), chunk_size);
1564        let mut remaining_chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
1565        let mut left_non_equi_state = LeftNonEquiJoinState {
1566            probe_column_count: probe_data_types.len(),
1567            ..Default::default()
1568        };
1569        let build_row_matched =
1570            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
1571        let mut right_non_equi_state = RightNonEquiJoinState {
1572            build_row_matched,
1573            ..Default::default()
1574        };
1575
1576        #[for_await]
1577        for probe_chunk in probe_side.execute() {
1578            let probe_chunk = probe_chunk?;
1579            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1580            for (probe_row_id, probe_key) in probe_keys
1581                .iter()
1582                .enumerate()
1583                .filter_by_bitmap(probe_chunk.visibility())
1584            {
1585                left_non_equi_state.found_matched = false;
1586                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
1587                    left_non_equi_state
1588                        .first_output_row_id
1589                        .push(chunk_builder.buffered_count());
1590                    let mut build_row_id_iter = next_build_row_with_same_key
1591                        .row_id_iter(Some(*first_matched_build_row_id))
1592                        .peekable();
1593                    while let Some(build_row_id) = build_row_id_iter.next() {
1594                        shutdown_rx.check()?;
1595                        right_non_equi_state.build_row_ids.push(build_row_id);
1596                        let build_chunk = &build_side[build_row_id.chunk_id()];
1597                        if let Some(spilled) = Self::append_one_row(
1598                            &mut chunk_builder,
1599                            &probe_chunk,
1600                            probe_row_id,
1601                            build_chunk,
1602                            build_row_id.row_id(),
1603                        ) {
1604                            left_non_equi_state.has_more_output_rows =
1605                                build_row_id_iter.peek().is_some();
1606                            yield Self::process_full_outer_join_non_equi_condition(
1607                                spilled,
1608                                cond.as_ref(),
1609                                &mut left_non_equi_state,
1610                                &mut right_non_equi_state,
1611                            )
1612                            .await?
1613                        }
1614                    }
1615                } else {
1616                    shutdown_rx.check()?;
1617                    let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
1618                    if let Some(spilled) = Self::append_one_row_with_null_build_side(
1619                        &mut remaining_chunk_builder,
1620                        probe_row,
1621                        build_data_types.len(),
1622                    ) {
1623                        yield spilled
1624                    }
1625                }
1626            }
1627        }
1628        left_non_equi_state.has_more_output_rows = false;
1629        if let Some(spilled) = chunk_builder.consume_all() {
1630            yield Self::process_full_outer_join_non_equi_condition(
1631                spilled,
1632                cond.as_ref(),
1633                &mut left_non_equi_state,
1634                &mut right_non_equi_state,
1635            )
1636            .await?
1637        }
1638        #[for_await]
1639        for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
1640            &mut remaining_chunk_builder,
1641            &build_side,
1642            &right_non_equi_state.build_row_matched,
1643            probe_data_types.len(),
1644        ) {
1645            yield spilled?
1646        }
1647    }
1648
1649    /// Process output chunk for left outer join when non-equi condition is presented.
1650    ///
1651    /// # Arguments
1652    /// * `chunk` - Output chunk from `do_left_outer_join_with_non_equi_condition`, containing:
1653    ///     - Concatenation of probe row and its corresponding build row according to the hash map.
1654    ///     - Concatenation of probe row and `NULL` build row, if there is no matched build row
1655    ///       found for the probe row.
1656    /// * `cond` - Non-equi join condition.
1657    /// * `probe_column_count` - The number of columns in the probe side.
1658    /// * `first_output_row_id` - The offset of the first output row in `chunk` for each probe side
1659    ///   row that has been processed.
1660    /// * `has_more_output_rows` - Whether the probe row being processed currently has output rows
1661    ///   in next output chunk.
1662    /// * `found_matched` - Whether the probe row being processed currently has matched non-NULL
1663    ///   build rows in last output chunk.
1664    ///
1665    /// # Examples
1666    /// Assume we have two tables `t1` and `t2` as probe side and build side, respectively.
1667    /// ```sql
1668    /// CREATE TABLE t1 (v1 int, v2 int);
1669    /// CREATE TABLE t2 (v3 int);
1670    /// ```
1671    ///
1672    /// Now we de left outer join on `t1` and `t2`, as the following query shows:
1673    /// ```sql
1674    /// SELECT * FROM t1 LEFT JOIN t2 ON t1.v1 = t2.v3 AND t1.v2 <> t2.v3;
1675    /// ```
1676    ///
1677    /// Assume the chunk builder in `do_left_outer_join_with_non_equi_condition` has buffer size 5,
1678    /// and we have the following chunk as the first output ('-' represents NULL).
1679    ///
1680    /// | offset | v1 | v2 | v3 |
1681    /// |---|---|---|---|
1682    /// | 0 | 1 | 2 | 1 |
1683    /// | 1 | 1 | 1 | 1 |
1684    /// | 2 | 2 | 3 | - |
1685    /// | 3 | 3 | 3 | 3 |
1686    /// | 4 | 3 | 3 | 3 |
1687    ///
1688    /// We have the following precondition:
1689    /// ```ignore
1690    /// assert_eq!(probe_column_count, 2);
1691    /// assert_eq!(first_out_row_id, vec![0, 1, 2, 3]);
1692    /// assert_eq!(has_more_output_rows);
1693    /// assert_eq!(!found_matched);
1694    /// ```
1695    ///
1696    /// In `process_left_outer_join_non_equi_condition`, we transform the chunk in following steps.
1697    ///
1698    /// 1. Evaluate the non-equi condition on the chunk. Here the condition is `t1.v2 <> t2.v3`.
1699    ///
1700    /// We get the result array:
1701    ///
1702    /// | offset | value |
1703    /// | --- | --- |
1704    /// | 0 | true |
1705    /// | 1 | false |
1706    /// | 2 | false |
1707    /// | 3 | false |
1708    /// | 4 | false |
1709    ///
1710    /// 2. Set the build side columns to NULL if the corresponding result value is false.
1711    ///
1712    /// The chunk is changed to:
1713    ///
1714    /// | offset | v1 | v2 | v3 |
1715    /// |---|---|---|---|
1716    /// | 0 | 1 | 2 | 1 |
1717    /// | 1 | 1 | 1 | - |
1718    /// | 2 | 2 | 3 | - |
1719    /// | 3 | 3 | 3 | - |
1720    /// | 4 | 3 | 3 | - |
1721    ///
1722    /// 3. Remove duplicate rows with NULL build side. This is done by setting the visibility bitmap
1723    ///    of the chunk.
1724    ///
1725    /// | offset | v1 | v2 | v3 |
1726    /// |---|---|---|---|
1727    /// | 0 | 1 | 2 | 1 |
1728    /// | 1 | 1 | 1 | - |
1729    /// | 2 | 2 | 3 | - |
1730    /// | 3 | ~~3~~ | ~~3~~ | ~~-~~ |
1731    /// | 4 | ~~3~~ | ~~3~~ | ~~-~~ |
1732    ///
1733    /// For the probe row being processed currently (`(3, 3)` here), we don't have output rows with
1734    /// non-NULL build side, so we set `found_matched` to false.
1735    ///
1736    /// In `do_left_outer_join_with_non_equi_condition`, we have next output chunk as follows:
1737    ///
1738    /// | offset | v1 | v2 | v3 |
1739    /// |---|---|---|---|
1740    /// | 0 | 3 | 3 | 3 |
1741    /// | 1 | 3 | 3 | 3 |
1742    /// | 2 | 5 | 5 | - |
1743    /// | 3 | 5 | 3 | - |
1744    /// | 4 | 5 | 3 | - |
1745    ///
1746    /// This time We have the following precondition:
1747    /// ```ignore
1748    /// assert_eq!(probe_column_count, 2);
1749    /// assert_eq!(first_out_row_id, vec![2, 3]);
1750    /// assert_eq!(!has_more_output_rows);
1751    /// assert_eq!(!found_matched);
1752    /// ```
1753    ///
1754    /// The transformed chunk is as follows after the same steps.
1755    ///
1756    /// | offset | v1 | v2 | v3 |
1757    /// |---|---|---|---|
1758    /// | 0 | ~~3~~ | ~~3~~ | ~~3~~ |
1759    /// | 1 | 3 | 3 | - |
1760    /// | 2 | 5 | 5 | - |
1761    /// | 3 | 5 | 3 | - |
1762    /// | 4 | ~~5~~ | ~~3~~ | ~~-~~ |
1763    ///
1764    /// After we add these chunks to output chunk builder in `do_execute`, we get the final output:
1765    ///
1766    /// Chunk 1
1767    ///
1768    /// | offset | v1 | v2 | v3 |
1769    /// |---|---|---|---|
1770    /// | 0 | 1 | 2 | 1 |
1771    /// | 1 | 1 | 1 | - |
1772    /// | 2 | 2 | 3 | - |
1773    /// | 3 | 3 | 3 | - |
1774    /// | 4 | 5 | 5 | - |
1775    ///
1776    /// Chunk 2
1777    ///
1778    /// | offset | v1 | v2 | v3 |
1779    /// |---|---|---|---|
1780    /// | 0 | 5 | 3 | - |
1781    ///
1782    ///
1783    /// For more information about how `process_*_join_non_equi_condition` work, see their unit
1784    /// tests.
1785    async fn process_left_outer_join_non_equi_condition(
1786        chunk: DataChunk,
1787        cond: &dyn Expression,
1788        LeftNonEquiJoinState {
1789            probe_column_count,
1790            first_output_row_id,
1791            has_more_output_rows,
1792            found_matched,
1793        }: &mut LeftNonEquiJoinState,
1794    ) -> Result<DataChunk> {
1795        let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1796        Ok(DataChunkMutator(chunk)
1797            .nullify_build_side_for_non_equi_condition(&filter, *probe_column_count)
1798            .remove_duplicate_rows_for_left_outer_join(
1799                &filter,
1800                first_output_row_id,
1801                *has_more_output_rows,
1802                found_matched,
1803            )
1804            .take())
1805    }
1806
1807    /// Filters for candidate rows which satisfy `non_equi` predicate.
1808    /// Removes duplicate rows.
1809    async fn process_left_semi_anti_join_non_equi_condition<const ANTI_JOIN: bool>(
1810        chunk: DataChunk,
1811        cond: &dyn Expression,
1812        LeftNonEquiJoinState {
1813            first_output_row_id,
1814            found_matched,
1815            has_more_output_rows,
1816            ..
1817        }: &mut LeftNonEquiJoinState,
1818    ) -> Result<DataChunk> {
1819        let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1820        Ok(DataChunkMutator(chunk)
1821            .remove_duplicate_rows_for_left_semi_anti_join::<ANTI_JOIN>(
1822                &filter,
1823                first_output_row_id,
1824                *has_more_output_rows,
1825                found_matched,
1826            )
1827            .take())
1828    }
1829
1830    async fn process_right_outer_join_non_equi_condition(
1831        chunk: DataChunk,
1832        cond: &dyn Expression,
1833        RightNonEquiJoinState {
1834            build_row_ids,
1835            build_row_matched,
1836        }: &mut RightNonEquiJoinState,
1837    ) -> Result<DataChunk> {
1838        let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1839        Ok(DataChunkMutator(chunk)
1840            .remove_duplicate_rows_for_right_outer_join(&filter, build_row_ids, build_row_matched)
1841            .take())
1842    }
1843
1844    async fn process_right_semi_anti_join_non_equi_condition(
1845        chunk: DataChunk,
1846        cond: &dyn Expression,
1847        RightNonEquiJoinState {
1848            build_row_ids,
1849            build_row_matched,
1850        }: &mut RightNonEquiJoinState,
1851    ) -> Result<()> {
1852        let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1853        DataChunkMutator(chunk).remove_duplicate_rows_for_right_semi_anti_join(
1854            &filter,
1855            build_row_ids,
1856            build_row_matched,
1857        );
1858        Ok(())
1859    }
1860
1861    async fn process_full_outer_join_non_equi_condition(
1862        chunk: DataChunk,
1863        cond: &dyn Expression,
1864        left_non_equi_state: &mut LeftNonEquiJoinState,
1865        right_non_equi_state: &mut RightNonEquiJoinState,
1866    ) -> Result<DataChunk> {
1867        let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1868        Ok(DataChunkMutator(chunk)
1869            .nullify_build_side_for_non_equi_condition(
1870                &filter,
1871                left_non_equi_state.probe_column_count,
1872            )
1873            .remove_duplicate_rows_for_full_outer_join(
1874                &filter,
1875                left_non_equi_state,
1876                right_non_equi_state,
1877            )
1878            .take())
1879    }
1880
1881    #[try_stream(ok = DataChunk, error = BatchError)]
1882    async fn handle_remaining_build_rows_for_right_outer_join<'a>(
1883        chunk_builder: &'a mut DataChunkBuilder,
1884        build_side: &'a [DataChunk],
1885        build_row_matched: &'a ChunkedData<bool>,
1886        probe_column_count: usize,
1887    ) {
1888        for build_row_id in build_row_matched
1889            .all_row_ids()
1890            .filter(|build_row_id| !build_row_matched[*build_row_id])
1891        {
1892            let build_row =
1893                build_side[build_row_id.chunk_id()].row_at_unchecked_vis(build_row_id.row_id());
1894            if let Some(spilled) = Self::append_one_row_with_null_probe_side(
1895                chunk_builder,
1896                build_row,
1897                probe_column_count,
1898            ) {
1899                yield spilled
1900            }
1901        }
1902        if let Some(spilled) = chunk_builder.consume_all() {
1903            yield spilled
1904        }
1905    }
1906
1907    #[try_stream(ok = DataChunk, error = BatchError)]
1908    async fn handle_remaining_build_rows_for_right_semi_anti_join<'a, const ANTI_JOIN: bool>(
1909        chunk_builder: &'a mut DataChunkBuilder,
1910        build_side: &'a [DataChunk],
1911        build_row_matched: &'a ChunkedData<bool>,
1912    ) {
1913        for build_row_id in build_row_matched.all_row_ids().filter(|build_row_id| {
1914            if !ANTI_JOIN {
1915                build_row_matched[*build_row_id]
1916            } else {
1917                !build_row_matched[*build_row_id]
1918            }
1919        }) {
1920            if let Some(spilled) = Self::append_one_build_row(
1921                chunk_builder,
1922                &build_side[build_row_id.chunk_id()],
1923                build_row_id.row_id(),
1924            ) {
1925                yield spilled
1926            }
1927        }
1928        if let Some(spilled) = chunk_builder.consume_all() {
1929            yield spilled
1930        }
1931    }
1932
1933    fn append_one_row(
1934        chunk_builder: &mut DataChunkBuilder,
1935        probe_chunk: &DataChunk,
1936        probe_row_id: usize,
1937        build_chunk: &DataChunk,
1938        build_row_id: usize,
1939    ) -> Option<DataChunk> {
1940        chunk_builder.append_one_row_from_array_elements(
1941            probe_chunk.columns().iter().map(|c| c.as_ref()),
1942            probe_row_id,
1943            build_chunk.columns().iter().map(|c| c.as_ref()),
1944            build_row_id,
1945        )
1946    }
1947
1948    fn append_one_probe_row(
1949        chunk_builder: &mut DataChunkBuilder,
1950        probe_chunk: &DataChunk,
1951        probe_row_id: usize,
1952    ) -> Option<DataChunk> {
1953        chunk_builder.append_one_row_from_array_elements(
1954            probe_chunk.columns().iter().map(|c| c.as_ref()),
1955            probe_row_id,
1956            empty(),
1957            0,
1958        )
1959    }
1960
1961    fn append_one_build_row(
1962        chunk_builder: &mut DataChunkBuilder,
1963        build_chunk: &DataChunk,
1964        build_row_id: usize,
1965    ) -> Option<DataChunk> {
1966        chunk_builder.append_one_row_from_array_elements(
1967            empty(),
1968            0,
1969            build_chunk.columns().iter().map(|c| c.as_ref()),
1970            build_row_id,
1971        )
1972    }
1973
1974    fn append_one_row_with_null_build_side(
1975        chunk_builder: &mut DataChunkBuilder,
1976        probe_row_ref: RowRef<'_>,
1977        build_column_count: usize,
1978    ) -> Option<DataChunk> {
1979        chunk_builder.append_one_row(probe_row_ref.chain(repeat_n(Datum::None, build_column_count)))
1980    }
1981
1982    fn append_one_row_with_null_probe_side(
1983        chunk_builder: &mut DataChunkBuilder,
1984        build_row_ref: RowRef<'_>,
1985        probe_column_count: usize,
1986    ) -> Option<DataChunk> {
1987        chunk_builder.append_one_row(repeat_n(Datum::None, probe_column_count).chain(build_row_ref))
1988    }
1989
1990    fn find_asof_matched_rows(
1991        probe_row_ref: RowRef<'_>,
1992        build_side: &[DataChunk],
1993        build_side_row_iter: RowIdIter<'_>,
1994        asof_join_condition: &AsOfDesc,
1995    ) -> Option<RowId> {
1996        let probe_inequality_value = probe_row_ref.datum_at(asof_join_condition.left_idx);
1997        if let Some(probe_inequality_scalar) = probe_inequality_value {
1998            let mut result_row_id: Option<RowId> = None;
1999            let mut build_row_ref;
2000
2001            for build_row_id in build_side_row_iter {
2002                build_row_ref =
2003                    build_side[build_row_id.chunk_id()].row_at_unchecked_vis(build_row_id.row_id());
2004                let build_inequality_value = build_row_ref.datum_at(asof_join_condition.right_idx);
2005                if let Some(build_inequality_scalar) = build_inequality_value {
2006                    let mut pick_result = |compare: fn(Ordering) -> bool| {
2007                        if let Some(result_row_id_inner) = result_row_id {
2008                            let result_row_ref = build_side[result_row_id_inner.chunk_id()]
2009                                .row_at_unchecked_vis(result_row_id_inner.row_id());
2010                            let result_inequality_scalar = result_row_ref
2011                                .datum_at(asof_join_condition.right_idx)
2012                                .unwrap();
2013                            if compare(
2014                                probe_inequality_scalar.default_cmp(&build_inequality_scalar),
2015                            ) && compare(
2016                                probe_inequality_scalar.default_cmp(&result_inequality_scalar),
2017                            ) {
2018                                result_row_id = Some(build_row_id);
2019                            }
2020                        } else if compare(
2021                            probe_inequality_scalar.default_cmp(&build_inequality_scalar),
2022                        ) {
2023                            result_row_id = Some(build_row_id);
2024                        }
2025                    };
2026                    match asof_join_condition.inequality_type {
2027                        AsOfInequalityType::Lt => {
2028                            pick_result(Ordering::is_lt);
2029                        }
2030                        AsOfInequalityType::Le => {
2031                            pick_result(Ordering::is_le);
2032                        }
2033                        AsOfInequalityType::Gt => {
2034                            pick_result(Ordering::is_gt);
2035                        }
2036                        AsOfInequalityType::Ge => {
2037                            pick_result(Ordering::is_ge);
2038                        }
2039                    }
2040                }
2041            }
2042            result_row_id
2043        } else {
2044            None
2045        }
2046    }
2047}
2048
/// `DataChunkMutator` transforms the given data chunk for non-equi join.
///
/// It is a thin wrapper around a single [`DataChunk`]: its methods consume the wrapper, adjust
/// the chunk's columns or visibility, and the final chunk is retrieved with `take`.
#[repr(transparent)]
struct DataChunkMutator(DataChunk);
2052
2053impl DataChunkMutator {
2054    fn nullify_build_side_for_non_equi_condition(
2055        self,
2056        filter: &Bitmap,
2057        probe_column_count: usize,
2058    ) -> Self {
2059        let (mut columns, vis) = self.0.into_parts();
2060
2061        for build_column in columns.split_off(probe_column_count) {
2062            // Is it really safe to use Arc::try_unwrap here?
2063            let mut array = Arc::try_unwrap(build_column).unwrap();
2064            array.set_bitmap(array.null_bitmap() & filter);
2065            columns.push(array.into());
2066        }
2067
2068        Self(DataChunk::new(columns, vis))
2069    }
2070
2071    fn remove_duplicate_rows_for_left_outer_join(
2072        mut self,
2073        filter: &Bitmap,
2074        first_output_row_ids: &mut Vec<usize>,
2075        has_more_output_rows: bool,
2076        found_non_null: &mut bool,
2077    ) -> Self {
2078        let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());
2079
2080        for (&start_row_id, &end_row_id) in iter::once(&0)
2081            .chain(first_output_row_ids.iter())
2082            .tuple_windows()
2083            .filter(|(start_row_id, end_row_id)| start_row_id < end_row_id)
2084        {
2085            for row_id in start_row_id..end_row_id {
2086                if filter.is_set(row_id) {
2087                    *found_non_null = true;
2088                    new_visibility.set(row_id, true);
2089                }
2090            }
2091            if !*found_non_null {
2092                new_visibility.set(start_row_id, true);
2093            }
2094            *found_non_null = false;
2095        }
2096
2097        let start_row_id = first_output_row_ids.last().copied().unwrap_or_default();
2098        for row_id in start_row_id..filter.len() {
2099            if filter.is_set(row_id) {
2100                *found_non_null = true;
2101                new_visibility.set(row_id, true);
2102            }
2103        }
2104        if !has_more_output_rows {
2105            if !*found_non_null {
2106                new_visibility.set(start_row_id, true);
2107            }
2108            *found_non_null = false;
2109        }
2110
2111        first_output_row_ids.clear();
2112
2113        self.0
2114            .set_visibility(new_visibility.finish() & self.0.visibility());
2115        self
2116    }
2117
2118    /// Removes duplicate rows using `filter`
2119    /// and only returns the first match for each window.
2120    /// Windows are indicated by `first_output_row_ids`.
2121    fn remove_duplicate_rows_for_left_semi_anti_join<const ANTI_JOIN: bool>(
2122        mut self,
2123        filter: &Bitmap,
2124        first_output_row_ids: &mut Vec<usize>,
2125        has_more_output_rows: bool,
2126        found_matched: &mut bool,
2127    ) -> Self {
2128        let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());
2129
2130        for (&start_row_id, &end_row_id) in iter::once(&0)
2131            .chain(first_output_row_ids.iter())
2132            .tuple_windows()
2133            .filter(|(start_row_id, end_row_id)| start_row_id < end_row_id)
2134        {
2135            for row_id in start_row_id..end_row_id {
2136                if filter.is_set(row_id) {
2137                    if !ANTI_JOIN && !*found_matched {
2138                        new_visibility.set(row_id, true);
2139                    }
2140                    *found_matched = true;
2141                    break;
2142                }
2143            }
2144            if ANTI_JOIN && !*found_matched {
2145                new_visibility.set(start_row_id, true);
2146            }
2147            *found_matched = false;
2148        }
2149
2150        let start_row_id = first_output_row_ids.last().copied().unwrap_or_default();
2151        for row_id in start_row_id..filter.len() {
2152            if filter.is_set(row_id) {
2153                if !ANTI_JOIN && !*found_matched {
2154                    new_visibility.set(row_id, true);
2155                }
2156                *found_matched = true;
2157                break;
2158            }
2159        }
2160        if !has_more_output_rows && ANTI_JOIN {
2161            if !*found_matched {
2162                new_visibility.set(start_row_id, true);
2163            }
2164            *found_matched = false;
2165        }
2166
2167        first_output_row_ids.clear();
2168
2169        self.0
2170            .set_visibility(new_visibility.finish() & self.0.visibility());
2171        self
2172    }
2173
2174    fn remove_duplicate_rows_for_right_outer_join(
2175        mut self,
2176        filter: &Bitmap,
2177        build_row_ids: &mut Vec<RowId>,
2178        build_row_matched: &mut ChunkedData<bool>,
2179    ) -> Self {
2180        let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());
2181        for (output_row_id, (output_row_non_null, &build_row_id)) in
2182            filter.iter().zip_eq_fast(build_row_ids.iter()).enumerate()
2183        {
2184            if output_row_non_null {
2185                build_row_matched[build_row_id] = true;
2186                new_visibility.set(output_row_id, true);
2187            }
2188        }
2189
2190        build_row_ids.clear();
2191
2192        self.0
2193            .set_visibility(new_visibility.finish() & self.0.visibility());
2194        self
2195    }
2196
2197    fn remove_duplicate_rows_for_right_semi_anti_join(
2198        self,
2199        filter: &Bitmap,
2200        build_row_ids: &mut Vec<RowId>,
2201        build_row_matched: &mut ChunkedData<bool>,
2202    ) {
2203        for (output_row_non_null, &build_row_id) in filter.iter().zip_eq_fast(build_row_ids.iter())
2204        {
2205            if output_row_non_null {
2206                build_row_matched[build_row_id] = true;
2207            }
2208        }
2209
2210        build_row_ids.clear();
2211    }
2212
2213    fn remove_duplicate_rows_for_full_outer_join(
2214        mut self,
2215        filter: &Bitmap,
2216        LeftNonEquiJoinState {
2217            first_output_row_id,
2218            has_more_output_rows,
2219            found_matched,
2220            ..
2221        }: &mut LeftNonEquiJoinState,
2222        RightNonEquiJoinState {
2223            build_row_ids,
2224            build_row_matched,
2225        }: &mut RightNonEquiJoinState,
2226    ) -> Self {
2227        let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());
2228
2229        for (&start_row_id, &end_row_id) in iter::once(&0)
2230            .chain(first_output_row_id.iter())
2231            .tuple_windows()
2232            .filter(|(start_row_id, end_row_id)| start_row_id < end_row_id)
2233        {
2234            for row_id in start_row_id..end_row_id {
2235                if filter.is_set(row_id) {
2236                    *found_matched = true;
2237                    new_visibility.set(row_id, true);
2238                }
2239            }
2240            if !*found_matched {
2241                new_visibility.set(start_row_id, true);
2242            }
2243            *found_matched = false;
2244        }
2245
2246        let start_row_id = first_output_row_id.last().copied().unwrap_or_default();
2247        for row_id in start_row_id..filter.len() {
2248            if filter.is_set(row_id) {
2249                *found_matched = true;
2250                new_visibility.set(row_id, true);
2251            }
2252        }
2253        if !*has_more_output_rows && !*found_matched {
2254            new_visibility.set(start_row_id, true);
2255        }
2256
2257        first_output_row_id.clear();
2258
2259        for (output_row_id, (output_row_non_null, &build_row_id)) in
2260            filter.iter().zip_eq_fast(build_row_ids.iter()).enumerate()
2261        {
2262            if output_row_non_null {
2263                build_row_matched[build_row_id] = true;
2264                new_visibility.set(output_row_id, true);
2265            }
2266        }
2267
2268        build_row_ids.clear();
2269
2270        self.0
2271            .set_visibility(new_visibility.finish() & self.0.visibility());
2272        self
2273    }
2274
2275    fn take(self) -> DataChunk {
2276        self.0
2277    }
2278}
2279
2280impl BoxedExecutorBuilder for HashJoinExecutor<()> {
2281    async fn new_boxed_executor(
2282        context: &ExecutorBuilder<'_>,
2283        inputs: Vec<BoxedExecutor>,
2284    ) -> Result<BoxedExecutor> {
2285        let [left_child, right_child]: [_; 2] = inputs.try_into().unwrap();
2286
2287        let hash_join_node = try_match_expand!(
2288            context.plan_node().get_node_body().unwrap(),
2289            NodeBody::HashJoin
2290        )?;
2291
2292        let join_type = JoinType::from_prost(hash_join_node.get_join_type()?);
2293
2294        let cond = match hash_join_node.get_condition() {
2295            Ok(cond_prost) => Some(build_from_prost(cond_prost)?),
2296            Err(_) => None,
2297        };
2298
2299        let left_key_idxs = hash_join_node
2300            .get_left_key()
2301            .iter()
2302            .map(|&idx| idx as usize)
2303            .collect_vec();
2304        let right_key_idxs = hash_join_node
2305            .get_right_key()
2306            .iter()
2307            .map(|&idx| idx as usize)
2308            .collect_vec();
2309
2310        ensure!(left_key_idxs.len() == right_key_idxs.len());
2311
2312        let right_data_types = right_child.schema().data_types();
2313        let right_key_types = right_key_idxs
2314            .iter()
2315            .map(|&idx| right_data_types[idx].clone())
2316            .collect_vec();
2317
2318        let output_indices: Vec<usize> = hash_join_node
2319            .get_output_indices()
2320            .iter()
2321            .map(|&x| x as usize)
2322            .collect();
2323
2324        let identity = context.plan_node().get_identity().clone();
2325
2326        let asof_desc = hash_join_node
2327            .asof_desc
2328            .map(|desc| AsOfDesc::from_protobuf(&desc))
2329            .transpose()?;
2330
2331        Ok(HashJoinExecutorArgs {
2332            join_type,
2333            output_indices,
2334            probe_side_source: left_child,
2335            build_side_source: right_child,
2336            probe_key_idxs: left_key_idxs,
2337            build_key_idxs: right_key_idxs,
2338            null_matched: hash_join_node.get_null_safe().clone(),
2339            cond,
2340            identity: identity.clone(),
2341            right_key_types,
2342            chunk_size: context.context().get_config().developer.chunk_size,
2343            asof_desc,
2344            spill_backend: if context.context().get_config().enable_spill {
2345                Some(Disk)
2346            } else {
2347                None
2348            },
2349            spill_metrics: context.context().spill_metrics(),
2350            shutdown_rx: context.shutdown_rx().clone(),
2351            mem_ctx: context.context().create_executor_mem_context(&identity),
2352        }
2353        .dispatch())
2354    }
2355}
2356
/// Everything needed to construct a [`HashJoinExecutor`], bundled so that the hash key type can
/// be chosen through [`HashKeyDispatcher`] before the executor is instantiated.
struct HashJoinExecutorArgs {
    /// Kind of join to perform.
    join_type: JoinType,
    /// Column indices selected for the executor's output schema.
    output_indices: Vec<usize>,
    /// Probe-side input (the left child of the plan node).
    probe_side_source: BoxedExecutor,
    /// Build-side input (the right child of the plan node).
    build_side_source: BoxedExecutor,
    /// Join key column indices in the probe side.
    probe_key_idxs: Vec<usize>,
    /// Join key column indices in the build side.
    build_key_idxs: Vec<usize>,
    /// Per-key flags taken from the plan node's `null_safe` list.
    null_matched: Vec<bool>,
    /// Optional non-equi join condition.
    cond: Option<BoxedExpression>,
    /// Identity string of the plan node.
    identity: String,
    /// Data types of the build-side key columns; exposed through
    /// `HashKeyDispatcher::data_types` and not forwarded to the executor.
    right_key_types: Vec<DataType>,
    /// Chunk size forwarded to the executor.
    chunk_size: usize,
    /// Optional as-of join description decoded from the plan node.
    asof_desc: Option<AsOfDesc>,
    /// Spill backend to use, or `None` when spilling is disabled.
    spill_backend: Option<SpillBackend>,
    /// Metrics handle for spill operations.
    spill_metrics: Arc<BatchSpillMetrics>,
    /// Shutdown token forwarded to the executor.
    shutdown_rx: ShutdownToken,
    /// Memory context created for this executor.
    mem_ctx: MemoryContext,
}
2375
2376impl HashKeyDispatcher for HashJoinExecutorArgs {
2377    type Output = BoxedExecutor;
2378
2379    fn dispatch_impl<K: HashKey>(self) -> Self::Output {
2380        Box::new(HashJoinExecutor::<K>::new(
2381            self.join_type,
2382            self.output_indices,
2383            self.probe_side_source,
2384            self.build_side_source,
2385            self.probe_key_idxs,
2386            self.build_key_idxs,
2387            self.null_matched,
2388            self.cond.map(Arc::new),
2389            self.identity,
2390            self.chunk_size,
2391            self.asof_desc,
2392            self.spill_backend,
2393            self.spill_metrics,
2394            self.shutdown_rx,
2395            self.mem_ctx,
2396        ))
2397    }
2398
2399    fn data_types(&self) -> &[DataType] {
2400        &self.right_key_types
2401    }
2402}
2403
2404impl<K> HashJoinExecutor<K> {
2405    #[allow(clippy::too_many_arguments)]
2406    pub fn new(
2407        join_type: JoinType,
2408        output_indices: Vec<usize>,
2409        probe_side_source: BoxedExecutor,
2410        build_side_source: BoxedExecutor,
2411        probe_key_idxs: Vec<usize>,
2412        build_key_idxs: Vec<usize>,
2413        null_matched: Vec<bool>,
2414        cond: Option<Arc<BoxedExpression>>,
2415        identity: String,
2416        chunk_size: usize,
2417        asof_desc: Option<AsOfDesc>,
2418        spill_backend: Option<SpillBackend>,
2419        spill_metrics: Arc<BatchSpillMetrics>,
2420        shutdown_rx: ShutdownToken,
2421        mem_ctx: MemoryContext,
2422    ) -> Self {
2423        Self::new_inner(
2424            join_type,
2425            output_indices,
2426            probe_side_source,
2427            build_side_source,
2428            probe_key_idxs,
2429            build_key_idxs,
2430            null_matched,
2431            cond,
2432            identity,
2433            chunk_size,
2434            asof_desc,
2435            spill_backend,
2436            spill_metrics,
2437            None,
2438            shutdown_rx,
2439            mem_ctx,
2440        )
2441    }
2442
2443    #[allow(clippy::too_many_arguments)]
2444    fn new_inner(
2445        join_type: JoinType,
2446        output_indices: Vec<usize>,
2447        probe_side_source: BoxedExecutor,
2448        build_side_source: BoxedExecutor,
2449        probe_key_idxs: Vec<usize>,
2450        build_key_idxs: Vec<usize>,
2451        null_matched: Vec<bool>,
2452        cond: Option<Arc<BoxedExpression>>,
2453        identity: String,
2454        chunk_size: usize,
2455        asof_desc: Option<AsOfDesc>,
2456        spill_backend: Option<SpillBackend>,
2457        spill_metrics: Arc<BatchSpillMetrics>,
2458        memory_upper_bound: Option<u64>,
2459        shutdown_rx: ShutdownToken,
2460        mem_ctx: MemoryContext,
2461    ) -> Self {
2462        assert_eq!(probe_key_idxs.len(), build_key_idxs.len());
2463        assert_eq!(probe_key_idxs.len(), null_matched.len());
2464        let original_schema = match join_type {
2465            JoinType::LeftSemi | JoinType::LeftAnti => probe_side_source.schema().clone(),
2466            JoinType::RightSemi | JoinType::RightAnti => build_side_source.schema().clone(),
2467            _ => Schema::from_iter(
2468                probe_side_source
2469                    .schema()
2470                    .fields()
2471                    .iter()
2472                    .chain(build_side_source.schema().fields().iter())
2473                    .cloned(),
2474            ),
2475        };
2476        let schema = Schema::from_iter(
2477            output_indices
2478                .iter()
2479                .map(|&idx| original_schema[idx].clone()),
2480        );
2481        Self {
2482            join_type,
2483            original_schema,
2484            schema,
2485            output_indices,
2486            probe_side_source,
2487            build_side_source,
2488            probe_key_idxs,
2489            build_key_idxs,
2490            null_matched,
2491            cond,
2492            identity,
2493            chunk_size,
2494            asof_desc,
2495            shutdown_rx,
2496            spill_backend,
2497            spill_metrics,
2498            memory_upper_bound,
2499            mem_ctx,
2500            _phantom: PhantomData,
2501        }
2502    }
2503}
2504
2505#[cfg(test)]
2506mod tests {
2507    use futures::StreamExt;
2508    use futures_async_stream::for_await;
2509    use itertools::Itertools;
2510    use risingwave_common::array::{ArrayBuilderImpl, DataChunk};
2511    use risingwave_common::catalog::{Field, Schema};
2512    use risingwave_common::hash::Key32;
2513    use risingwave_common::memory::MemoryContext;
2514    use risingwave_common::metrics::LabelGuardedIntGauge;
2515    use risingwave_common::test_prelude::DataChunkTestExt;
2516    use risingwave_common::types::DataType;
2517    use risingwave_common::util::iter_util::ZipEqDebug;
2518    use risingwave_common::util::memcmp_encoding::encode_chunk;
2519    use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
2520    use risingwave_expr::expr::{BoxedExpression, build_from_pretty};
2521
2522    use super::{
2523        ChunkedData, HashJoinExecutor, JoinType, LeftNonEquiJoinState, RightNonEquiJoinState, RowId,
2524    };
2525    use crate::error::Result;
2526    use crate::executor::BoxedExecutor;
2527    use crate::executor::test_utils::MockExecutor;
2528    use crate::monitor::BatchSpillMetrics;
2529    use crate::spill::spill_op::SpillBackend;
2530    use crate::task::ShutdownToken;
2531
    /// Chunk size used by these tests, both as the join executor's chunk size and as the initial
    /// capacity of the result merger's array builders.
    const CHUNK_SIZE: usize = 1024;
2533
    /// Test helper that concatenates several data chunks into a single one, column by column.
    struct DataChunkMerger {
        /// One array builder per output column.
        array_builders: Vec<ArrayBuilderImpl>,
        /// Running total of the capacities of the chunks appended so far.
        array_len: usize,
    }
2538
2539    impl DataChunkMerger {
2540        fn new(data_types: Vec<DataType>) -> Result<Self> {
2541            let array_builders = data_types
2542                .iter()
2543                .map(|data_type| data_type.create_array_builder(CHUNK_SIZE))
2544                .collect();
2545
2546            Ok(Self {
2547                array_builders,
2548                array_len: 0,
2549            })
2550        }
2551
2552        fn append(&mut self, data_chunk: &DataChunk) -> Result<()> {
2553            ensure!(self.array_builders.len() == data_chunk.dimension());
2554            for idx in 0..self.array_builders.len() {
2555                self.array_builders[idx].append_array(data_chunk.column_at(idx));
2556            }
2557            self.array_len += data_chunk.capacity();
2558
2559            Ok(())
2560        }
2561
2562        fn finish(self) -> Result<DataChunk> {
2563            let columns = self
2564                .array_builders
2565                .into_iter()
2566                .map(|b| b.finish().into())
2567                .collect();
2568
2569            Ok(DataChunk::new(columns, self.array_len))
2570        }
2571    }
2572
2573    /// Sort each row in the data chunk and compare with the rows in the data chunk.
2574    fn compare_data_chunk_with_rowsort(left: &DataChunk, right: &DataChunk) -> bool {
2575        assert!(left.is_compacted());
2576        assert!(right.is_compacted());
2577
2578        if left.cardinality() != right.cardinality() {
2579            return false;
2580        }
2581
2582        // Sort and compare
2583        let column_orders = (0..left.columns().len())
2584            .map(|i| ColumnOrder::new(i, OrderType::ascending()))
2585            .collect_vec();
2586        let left_encoded_chunk = encode_chunk(left, &column_orders).unwrap();
2587        let mut sorted_left = left_encoded_chunk
2588            .into_iter()
2589            .enumerate()
2590            .map(|(row_id, row)| (left.row_at_unchecked_vis(row_id), row))
2591            .collect_vec();
2592        sorted_left.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
2593
2594        let right_encoded_chunk = encode_chunk(right, &column_orders).unwrap();
2595        let mut sorted_right = right_encoded_chunk
2596            .into_iter()
2597            .enumerate()
2598            .map(|(row_id, row)| (right.row_at_unchecked_vis(row_id), row))
2599            .collect_vec();
2600        sorted_right.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
2601
2602        sorted_left
2603            .into_iter()
2604            .map(|(row, _)| row)
2605            .zip_eq_debug(sorted_right.into_iter().map(|(row, _)| row))
2606            .all(|(row1, row2)| row1 == row2)
2607    }
2608
    /// Shared setup for the hash join tests.
    struct TestFixture {
        /// Column types of the left input.
        left_types: Vec<DataType>,
        /// Column types of the right input.
        right_types: Vec<DataType>,
        /// Join type exercised by this fixture.
        join_type: JoinType,
    }
2614
2615    /// Sql for creating test data:
2616    /// ```sql
2617    /// drop table t1 if exists;
2618    /// create table t1(v1 int, v2 float);
2619    /// insert into t1 values
2620    /// (1, 6.1::FLOAT), (2, null), (null, 8.4::FLOAT), (3, 3.9::FLOAT), (null, null),
2621    /// (4, 6.6::FLOAT), (3, null), (null, 0.7::FLOAT), (5, null), (null, 5.5::FLOAT);
2622    ///
2623    /// drop table t2 if exists;
2624    /// create table t2(v1 int, v2 real);
2625    /// insert into t2 values
2626    /// (8, 6.1::REAL), (2, null), (null, 8.9::REAL), (3, null), (null, 3.5::REAL),
2627    /// (6, null), (4, 7.5::REAL), (6, null), (null, 8::REAL), (7, null),
2628    /// (null, 9.1::REAL), (9, null), (3, 3.7::REAL), (9, null), (null, 9.6::REAL),
2629    /// (100, null), (null, 8.18::REAL), (200, null);
2630    /// ```
2631    impl TestFixture {
2632        fn with_join_type(join_type: JoinType) -> Self {
2633            Self {
2634                left_types: vec![DataType::Int32, DataType::Float32],
2635                right_types: vec![DataType::Int32, DataType::Float64],
2636                join_type,
2637            }
2638        }
2639
2640        fn create_left_executor(&self) -> BoxedExecutor {
2641            let schema = Schema {
2642                fields: vec![
2643                    Field::unnamed(DataType::Int32),
2644                    Field::unnamed(DataType::Float32),
2645                ],
2646            };
2647            let mut executor = MockExecutor::new(schema);
2648
2649            executor.add(DataChunk::from_pretty(
2650                "i f
2651                 1 6.1
2652                 2 .
2653                 . 8.4
2654                 3 3.9
2655                 . .  ",
2656            ));
2657
2658            executor.add(DataChunk::from_pretty(
2659                "i f
2660                 4 6.6
2661                 3 .
2662                 . 0.7
2663                 5 .
2664                 . 5.5",
2665            ));
2666
2667            Box::new(executor)
2668        }
2669
2670        fn create_right_executor(&self) -> BoxedExecutor {
2671            let schema = Schema {
2672                fields: vec![
2673                    Field::unnamed(DataType::Int32),
2674                    Field::unnamed(DataType::Float64),
2675                ],
2676            };
2677            let mut executor = MockExecutor::new(schema);
2678
2679            executor.add(DataChunk::from_pretty(
2680                "i F
2681                 8 6.1
2682                 2 .
2683                 . 8.9
2684                 3 .
2685                 . 3.5
2686                 6 .  ",
2687            ));
2688
2689            executor.add(DataChunk::from_pretty(
2690                "i F
2691                 4 7.5
2692                 6 .
2693                 . 8
2694                 7 .
2695                 . 9.1
2696                 9 .  ",
2697            ));
2698
2699            executor.add(DataChunk::from_pretty(
2700                "  i F
2701                   3 3.7
2702                   9 .
2703                   . 9.6
2704                 100 .
2705                   . 8.18
2706                 200 .   ",
2707            ));
2708
2709            Box::new(executor)
2710        }
2711
2712        fn output_data_types(&self) -> Vec<DataType> {
2713            let join_type = self.join_type;
2714            if join_type.keep_all() {
2715                [self.left_types.clone(), self.right_types.clone()].concat()
2716            } else if join_type.keep_left() {
2717                self.left_types.clone()
2718            } else if join_type.keep_right() {
2719                self.right_types.clone()
2720            } else {
2721                unreachable!()
2722            }
2723        }
2724
2725        fn create_cond() -> BoxedExpression {
2726            build_from_pretty("(less_than:boolean $1:float4 $3:float8)")
2727        }
2728
2729        fn create_join_executor_with_chunk_size_and_executors(
2730            &self,
2731            has_non_equi_cond: bool,
2732            null_safe: bool,
2733            chunk_size: usize,
2734            left_child: BoxedExecutor,
2735            right_child: BoxedExecutor,
2736            shutdown_rx: ShutdownToken,
2737            parent_mem_ctx: Option<MemoryContext>,
2738            test_spill: bool,
2739        ) -> BoxedExecutor {
2740            let join_type = self.join_type;
2741
2742            let output_indices = (0..match join_type {
2743                JoinType::LeftSemi | JoinType::LeftAnti => left_child.schema().fields().len(),
2744                JoinType::RightSemi | JoinType::RightAnti => right_child.schema().fields().len(),
2745                _ => left_child.schema().fields().len() + right_child.schema().fields().len(),
2746            })
2747                .collect();
2748
2749            let cond = if has_non_equi_cond {
2750                Some(Self::create_cond().into())
2751            } else {
2752                None
2753            };
2754
2755            let mem_ctx = if test_spill {
2756                MemoryContext::new_with_mem_limit(
2757                    parent_mem_ctx,
2758                    LabelGuardedIntGauge::test_int_gauge::<4>(),
2759                    0,
2760                )
2761            } else {
2762                MemoryContext::new(parent_mem_ctx, LabelGuardedIntGauge::test_int_gauge::<4>())
2763            };
2764            Box::new(HashJoinExecutor::<Key32>::new(
2765                join_type,
2766                output_indices,
2767                left_child,
2768                right_child,
2769                vec![0],
2770                vec![0],
2771                vec![null_safe],
2772                cond,
2773                "HashJoinExecutor".to_owned(),
2774                chunk_size,
2775                None,
2776                if test_spill {
2777                    Some(SpillBackend::Memory)
2778                } else {
2779                    None
2780                },
2781                BatchSpillMetrics::for_test(),
2782                shutdown_rx,
2783                mem_ctx,
2784            ))
2785        }
2786
2787        async fn do_test(&self, expected: DataChunk, has_non_equi_cond: bool, null_safe: bool) {
2788            let left_executor = self.create_left_executor();
2789            let right_executor = self.create_right_executor();
2790            self.do_test_with_chunk_size_and_executors(
2791                expected.clone(),
2792                has_non_equi_cond,
2793                null_safe,
2794                self::CHUNK_SIZE,
2795                left_executor,
2796                right_executor,
2797                false,
2798            )
2799            .await;
2800
2801            // Test spill
2802            let left_executor = self.create_left_executor();
2803            let right_executor = self.create_right_executor();
2804            self.do_test_with_chunk_size_and_executors(
2805                expected,
2806                has_non_equi_cond,
2807                null_safe,
2808                self::CHUNK_SIZE,
2809                left_executor,
2810                right_executor,
2811                true,
2812            )
2813            .await;
2814        }
2815
2816        async fn do_test_with_chunk_size_and_executors(
2817            &self,
2818            expected: DataChunk,
2819            has_non_equi_cond: bool,
2820            null_safe: bool,
2821            chunk_size: usize,
2822            left_executor: BoxedExecutor,
2823            right_executor: BoxedExecutor,
2824            test_spill: bool,
2825        ) {
2826            let parent_mem_context =
2827                MemoryContext::root(LabelGuardedIntGauge::test_int_gauge::<4>(), u64::MAX);
2828
2829            {
2830                let join_executor = self.create_join_executor_with_chunk_size_and_executors(
2831                    has_non_equi_cond,
2832                    null_safe,
2833                    chunk_size,
2834                    left_executor,
2835                    right_executor,
2836                    ShutdownToken::empty(),
2837                    Some(parent_mem_context.clone()),
2838                    test_spill,
2839                );
2840
2841                let mut data_chunk_merger = DataChunkMerger::new(self.output_data_types()).unwrap();
2842
2843                let fields = &join_executor.schema().fields;
2844
2845                if self.join_type.keep_all() {
2846                    assert_eq!(fields[1].data_type, DataType::Float32);
2847                    assert_eq!(fields[3].data_type, DataType::Float64);
2848                } else if self.join_type.keep_left() {
2849                    assert_eq!(fields[1].data_type, DataType::Float32);
2850                } else if self.join_type.keep_right() {
2851                    assert_eq!(fields[1].data_type, DataType::Float64)
2852                } else {
2853                    unreachable!()
2854                }
2855
2856                let mut stream = join_executor.execute();
2857
2858                while let Some(data_chunk) = stream.next().await {
2859                    let data_chunk = data_chunk.unwrap();
2860                    let data_chunk = data_chunk.compact();
2861                    data_chunk_merger.append(&data_chunk).unwrap();
2862                }
2863
2864                let result_chunk = data_chunk_merger.finish().unwrap();
2865                println!("expected: {:?}", expected);
2866                println!("result: {:?}", result_chunk);
2867
2868                // TODO: Replace this with unsorted comparison
2869                // assert_eq!(expected, result_chunk);
2870                assert!(compare_data_chunk_with_rowsort(&expected, &result_chunk));
2871            }
2872
2873            assert_eq!(0, parent_mem_context.get_bytes_used());
2874        }
2875
2876        async fn do_test_shutdown(&self, has_non_equi_cond: bool) {
2877            // Test `ShutdownMsg::Cancel`
2878            let left_executor = self.create_left_executor();
2879            let right_executor = self.create_right_executor();
2880            let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
2881            let join_executor = self.create_join_executor_with_chunk_size_and_executors(
2882                has_non_equi_cond,
2883                false,
2884                self::CHUNK_SIZE,
2885                left_executor,
2886                right_executor,
2887                shutdown_rx,
2888                None,
2889                false,
2890            );
2891            shutdown_tx.cancel();
2892            #[for_await]
2893            for chunk in join_executor.execute() {
2894                assert!(chunk.is_err());
2895                break;
2896            }
2897
2898            // Test `ShutdownMsg::Abort`
2899            let left_executor = self.create_left_executor();
2900            let right_executor = self.create_right_executor();
2901            let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
2902            let join_executor = self.create_join_executor_with_chunk_size_and_executors(
2903                has_non_equi_cond,
2904                false,
2905                self::CHUNK_SIZE,
2906                left_executor,
2907                right_executor,
2908                shutdown_rx,
2909                None,
2910                false,
2911            );
2912            shutdown_tx.abort("test");
2913            #[for_await]
2914            for chunk in join_executor.execute() {
2915                assert!(chunk.is_err());
2916                break;
2917            }
2918        }
2919    }
2920
2921    /// Sql:
2922    /// ```sql
2923    /// select * from t1 join t2 on t1.v1 = t2.v1;
2924    /// ```
2925    #[tokio::test]
2926    async fn test_inner_join() {
2927        let test_fixture = TestFixture::with_join_type(JoinType::Inner);
2928
2929        let expected_chunk = DataChunk::from_pretty(
2930            "i   f   i   F
2931             2   .   2   .
2932             3   3.9 3   3.7
2933             3   3.9 3   .
2934             4   6.6 4   7.5
2935             3   .   3   3.7
2936             3   .   3   .",
2937        );
2938
2939        test_fixture.do_test(expected_chunk, false, false).await;
2940    }
2941
2942    /// Sql:
2943    /// ```sql
2944    /// select * from t1 join t2 on t1.v1 is not distinct from t2.v1;
2945    /// ```
2946    #[tokio::test]
2947    async fn test_null_safe_inner_join() {
2948        let test_fixture = TestFixture::with_join_type(JoinType::Inner);
2949
2950        let expected_chunk = DataChunk::from_pretty(
2951            "i   f   i   F
2952             2    .  2     .
2953             .  8.4  .  8.18
2954             .  8.4  .  9.6
2955             .  8.4  .  9.1
2956             .  8.4  .  8
2957             .  8.4  .  3.5
2958             .  8.4  .  8.9
2959             3  3.9  3  3.7
2960             3  3.9  3     .
2961             .    .  .  8.18
2962             .    .  .  9.6
2963             .    .  .  9.1
2964             .    .  .  8
2965             .    .  .  3.5
2966             .    .  .  8.9
2967             4  6.6  4  7.5
2968             3    .  3  3.7
2969             3    .  3     .
2970             .  0.7  .  8.18
2971             .  0.7  .  9.6
2972             .  0.7  .  9.1
2973             .  0.7  .  8
2974             .  0.7  .  3.5
2975             .  0.7  .  8.9
2976             .  5.5  .  8.18
2977             .  5.5  .  9.6
2978             .  5.5  .  9.1
2979             .  5.5  .  8
2980             .  5.5  .  3.5
2981             .  5.5  .  8.9",
2982        );
2983
2984        test_fixture.do_test(expected_chunk, false, true).await;
2985    }
2986
2987    /// Sql:
2988    /// ```sql
2989    /// select * from t1 join t2 on t1.v1 = t2.v1 and t1.v2 < t2.v2;
2990    /// ```
2991    #[tokio::test]
2992    async fn test_inner_join_with_non_equi_condition() {
2993        let test_fixture = TestFixture::with_join_type(JoinType::Inner);
2994
2995        let expected_chunk = DataChunk::from_pretty(
2996            "i   f   i   F
2997             4   6.6 4   7.5",
2998        );
2999
3000        test_fixture.do_test(expected_chunk, true, false).await;
3001    }
3002
3003    /// Sql:
3004    /// ```sql
3005    /// select t1.v2 as t1_v2, t2.v2 as t2_v2 from t1 left outer join t2 on t1.v1 = t2.v1;
3006    /// ```
3007    #[tokio::test]
3008    async fn test_left_outer_join() {
3009        let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);
3010
3011        let expected_chunk = DataChunk::from_pretty(
3012            "i   f   i   F
3013             1   6.1 .   .
3014             2   .   2   .
3015             .   8.4 .   .
3016             3   3.9 3   3.7
3017             3   3.9 3   .
3018             .   .   .   .
3019             4   6.6 4   7.5
3020             3   .   3   3.7
3021             3   .   3   .
3022             .   0.7 .   .
3023             5   .   .   .
3024             .   5.5 .   .",
3025        );
3026
3027        test_fixture.do_test(expected_chunk, false, false).await;
3028    }
3029
3030    /// Sql:
3031    /// ```sql
3032    /// select * from t1 left outer join t2 on t1.v1 = t2.v1 and t1.v2 < t2.v2;
3033    /// ```
3034    #[tokio::test]
3035    async fn test_left_outer_join_with_non_equi_condition() {
3036        let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);
3037
3038        let expected_chunk = DataChunk::from_pretty(
3039            "i   f   i   F
3040             2   .   .   .
3041             3   3.9 .   .
3042             4   6.6 4   7.5
3043             3   .   .   .
3044             1   6.1 .   .
3045             .   8.4 .   .
3046             .   .   .   .
3047             .   0.7 .   .
3048             5   .   .   .
3049             .   5.5 .   .",
3050        );
3051
3052        test_fixture.do_test(expected_chunk, true, false).await;
3053    }
3054
3055    /// Sql:
3056    /// ```sql
3057    /// select * from t1 right outer join t2 on t1.v1 = t2.v1;
3058    /// ```
3059    #[tokio::test]
3060    async fn test_right_outer_join() {
3061        let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);
3062
3063        let expected_chunk = DataChunk::from_pretty(
3064            "i   f   i   F
3065             2   .   2   .
3066             3   3.9 3   3.7
3067             3   3.9 3   .
3068             4   6.6 4   7.5
3069             3   .   3   3.7
3070             3   .   3   .
3071             .   .   8   6.1
3072             .   .   .   8.9
3073             .   .   .   3.5
3074             .   .   6   .
3075             .   .   6   .
3076             .   .   .   8
3077             .   .   7   .
3078             .   .   .   9.1
3079             .   .   9   .
3080             .   .   9   .
3081             .   .   .   9.6
3082             .   .   100 .
3083             .   .   .   8.18
3084             .   .   200 .",
3085        );
3086
3087        test_fixture.do_test(expected_chunk, false, false).await;
3088    }
3089
3090    /// Sql:
3091    /// ```sql
3092    /// select * from t1 left outer join t2 on t1.v1 = t2.v1 and t1.v2 < t2.v2;
3093    /// ```
3094    #[tokio::test]
3095    async fn test_right_outer_join_with_non_equi_condition() {
3096        let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);
3097
3098        let expected_chunk = DataChunk::from_pretty(
3099            "i   f   i   F
3100             4   6.6 4   7.5
3101             .   .   8   6.1
3102             .   .   2   .
3103             .   .   .   8.9
3104             .   .   3   .
3105             .   .   .   3.5
3106             .   .   6   .
3107             .   .   6   .
3108             .   .   .   8
3109             .   .   7   .
3110             .   .   .   9.1
3111             .   .   9   .
3112             .   .   3   3.7
3113             .   .   9   .
3114             .   .   .   9.6
3115             .   .   100 .
3116             .   .   .   8.18
3117             .   .   200 .",
3118        );
3119
3120        test_fixture.do_test(expected_chunk, true, false).await;
3121    }
3122
3123    /// ```sql
3124    /// select * from t1 full outer join t2 on t1.v1 = t2.v1;
3125    /// ```
3126    #[tokio::test]
3127    async fn test_full_outer_join() {
3128        let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);
3129
3130        let expected_chunk = DataChunk::from_pretty(
3131            "i   f   i   F
3132             1   6.1 .   .
3133             2   .   2   .
3134             .   8.4 .   .
3135             3   3.9 3   3.7
3136             3   3.9 3   .
3137             .   .   .   .
3138             4   6.6 4   7.5
3139             3   .   3   3.7
3140             3   .   3   .
3141             .   0.7 .   .
3142             5   .   .   .
3143             .   5.5 .   .
3144             .   .   8   6.1
3145             .   .   .   8.9
3146             .   .   .   3.5
3147             .   .   6   .
3148             .   .   6   .
3149             .   .   .   8
3150             .   .   7   .
3151             .   .   .   9.1
3152             .   .   9   .
3153             .   .   9   .
3154             .   .   .   9.6
3155             .   .   100 .
3156             .   .   .   8.18
3157             .   .   200 .",
3158        );
3159
3160        test_fixture.do_test(expected_chunk, false, false).await;
3161    }
3162
3163    /// ```sql
3164    /// select * from t1 full outer join t2 on t1.v1 = t2.v1 and t1.v2 < t2.v2;
3165    /// ```
3166    #[tokio::test]
3167    async fn test_full_outer_join_with_non_equi_condition() {
3168        let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);
3169
3170        let expected_chunk = DataChunk::from_pretty(
3171            "i   f   i   F
3172             2   .   .   .
3173             3   3.9 .   .
3174             4   6.6 4   7.5
3175             3   .   .   .
3176             1   6.1 .   .
3177             .   8.4 .   .
3178             .   .   .   .
3179             .   0.7 .   .
3180             5   .   .   .
3181             .   5.5 .   .
3182             .   .   8   6.1
3183             .   .   2   .
3184             .   .   .   8.9
3185             .   .   3   .
3186             .   .   .   3.5
3187             .   .   6   .
3188             .   .   6   .
3189             .   .   .   8
3190             .   .   7   .
3191             .   .   .   9.1
3192             .   .   9   .
3193             .   .   3   3.7
3194             .   .   9   .
3195             .   .   .   9.6
3196             .   .   100 .
3197             .   .   .   8.18
3198             .   .   200 .",
3199        );
3200
3201        test_fixture.do_test(expected_chunk, true, false).await;
3202    }
3203
3204    #[tokio::test]
3205    async fn test_left_anti_join() {
3206        let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);
3207
3208        let expected_chunk = DataChunk::from_pretty(
3209            "i   f
3210             1   6.1
3211             .   8.4
3212             .   .
3213             .   0.7
3214             5   .
3215             .   5.5",
3216        );
3217
3218        test_fixture.do_test(expected_chunk, false, false).await;
3219    }
3220
3221    #[tokio::test]
3222    async fn test_left_anti_join_with_non_equi_condition() {
3223        let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);
3224
3225        let expected_chunk = DataChunk::from_pretty(
3226            "i   f
3227             2   .
3228             3   3.9
3229             3   .
3230             1   6.1
3231             .   8.4
3232             .   .
3233             .   0.7
3234             5   .
3235             .   5.5",
3236        );
3237
3238        test_fixture.do_test(expected_chunk, true, false).await;
3239    }
3240
3241    #[tokio::test]
3242    async fn test_left_semi_join() {
3243        let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
3244
3245        let expected_chunk = DataChunk::from_pretty(
3246            "i   f
3247             2   .
3248             3   3.9
3249             4   6.6
3250             3   .",
3251        );
3252
3253        test_fixture.do_test(expected_chunk, false, false).await;
3254    }
3255
3256    #[tokio::test]
3257    async fn test_left_semi_join_with_non_equi_condition() {
3258        let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
3259
3260        let expected_chunk = DataChunk::from_pretty(
3261            "i   f
3262             4   6.6",
3263        );
3264
3265        test_fixture.do_test(expected_chunk, true, false).await;
3266    }
3267
3268    /// Tests handling of edge case:
3269    /// Match is found for a `probe_row`,
3270    /// but there are still candidate rows in the iterator for that `probe_row`.
3271    /// These should not be buffered or we will have duplicate rows in output.
3272    #[tokio::test]
3273    async fn test_left_semi_join_with_non_equi_condition_duplicates() {
3274        let schema = Schema {
3275            fields: vec![
3276                Field::unnamed(DataType::Int32),
3277                Field::unnamed(DataType::Float32),
3278            ],
3279        };
3280
3281        // Build side
3282        let mut left_executor = MockExecutor::new(schema);
3283        left_executor.add(DataChunk::from_pretty(
3284            "i f
3285                 1 1.0
3286                 1 1.0
3287                 1 1.0
3288                 1 1.0
3289                 2 1.0",
3290        ));
3291
3292        // Probe side
3293        let schema = Schema {
3294            fields: vec![
3295                Field::unnamed(DataType::Int32),
3296                Field::unnamed(DataType::Float64),
3297            ],
3298        };
3299        let mut right_executor = MockExecutor::new(schema);
3300        right_executor.add(DataChunk::from_pretty(
3301            "i F
3302                 1 2.0
3303                 1 2.0
3304                 1 2.0
3305                 1 2.0
3306                 2 2.0",
3307        ));
3308
3309        let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
3310        let expected_chunk = DataChunk::from_pretty(
3311            "i f
3312            1 1.0
3313            1 1.0
3314            1 1.0
3315            1 1.0
3316            2 1.0",
3317        );
3318
3319        test_fixture
3320            .do_test_with_chunk_size_and_executors(
3321                expected_chunk,
3322                true,
3323                false,
3324                3,
3325                Box::new(left_executor),
3326                Box::new(right_executor),
3327                false,
3328            )
3329            .await;
3330    }
3331
3332    #[tokio::test]
3333    async fn test_right_anti_join() {
3334        let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);
3335
3336        let expected_chunk = DataChunk::from_pretty(
3337            "i   F
3338             8   6.1
3339             .   8.9
3340             .   3.5
3341             6   .
3342             6   .
3343             .   8.0
3344             7   .
3345             .   9.1
3346             9   .
3347             9   .
3348             .   9.6
3349             100 .
3350             .   8.18
3351             200 .",
3352        );
3353
3354        test_fixture.do_test(expected_chunk, false, false).await;
3355    }
3356
3357    #[tokio::test]
3358    async fn test_right_anti_join_with_non_equi_condition() {
3359        let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);
3360
3361        let expected_chunk = DataChunk::from_pretty(
3362            "i   F
3363             8   6.1
3364             2   .
3365             .   8.9
3366             3   .
3367             .   3.5
3368             6   .
3369             6   .
3370             .   8
3371             7   .
3372             .   9.1
3373             9   .
3374             3   3.7
3375             9   .
3376             .   9.6
3377             100 .
3378             .   8.18
3379             200 .",
3380        );
3381
3382        test_fixture.do_test(expected_chunk, true, false).await;
3383    }
3384
3385    #[tokio::test]
3386    async fn test_right_semi_join() {
3387        let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);
3388
3389        let expected_chunk = DataChunk::from_pretty(
3390            "i   F
3391             2   .
3392             3   .
3393             4   7.5
3394             3   3.7",
3395        );
3396
3397        test_fixture.do_test(expected_chunk, false, false).await;
3398    }
3399
3400    #[tokio::test]
3401    async fn test_right_semi_join_with_non_equi_condition() {
3402        let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);
3403
3404        let expected_chunk = DataChunk::from_pretty(
3405            "i   F
3406             4   7.5",
3407        );
3408
3409        test_fixture.do_test(expected_chunk, true, false).await;
3410    }
3411
3412    #[tokio::test]
3413    async fn test_process_left_outer_join_non_equi_condition() {
3414        let chunk = DataChunk::from_pretty(
3415            "i   f   i   F
3416             1   3.5 1   5.5
3417             1   3.5 1   2.5
3418             2   4.0 .   .
3419             3   5.0 3   4.0
3420             3   5.0 3   3.0
3421             3   5.0 3   4.0
3422             3   5.0 3   3.0
3423             4   1.0 4   0
3424             4   1.0 4   9.0",
3425        );
3426        let expect = DataChunk::from_pretty(
3427            "i   f   i   F
3428             1   3.5 1   5.5
3429             2   4.0 .   .
3430             3   5.0 .   .
3431             3   5.0 .   .
3432             4   1.0 4   9.0",
3433        );
3434        let cond = TestFixture::create_cond();
3435        let mut state = LeftNonEquiJoinState {
3436            probe_column_count: 2,
3437            first_output_row_id: vec![0, 2, 3, 5, 7],
3438            has_more_output_rows: true,
3439            found_matched: false,
3440        };
3441        assert!(compare_data_chunk_with_rowsort(
3442            &HashJoinExecutor::<Key32>::process_left_outer_join_non_equi_condition(
3443                chunk,
3444                cond.as_ref(),
3445                &mut state
3446            )
3447            .await
3448            .unwrap()
3449            .compact(),
3450            &expect
3451        ));
3452        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3453        assert!(state.found_matched);
3454
3455        let chunk = DataChunk::from_pretty(
3456            "i   f   i   F
3457             4   1.0 4   0.6
3458             4   1.0 4   2.0
3459             5   4.0 5   .
3460             6   7.0 6   .
3461             6   7.0 6   5.0",
3462        );
3463        let expect = DataChunk::from_pretty(
3464            "i   f   i   F
3465             4   1.0 4   2.0
3466             5   4.0 .   .
3467             6   7.0 .   .",
3468        );
3469        state.first_output_row_id = vec![2, 3];
3470        state.has_more_output_rows = false;
3471        assert!(compare_data_chunk_with_rowsort(
3472            &HashJoinExecutor::<Key32>::process_left_outer_join_non_equi_condition(
3473                chunk,
3474                cond.as_ref(),
3475                &mut state
3476            )
3477            .await
3478            .unwrap()
3479            .compact(),
3480            &expect
3481        ));
3482        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3483        assert!(!state.found_matched);
3484
3485        let chunk = DataChunk::from_pretty(
3486            "i   f   i   F
3487             4   1.0 4   0.6
3488             4   1.0 4   1.0
3489             5   4.0 5   .
3490             6   7.0 6   .
3491             6   7.0 6   8.0",
3492        );
3493        let expect = DataChunk::from_pretty(
3494            "i   f   i   F
3495             4   1.0 .   .
3496             5   4.0 .   .
3497             6   7.0 6   8.0",
3498        );
3499        state.first_output_row_id = vec![2, 3];
3500        state.has_more_output_rows = false;
3501        assert!(compare_data_chunk_with_rowsort(
3502            &HashJoinExecutor::<Key32>::process_left_outer_join_non_equi_condition(
3503                chunk,
3504                cond.as_ref(),
3505                &mut state
3506            )
3507            .await
3508            .unwrap()
3509            .compact(),
3510            &expect
3511        ));
3512        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3513        assert!(!state.found_matched);
3514    }
3515
3516    #[tokio::test]
3517    async fn test_process_left_semi_join_non_equi_condition() {
3518        let chunk = DataChunk::from_pretty(
3519            "i   f   i   F
3520             1   3.5 1   5.5
3521             1   3.5 1   2.5
3522             2   4.0 .   .
3523             3   5.0 3   4.0
3524             3   5.0 3   3.0
3525             3   5.0 3   4.0
3526             3   5.0 3   3.0
3527             4   1.0 4   0
3528             4   1.0 4   0.5",
3529        );
3530        let expect = DataChunk::from_pretty(
3531            "i   f   i   F
3532             1   3.5 1   5.5",
3533        );
3534        let cond = TestFixture::create_cond();
3535        let mut state = LeftNonEquiJoinState {
3536            probe_column_count: 2,
3537            first_output_row_id: vec![0, 2, 3, 5, 7],
3538            found_matched: false,
3539            ..Default::default()
3540        };
3541        assert!(compare_data_chunk_with_rowsort(
3542            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<false>(
3543                chunk,
3544                cond.as_ref(),
3545                &mut state
3546            )
3547            .await
3548            .unwrap()
3549            .compact(),
3550            &expect
3551        ));
3552        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3553        assert!(!state.found_matched);
3554
3555        let chunk = DataChunk::from_pretty(
3556            "i   f   i   F
3557             4   1.0 4   0.6
3558             4   1.0 4   2.0
3559             5   4.0 5   .
3560             6   7.0 6   .
3561             6   7.0 6   5.0",
3562        );
3563        let expect = DataChunk::from_pretty(
3564            "i   f   i   F
3565             4   1.0 4   2.0",
3566        );
3567        state.first_output_row_id = vec![2, 3];
3568        assert!(compare_data_chunk_with_rowsort(
3569            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<false>(
3570                chunk,
3571                cond.as_ref(),
3572                &mut state
3573            )
3574            .await
3575            .unwrap()
3576            .compact(),
3577            &expect
3578        ));
3579        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3580        assert!(!state.found_matched);
3581
3582        let chunk = DataChunk::from_pretty(
3583            "i   f   i   F
3584             4   1.0 4   0.6
3585             4   1.0 4   1.0
3586             5   4.0 5   .
3587             6   7.0 6   .
3588             6   7.0 6   8.0",
3589        );
3590        let expect = DataChunk::from_pretty(
3591            "i   f   i   F
3592             6   7.0 6   8.0",
3593        );
3594        state.first_output_row_id = vec![2, 3];
3595        assert!(compare_data_chunk_with_rowsort(
3596            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<false>(
3597                chunk,
3598                cond.as_ref(),
3599                &mut state
3600            )
3601            .await
3602            .unwrap()
3603            .compact(),
3604            &expect
3605        ));
3606        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3607    }
3608
3609    #[tokio::test]
3610    async fn test_process_left_anti_join_non_equi_condition() {
3611        let chunk = DataChunk::from_pretty(
3612            "i   f   i   F
3613             1   3.5 1   5.5
3614             1   3.5 1   2.5
3615             2   4.0 .   .
3616             3   5.0 3   4.0
3617             3   5.0 3   3.0
3618             3   5.0 3   4.0
3619             3   5.0 3   3.0
3620             4   1.0 4   0
3621             4   1.0 4   0.5",
3622        );
3623        let expect = DataChunk::from_pretty(
3624            "i   f   i   F
3625             2   4.0 .   .
3626             3   5.0 3   4.0
3627             3   5.0 3   4.0",
3628        );
3629        let cond = TestFixture::create_cond();
3630        let mut state = LeftNonEquiJoinState {
3631            probe_column_count: 2,
3632            first_output_row_id: vec![0, 2, 3, 5, 7],
3633            has_more_output_rows: true,
3634            found_matched: false,
3635        };
3636        assert!(compare_data_chunk_with_rowsort(
3637            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<true>(
3638                chunk,
3639                cond.as_ref(),
3640                &mut state
3641            )
3642            .await
3643            .unwrap()
3644            .compact(),
3645            &expect
3646        ));
3647        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3648        assert!(!state.found_matched);
3649
3650        let chunk = DataChunk::from_pretty(
3651            "i   f   i   F
3652             4   1.0 4   0.6
3653             4   1.0 4   2.0
3654             5   4.0 5   .
3655             6   7.0 6   .
3656             6   7.0 6   5.0",
3657        );
3658        let expect = DataChunk::from_pretty(
3659            "i   f   i   F
3660             5   4.0 5   .
3661             6   7.0 6   .",
3662        );
3663        state.first_output_row_id = vec![2, 3];
3664        state.has_more_output_rows = false;
3665        assert!(compare_data_chunk_with_rowsort(
3666            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<true>(
3667                chunk,
3668                cond.as_ref(),
3669                &mut state
3670            )
3671            .await
3672            .unwrap()
3673            .compact(),
3674            &expect
3675        ));
3676        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3677        assert!(!state.found_matched);
3678
3679        let chunk = DataChunk::from_pretty(
3680            "i   f   i   F
3681             4   1.0 4   0.6
3682             4   1.0 4   1.0
3683             5   4.0 5   .
3684             6   7.0 6   .
3685             6   7.0 6   8.0",
3686        );
3687        let expect = DataChunk::from_pretty(
3688            "i   f   i   F
3689             4   1.0 4   0.6
3690             5   4.0 5   .",
3691        );
3692        state.first_output_row_id = vec![2, 3];
3693        state.has_more_output_rows = false;
3694        assert!(compare_data_chunk_with_rowsort(
3695            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<true>(
3696                chunk,
3697                cond.as_ref(),
3698                &mut state
3699            )
3700            .await
3701            .unwrap()
3702            .compact(),
3703            &expect
3704        ));
3705        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3706    }
3707
3708    #[tokio::test]
3709    async fn test_process_right_outer_join_non_equi_condition() {
3710        let chunk = DataChunk::from_pretty(
3711            "i   f   i   F
3712             1   3.5 1   5.5
3713             1   3.5 1   2.5
3714             3   5.0 3   4.0
3715             3   5.0 3   3.0
3716             3   5.0 3   4.0
3717             3   5.0 3   3.0
3718             4   1.0 4   0
3719             4   1.0 4   0.5",
3720        );
3721        let expect = DataChunk::from_pretty(
3722            "i   f   i   F
3723             1   3.5 1   5.5",
3724        );
3725        let cond = TestFixture::create_cond();
3726        // For simplicity, all rows are in one chunk.
3727        // Build side table
3728        // 0  - (1, 5.5)
3729        // 1  - (1, 2.5)
3730        // 2  - ?
3731        // 3  - (3, 4.0)
3732        // 4  - (3, 3.0)
3733        // 5  - (4, 0)
3734        // 6  - ?
3735        // 7  - (4, 0.5)
3736        // 8  - (4, 0.6)
3737        // 9  - (4, 2.0)
3738        // 10 - (5, .)
3739        // 11 - ?
3740        // 12 - (6, .)
3741        // 13 - (6, 5.0)
3742        // Rows with '?' are never matched here.
3743        let build_row_matched = ChunkedData::with_chunk_sizes([14].into_iter()).unwrap();
3744        let mut state = RightNonEquiJoinState {
3745            build_row_ids: vec![
3746                RowId::new(0, 0),
3747                RowId::new(0, 1),
3748                RowId::new(0, 3),
3749                RowId::new(0, 4),
3750                RowId::new(0, 3),
3751                RowId::new(0, 4),
3752                RowId::new(0, 5),
3753                RowId::new(0, 7),
3754            ],
3755            build_row_matched,
3756        };
3757        assert!(compare_data_chunk_with_rowsort(
3758            &HashJoinExecutor::<Key32>::process_right_outer_join_non_equi_condition(
3759                chunk,
3760                cond.as_ref(),
3761                &mut state
3762            )
3763            .await
3764            .unwrap()
3765            .compact(),
3766            &expect
3767        ));
3768        assert_eq!(state.build_row_ids, Vec::new());
3769        assert_eq!(
3770            state.build_row_matched,
3771            ChunkedData::try_from(vec![{
3772                let mut v = vec![false; 14];
3773                v[0] = true;
3774                v
3775            }])
3776            .unwrap()
3777        );
3778
3779        let chunk = DataChunk::from_pretty(
3780            "i   f   i   F
3781             4   1.0 4   0.6
3782             4   1.0 4   2.0
3783             5   4.0 5   .
3784             6   7.0 6   .
3785             6   7.0 6   5.0",
3786        );
3787        let expect = DataChunk::from_pretty(
3788            "i   f   i   F
3789             4   1.0 4   2.0",
3790        );
3791        state.build_row_ids = vec![
3792            RowId::new(0, 8),
3793            RowId::new(0, 9),
3794            RowId::new(0, 10),
3795            RowId::new(0, 12),
3796            RowId::new(0, 13),
3797        ];
3798        assert!(compare_data_chunk_with_rowsort(
3799            &HashJoinExecutor::<Key32>::process_right_outer_join_non_equi_condition(
3800                chunk,
3801                cond.as_ref(),
3802                &mut state
3803            )
3804            .await
3805            .unwrap()
3806            .compact(),
3807            &expect
3808        ));
3809        assert_eq!(state.build_row_ids, Vec::new());
3810        assert_eq!(
3811            state.build_row_matched,
3812            ChunkedData::try_from(vec![{
3813                let mut v = vec![false; 14];
3814                v[0] = true;
3815                v[9] = true;
3816                v
3817            }])
3818            .unwrap()
3819        );
3820    }
3821
3822    #[tokio::test]
3823    async fn test_process_right_semi_anti_join_non_equi_condition() {
3824        let chunk = DataChunk::from_pretty(
3825            "i   f   i   F
3826             1   3.5 1   5.5
3827             1   3.5 1   2.5
3828             3   5.0 3   4.0
3829             3   5.0 3   3.0
3830             3   5.0 3   4.0
3831             3   5.0 3   3.0
3832             4   1.0 4   0
3833             4   1.0 4   0.5",
3834        );
3835        let cond = TestFixture::create_cond();
3836        let build_row_matched = ChunkedData::with_chunk_sizes([14].into_iter()).unwrap();
3837        let mut state = RightNonEquiJoinState {
3838            build_row_ids: vec![
3839                RowId::new(0, 0),
3840                RowId::new(0, 1),
3841                RowId::new(0, 3),
3842                RowId::new(0, 4),
3843                RowId::new(0, 3),
3844                RowId::new(0, 4),
3845                RowId::new(0, 5),
3846                RowId::new(0, 7),
3847            ],
3848            build_row_matched,
3849        };
3850
3851        assert!(
3852            HashJoinExecutor::<Key32>::process_right_semi_anti_join_non_equi_condition(
3853                chunk,
3854                cond.as_ref(),
3855                &mut state
3856            )
3857            .await
3858            .is_ok()
3859        );
3860        assert_eq!(state.build_row_ids, Vec::new());
3861        assert_eq!(
3862            state.build_row_matched,
3863            ChunkedData::try_from(vec![{
3864                let mut v = vec![false; 14];
3865                v[0] = true;
3866                v
3867            }])
3868            .unwrap()
3869        );
3870
3871        let chunk = DataChunk::from_pretty(
3872            "i   f   i   F
3873             4   1.0 4   0.6
3874             4   1.0 4   2.0
3875             5   4.0 5   .
3876             6   7.0 6   .
3877             6   7.0 6   5.0",
3878        );
3879        state.build_row_ids = vec![
3880            RowId::new(0, 8),
3881            RowId::new(0, 9),
3882            RowId::new(0, 10),
3883            RowId::new(0, 12),
3884            RowId::new(0, 13),
3885        ];
3886        assert!(
3887            HashJoinExecutor::<Key32>::process_right_semi_anti_join_non_equi_condition(
3888                chunk,
3889                cond.as_ref(),
3890                &mut state
3891            )
3892            .await
3893            .is_ok()
3894        );
3895        assert_eq!(state.build_row_ids, Vec::new());
3896        assert_eq!(
3897            state.build_row_matched,
3898            ChunkedData::try_from(vec![{
3899                let mut v = vec![false; 14];
3900                v[0] = true;
3901                v[9] = true;
3902                v
3903            }])
3904            .unwrap()
3905        );
3906    }
3907
3908    #[tokio::test]
3909    async fn test_process_full_outer_join_non_equi_condition() {
3910        let chunk = DataChunk::from_pretty(
3911            "i   f   i   F
3912             1   3.5 1   5.5
3913             1   3.5 1   2.5
3914             3   5.0 3   4.0
3915             3   5.0 3   3.0
3916             3   5.0 3   4.0
3917             3   5.0 3   3.0
3918             4   1.0 4   0
3919             4   1.0 4   0.5",
3920        );
3921        let expect = DataChunk::from_pretty(
3922            "i   f   i   F
3923             1   3.5 1   5.5
3924             3   5.0 .   .
3925             3   5.0 .   .",
3926        );
3927        let cond = TestFixture::create_cond();
3928        let mut left_state = LeftNonEquiJoinState {
3929            probe_column_count: 2,
3930            first_output_row_id: vec![0, 2, 4, 6],
3931            has_more_output_rows: true,
3932            found_matched: false,
3933        };
3934        let mut right_state = RightNonEquiJoinState {
3935            build_row_ids: vec![
3936                RowId::new(0, 0),
3937                RowId::new(0, 1),
3938                RowId::new(0, 3),
3939                RowId::new(0, 4),
3940                RowId::new(0, 3),
3941                RowId::new(0, 4),
3942                RowId::new(0, 5),
3943                RowId::new(0, 7),
3944            ],
3945            build_row_matched: ChunkedData::with_chunk_sizes([14].into_iter()).unwrap(),
3946        };
3947        assert!(compare_data_chunk_with_rowsort(
3948            &HashJoinExecutor::<Key32>::process_full_outer_join_non_equi_condition(
3949                chunk,
3950                cond.as_ref(),
3951                &mut left_state,
3952                &mut right_state,
3953            )
3954            .await
3955            .unwrap()
3956            .compact(),
3957            &expect
3958        ));
3959        assert_eq!(left_state.first_output_row_id, Vec::<usize>::new());
3960        assert!(!left_state.found_matched);
3961        assert_eq!(right_state.build_row_ids, Vec::new());
3962        assert_eq!(
3963            right_state.build_row_matched,
3964            ChunkedData::try_from(vec![{
3965                let mut v = vec![false; 14];
3966                v[0] = true;
3967                v
3968            }])
3969            .unwrap()
3970        );
3971
3972        let chunk = DataChunk::from_pretty(
3973            "i   f   i   F
3974             4   1.0 4   0.6
3975             4   1.0 4   2.0
3976             5   4.0 5   .
3977             6   7.0 6   .
3978             6   7.0 6   8.0",
3979        );
3980        let expect = DataChunk::from_pretty(
3981            "i   f   i   F
3982             4   1.0 4   2.0
3983             5   4.0 .   .
3984             6   7.0 6   8.0",
3985        );
3986        left_state.first_output_row_id = vec![2, 3];
3987        left_state.has_more_output_rows = false;
3988        right_state.build_row_ids = vec![
3989            RowId::new(0, 8),
3990            RowId::new(0, 9),
3991            RowId::new(0, 10),
3992            RowId::new(0, 12),
3993            RowId::new(0, 13),
3994        ];
3995        assert!(compare_data_chunk_with_rowsort(
3996            &HashJoinExecutor::<Key32>::process_full_outer_join_non_equi_condition(
3997                chunk,
3998                cond.as_ref(),
3999                &mut left_state,
4000                &mut right_state,
4001            )
4002            .await
4003            .unwrap()
4004            .compact(),
4005            &expect
4006        ));
4007        assert_eq!(left_state.first_output_row_id, Vec::<usize>::new());
4008        assert!(left_state.found_matched);
4009        assert_eq!(right_state.build_row_ids, Vec::new());
4010        assert_eq!(
4011            right_state.build_row_matched,
4012            ChunkedData::try_from(vec![{
4013                let mut v = vec![false; 14];
4014                v[0] = true;
4015                v[9] = true;
4016                v[13] = true;
4017                v
4018            }])
4019            .unwrap()
4020        );
4021    }
4022
4023    #[tokio::test]
4024    async fn test_shutdown() {
4025        let test_fixture = TestFixture::with_join_type(JoinType::Inner);
4026        test_fixture.do_test_shutdown(false).await;
4027        test_fixture.do_test_shutdown(true).await;
4028
4029        let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);
4030        test_fixture.do_test_shutdown(false).await;
4031        test_fixture.do_test_shutdown(true).await;
4032
4033        let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);
4034        test_fixture.do_test_shutdown(false).await;
4035        test_fixture.do_test_shutdown(true).await;
4036
4037        let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);
4038        test_fixture.do_test_shutdown(false).await;
4039        test_fixture.do_test_shutdown(true).await;
4040
4041        let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
4042        test_fixture.do_test_shutdown(false).await;
4043        test_fixture.do_test_shutdown(true).await;
4044
4045        let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);
4046        test_fixture.do_test_shutdown(false).await;
4047        test_fixture.do_test_shutdown(true).await;
4048
4049        let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);
4050        test_fixture.do_test_shutdown(false).await;
4051        test_fixture.do_test_shutdown(true).await;
4052
4053        let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);
4054        test_fixture.do_test_shutdown(false).await;
4055        test_fixture.do_test_shutdown(true).await;
4056    }
4057}