risingwave_batch_executors/executor/join/local_lookup_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::marker::PhantomData;
17use std::sync::Arc;
18
19use anyhow::Context;
20use itertools::Itertools;
21use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
22use risingwave_common::catalog::{ColumnDesc, Field, Schema};
23use risingwave_common::hash::table_distribution::TableDistribution;
24use risingwave_common::hash::{
25    ExpandedWorkerSlotMapping, HashKey, HashKeyDispatcher, VirtualNode, VnodeCountCompat,
26    WorkerSlotId,
27};
28use risingwave_common::memory::MemoryContext;
29use risingwave_common::types::{DataType, Datum};
30use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
31use risingwave_common::util::iter_util::ZipEqFast;
32use risingwave_common::util::scan_range::ScanRange;
33use risingwave_common::util::tracing::TracingContext;
34use risingwave_expr::expr::{BoxedExpression, build_from_prost};
35use risingwave_pb::batch_plan::exchange_info::DistributionMode;
36use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan;
37use risingwave_pb::batch_plan::plan_node::NodeBody;
38use risingwave_pb::batch_plan::{
39    ExchangeInfo, ExchangeNode, LocalExecutePlan, PbExchangeSource, PbTaskId, PlanFragment,
40    PlanNode, RowSeqScanNode, TaskOutputId,
41};
42use risingwave_pb::common::{BatchQueryEpoch, WorkerNode};
43use risingwave_pb::plan_common::StorageTableDesc;
44
45use super::AsOfDesc;
46use crate::error::Result;
47use crate::executor::{
48    AsOf, BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, DummyExecutor, Executor,
49    ExecutorBuilder, JoinType, LookupJoinBase, unix_timestamp_sec_to_epoch,
50};
51use crate::task::{BatchTaskContext, ShutdownToken, TaskId};
52
/// Inner side executor builder for the `LocalLookupJoinExecutor`
///
/// Scan ranges built from lookup keys are grouped by the worker slot that owns their virtual
/// node; `build_executor` then turns each group into one exchange source running a
/// `RowSeqScan` plan on that worker slot.
struct InnerSideExecutorBuilder {
    /// Descriptor of the inner side table; cloned into every generated `RowSeqScanNode`.
    table_desc: StorageTableDesc,
    /// Distribution of the inner side table, used to compute the vnode of a scan range.
    table_distribution: TableDistribution,
    /// Maps a vnode index to the worker slot serving that vnode.
    vnode_mapping: ExpandedWorkerSlotMapping,
    /// Data types of the outer side join keys.
    outer_side_key_types: Vec<DataType>,
    /// Schema of the scanned inner side columns; used as the exchange node's input schema.
    inner_side_schema: Schema,
    /// Ids of the inner side table columns to scan.
    inner_side_column_ids: Vec<i32>,
    /// Data types of the inner side join keys.
    inner_side_key_types: Vec<DataType>,
    /// Number of leading join keys turned into equality conditions of a scan range.
    lookup_prefix_len: usize,
    /// Task context handed to the `ExecutorBuilder` of the generated exchange executor.
    context: Arc<dyn BatchTaskContext>,
    /// Id of the task running the lookup join; generated exchange sources derive their
    /// query id and (offset) stage id from it.
    task_id: TaskId,
    /// Epoch attached to the generated local execute plans and executor builder.
    epoch: BatchQueryEpoch,
    /// Maps each worker slot to the worker node hosting it.
    worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode>,
    /// Scan ranges (with their vnodes) added since the last `reset`, grouped by worker slot.
    worker_slot_to_scan_range_mapping: HashMap<WorkerSlotId, Vec<(ScanRange, VirtualNode)>>,
    // Not read anywhere at the moment, hence the `expect(dead_code)`.
    #[expect(dead_code)]
    chunk_size: usize,
    /// Shutdown token handed to the `ExecutorBuilder` of the generated exchange executor.
    shutdown_rx: ShutdownToken,
    /// Incremented on every `build_executor` call and mixed into the generated stage ids so
    /// that exchange sources of successive builds do not conflict.
    next_stage_id: usize,
    /// Optional time-travel (`AS OF`) specification forwarded to each `RowSeqScanNode`.
    as_of: Option<AsOf>,
}
74
/// Used to build the executor for the inner side
///
/// Implementors collect lookup keys through `add_scan_range` and produce an executor that reads
/// the corresponding inner side rows through `build_executor`. `InnerSideExecutorBuilder` is the
/// production implementation in this file.
#[async_trait::async_trait]
pub trait LookupExecutorBuilder: Send {
    /// Discards the lookup keys accumulated so far. `InnerSideExecutorBuilder` does this by
    /// emptying its worker-slot-to-scan-range map.
    fn reset(&mut self);

    /// Registers one lookup key, given as the datums of the leading join keys.
    async fn add_scan_range(&mut self, key_datums: Vec<Datum>) -> Result<()>;

    /// Builds an executor that reads the inner side rows for the keys registered so far.
    async fn build_executor(&mut self) -> Result<BoxedExecutor>;
}

/// Boxed, dynamically dispatched [`LookupExecutorBuilder`].
pub type BoxedLookupExecutorBuilder = Box<dyn LookupExecutorBuilder>;
86
87impl InnerSideExecutorBuilder {
88    /// Gets the virtual node based on the given `scan_range`
89    fn get_virtual_node(&self, scan_range: &ScanRange) -> Result<VirtualNode> {
90        let virtual_node = scan_range
91            .try_compute_vnode(&self.table_distribution)
92            .context("Could not compute vnode for lookup join")?;
93        Ok(virtual_node)
94    }
95
96    /// Creates the `RowSeqScanNode` that will be used for scanning the inner side table
97    /// based on the passed `scan_range` and virtual node.
98    fn create_row_seq_scan_node(&self, id: &WorkerSlotId) -> Result<NodeBody> {
99        let list = self.worker_slot_to_scan_range_mapping.get(id).unwrap();
100        let mut scan_ranges = vec![];
101        let mut vnode_bitmap = BitmapBuilder::zeroed(self.vnode_mapping.len());
102
103        list.iter().for_each(|(scan_range, vnode)| {
104            scan_ranges.push(scan_range.to_protobuf());
105            vnode_bitmap.set(vnode.to_index(), true);
106        });
107
108        let row_seq_scan_node = NodeBody::RowSeqScan(RowSeqScanNode {
109            table_desc: Some(self.table_desc.clone()),
110            column_ids: self.inner_side_column_ids.clone(),
111            scan_ranges,
112            ordered: false,
113            vnode_bitmap: Some(vnode_bitmap.finish().to_protobuf()),
114            limit: None,
115            as_of: self.as_of.as_ref().map(Into::into),
116        });
117
118        Ok(row_seq_scan_node)
119    }
120
121    /// Creates the `PbExchangeSource` using the given `id`.
122    fn build_prost_exchange_source(&self, id: &WorkerSlotId) -> Result<PbExchangeSource> {
123        let worker = self
124            .worker_slot_mapping
125            .get(id)
126            .context("No worker node found for the given worker slot id.")?;
127
128        let local_execute_plan = LocalExecutePlan {
129            plan: Some(PlanFragment {
130                root: Some(PlanNode {
131                    children: vec![],
132                    identity: "SeqScan".to_owned(),
133                    node_body: Some(self.create_row_seq_scan_node(id)?),
134                }),
135                exchange_info: Some(ExchangeInfo {
136                    mode: DistributionMode::Single as i32,
137                    ..Default::default()
138                }),
139            }),
140            epoch: Some(self.epoch),
141            tracing_context: TracingContext::from_current_span().to_protobuf(),
142        };
143
144        let prost_exchange_source = PbExchangeSource {
145            task_output_id: Some(TaskOutputId {
146                task_id: Some(PbTaskId {
147                    // FIXME: We should replace this random generated uuid to current query_id for
148                    // better dashboard. However, due to the lack of info of
149                    // stage_id and task_id, we can not do it now. Now just make sure it will not
150                    // conflict.
151                    query_id: self.task_id.query_id.clone(),
152                    stage_id: self.task_id.stage_id + 10000 + self.next_stage_id as u32,
153                    task_id: (*id).into(),
154                }),
155                output_id: 0,
156            }),
157            host: Some(worker.host.as_ref().unwrap().clone()),
158            local_execute_plan: Some(Plan(local_execute_plan)),
159        };
160
161        Ok(prost_exchange_source)
162    }
163}
164
165#[async_trait::async_trait]
166impl LookupExecutorBuilder for InnerSideExecutorBuilder {
167    fn reset(&mut self) {
168        self.worker_slot_to_scan_range_mapping = HashMap::new();
169    }
170
171    /// Adds the scan range made from the given `kwy_scalar_impls` into the worker slot id
172    /// hash map, along with the scan range's virtual node.
173    async fn add_scan_range(&mut self, key_datums: Vec<Datum>) -> Result<()> {
174        let mut scan_range = ScanRange::full_table_scan();
175
176        for ((datum, outer_type), inner_type) in key_datums
177            .into_iter()
178            .zip_eq_fast(
179                self.outer_side_key_types
180                    .iter()
181                    .take(self.lookup_prefix_len),
182            )
183            .zip_eq_fast(
184                self.inner_side_key_types
185                    .iter()
186                    .take(self.lookup_prefix_len),
187            )
188        {
189            let datum = if inner_type == outer_type {
190                datum
191            } else {
192                bail!("Join key types are not aligned: LHS: {outer_type:?}, RHS: {inner_type:?}");
193            };
194
195            scan_range.eq_conds.push(datum);
196        }
197
198        let vnode = self.get_virtual_node(&scan_range)?;
199        let worker_slot_id = self.vnode_mapping[vnode.to_index()];
200
201        let list = self
202            .worker_slot_to_scan_range_mapping
203            .entry(worker_slot_id)
204            .or_default();
205        list.push((scan_range, vnode));
206
207        Ok(())
208    }
209
210    /// Builds and returns the `ExchangeExecutor` used for the inner side of the
211    /// `LocalLookupJoinExecutor`.
212    async fn build_executor(&mut self) -> Result<BoxedExecutor> {
213        self.next_stage_id += 1;
214        let mut sources = vec![];
215        for id in self.worker_slot_to_scan_range_mapping.keys() {
216            sources.push(self.build_prost_exchange_source(id)?);
217        }
218
219        if sources.is_empty() {
220            return Ok(Box::new(DummyExecutor {
221                schema: Schema::default(),
222            }));
223        }
224
225        let exchange_node = NodeBody::Exchange(ExchangeNode {
226            sources,
227            sequential: true,
228            input_schema: self.inner_side_schema.to_prost(),
229        });
230
231        let plan_node = PlanNode {
232            children: vec![],
233            identity: "LocalLookupJoinExchangeExecutor".to_owned(),
234            node_body: Some(exchange_node),
235        };
236
237        let task_id = self.task_id.clone();
238
239        let executor_builder = ExecutorBuilder::new(
240            &plan_node,
241            &task_id,
242            self.context.clone(),
243            self.epoch,
244            self.shutdown_rx.clone(),
245        );
246
247        executor_builder.build().await
248    }
249}
250
/// Local Lookup Join Executor.
/// High level Execution flow:
/// Repeat 1-3:
///   1. Read N rows from outer side input and send keys to inner side builder after deduplication.
///   2. The inner side input looks up the inner side table with the keys and builds a hash map.
///   3. The outer side rows are joined with the inner side rows by probing the hash map.
///
/// Furthermore, we also want to minimize the number of RPC requests we send through the
/// `ExchangeExecutors`. This is done by grouping rows with the same key datums together, and also
/// by grouping together scan ranges that point to the same partition (and can thus be easily
/// scanned by the same worker node).
pub struct LocalLookupJoinExecutor<K> {
    /// Shared lookup join implementation; `execute` delegates to its `do_execute`.
    base: LookupJoinBase<K>,
    /// Marker for the hash key type `K`.
    _phantom: PhantomData<K>,
}
266
267impl<K: HashKey> Executor for LocalLookupJoinExecutor<K> {
268    fn schema(&self) -> &Schema {
269        &self.base.schema
270    }
271
272    fn identity(&self) -> &str {
273        &self.base.identity
274    }
275
276    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
277        Box::new(self.base).do_execute()
278    }
279}
280
281impl<K> LocalLookupJoinExecutor<K> {
282    pub fn new(base: LookupJoinBase<K>) -> Self {
283        Self {
284            base,
285            _phantom: PhantomData,
286        }
287    }
288}
289
/// Builds a `LocalLookupJoinExecutor` from a `LocalLookupJoin` plan node.
pub struct LocalLookupJoinExecutorBuilder {}
291
292impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
293    async fn new_boxed_executor(
294        source: &ExecutorBuilder<'_>,
295        inputs: Vec<BoxedExecutor>,
296    ) -> Result<BoxedExecutor> {
297        let [outer_side_input]: [_; 1] = inputs.try_into().unwrap();
298
299        let lookup_join_node = try_match_expand!(
300            source.plan_node().get_node_body().unwrap(),
301            NodeBody::LocalLookupJoin
302        )?;
303        // as_of takes precedence
304        let as_of = lookup_join_node
305            .as_of
306            .as_ref()
307            .map(AsOf::try_from)
308            .transpose()?;
309        let query_epoch = as_of
310            .as_ref()
311            .map(|a| {
312                let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0;
313                tracing::debug!(epoch, "time travel");
314                risingwave_pb::common::BatchQueryEpoch {
315                    epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel(
316                        epoch,
317                    )),
318                }
319            })
320            .unwrap_or_else(|| source.epoch());
321
322        let join_type = JoinType::from_prost(lookup_join_node.get_join_type()?);
323        let condition = match lookup_join_node.get_condition() {
324            Ok(cond_prost) => Some(build_from_prost(cond_prost)?),
325            Err(_) => None,
326        };
327
328        let output_indices: Vec<usize> = lookup_join_node
329            .get_output_indices()
330            .iter()
331            .map(|&x| x as usize)
332            .collect();
333
334        let outer_side_data_types = outer_side_input.schema().data_types();
335
336        let table_desc = lookup_join_node.get_inner_side_table_desc()?;
337        let inner_side_column_ids = lookup_join_node.get_inner_side_column_ids().to_vec();
338
339        let inner_side_schema = Schema {
340            fields: inner_side_column_ids
341                .iter()
342                .map(|&id| {
343                    let column = table_desc
344                        .columns
345                        .iter()
346                        .find(|c| c.column_id == id)
347                        .unwrap();
348                    Field::from(&ColumnDesc::from(column))
349                })
350                .collect_vec(),
351        };
352
353        let fields = if join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti {
354            outer_side_input.schema().fields.clone()
355        } else {
356            [
357                outer_side_input.schema().fields.clone(),
358                inner_side_schema.fields.clone(),
359            ]
360            .concat()
361        };
362
363        let original_schema = Schema { fields };
364        let actual_schema = output_indices
365            .iter()
366            .map(|&idx| original_schema[idx].clone())
367            .collect();
368
369        let mut outer_side_key_idxs = vec![];
370        for outer_side_key in lookup_join_node.get_outer_side_key() {
371            outer_side_key_idxs.push(*outer_side_key as usize)
372        }
373
374        let outer_side_key_types: Vec<DataType> = outer_side_key_idxs
375            .iter()
376            .map(|&i| outer_side_data_types[i].clone())
377            .collect_vec();
378
379        let lookup_prefix_len: usize = lookup_join_node.get_lookup_prefix_len() as usize;
380
381        let mut inner_side_key_idxs = vec![];
382        for inner_side_key in lookup_join_node.get_inner_side_key() {
383            inner_side_key_idxs.push(*inner_side_key as usize)
384        }
385
386        let inner_side_key_types = inner_side_key_idxs
387            .iter()
388            .map(|&i| inner_side_schema.fields[i].data_type.clone())
389            .collect_vec();
390
391        let null_safe = lookup_join_node.get_null_safe().to_vec();
392
393        let vnode_mapping = lookup_join_node
394            .get_inner_side_vnode_mapping()
395            .iter()
396            .copied()
397            .map(WorkerSlotId::from)
398            .collect_vec();
399
400        assert!(!vnode_mapping.is_empty());
401
402        let chunk_size = source.context().get_config().developer.chunk_size;
403
404        let asof_desc = lookup_join_node
405            .asof_desc
406            .map(|desc| AsOfDesc::from_protobuf(&desc))
407            .transpose()?;
408
409        let worker_nodes = lookup_join_node.get_worker_nodes();
410        let worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode> = worker_nodes
411            .iter()
412            .flat_map(|worker| {
413                (0..(worker.compute_node_parallelism()))
414                    .map(|i| (WorkerSlotId::new(worker.id, i), worker.clone()))
415            })
416            .collect();
417
418        let vnodes = Some(Bitmap::ones(table_desc.vnode_count()).into());
419
420        let inner_side_builder = InnerSideExecutorBuilder {
421            table_desc: table_desc.clone(),
422            table_distribution: TableDistribution::new_from_storage_table_desc(vnodes, table_desc),
423            vnode_mapping,
424            outer_side_key_types,
425            inner_side_schema,
426            inner_side_column_ids,
427            inner_side_key_types: inner_side_key_types.clone(),
428            lookup_prefix_len,
429            context: source.context().clone(),
430            task_id: source.task_id.clone(),
431            epoch: query_epoch,
432            worker_slot_to_scan_range_mapping: HashMap::new(),
433            chunk_size,
434            shutdown_rx: source.shutdown_rx().clone(),
435            next_stage_id: 0,
436            worker_slot_mapping,
437            as_of,
438        };
439
440        let identity = source.plan_node().get_identity().clone();
441        Ok(LocalLookupJoinExecutorArgs {
442            join_type,
443            condition,
444            outer_side_input,
445            outer_side_data_types,
446            outer_side_key_idxs,
447            inner_side_builder: Box::new(inner_side_builder),
448            inner_side_key_types,
449            inner_side_key_idxs,
450            null_safe,
451            lookup_prefix_len,
452            chunk_builder: DataChunkBuilder::new(original_schema.data_types(), chunk_size),
453            schema: actual_schema,
454            output_indices,
455            chunk_size,
456            asof_desc,
457            identity: identity.clone(),
458            shutdown_rx: source.shutdown_rx().clone(),
459            mem_ctx: source.context().create_executor_mem_context(&identity),
460        }
461        .dispatch())
462    }
463}
464
/// Arguments for constructing a `LocalLookupJoinExecutor`.
///
/// `HashKeyDispatcher` dispatches on `inner_side_key_types` to pick the hash key type `K`.
/// Every field is then moved one-to-one into `LookupJoinBase<K>`.
struct LocalLookupJoinExecutorArgs {
    join_type: JoinType,
    /// Optional join condition built from the plan node.
    condition: Option<BoxedExpression>,
    /// Outer side input executor.
    outer_side_input: BoxedExecutor,
    /// Data types of all outer side input columns.
    outer_side_data_types: Vec<DataType>,
    /// Indices of the join key columns in the outer side input.
    outer_side_key_idxs: Vec<usize>,
    /// Builds the executor that reads the inner side rows for the looked-up keys.
    inner_side_builder: Box<dyn LookupExecutorBuilder>,
    /// Data types of the inner side join keys; also the types dispatched on.
    inner_side_key_types: Vec<DataType>,
    /// Indices of the join key columns in the inner side schema.
    inner_side_key_idxs: Vec<usize>,
    /// Null-safety flags for the join keys, presumably one per key. `true` lets NULL keys match
    /// each other (compare `test_null_safe_inner_join` with `test_inner_join`).
    null_safe: Vec<bool>,
    /// Number of leading join keys used to look up the inner side table.
    lookup_prefix_len: usize,
    /// Builds chunks with the un-projected column types: outer + inner columns, or only outer
    /// columns for semi/anti joins.
    chunk_builder: DataChunkBuilder,
    /// Output schema after projecting with `output_indices`.
    schema: Schema,
    /// Indices of the un-projected columns that form the output.
    output_indices: Vec<usize>,
    chunk_size: usize,
    /// AS OF join description taken from the plan node, if any.
    asof_desc: Option<AsOfDesc>,
    identity: String,
    shutdown_rx: ShutdownToken,
    /// Memory context created for this executor's identity.
    mem_ctx: MemoryContext,
}
485
486impl HashKeyDispatcher for LocalLookupJoinExecutorArgs {
487    type Output = BoxedExecutor;
488
489    fn dispatch_impl<K: HashKey>(self) -> Self::Output {
490        Box::new(LocalLookupJoinExecutor::<K>::new(LookupJoinBase::<K> {
491            join_type: self.join_type,
492            condition: self.condition,
493            outer_side_input: self.outer_side_input,
494            outer_side_data_types: self.outer_side_data_types,
495            outer_side_key_idxs: self.outer_side_key_idxs,
496            inner_side_builder: self.inner_side_builder,
497            inner_side_key_types: self.inner_side_key_types,
498            inner_side_key_idxs: self.inner_side_key_idxs,
499            null_safe: self.null_safe,
500            lookup_prefix_len: self.lookup_prefix_len,
501            chunk_builder: self.chunk_builder,
502            schema: self.schema,
503            output_indices: self.output_indices,
504            chunk_size: self.chunk_size,
505            asof_desc: self.asof_desc,
506            identity: self.identity,
507            shutdown_rx: self.shutdown_rx,
508            mem_ctx: self.mem_ctx,
509            _phantom: PhantomData,
510        }))
511    }
512
513    fn data_types(&self) -> &[DataType] {
514        &self.inner_side_key_types
515    }
516}
517
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use risingwave_common::array::{DataChunk, DataChunkTestExt};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::hash::HashKeyDispatcher;
    use risingwave_common::memory::MemoryContext;
    use risingwave_common::types::DataType;
    use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
    use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
    use risingwave_expr::expr::{BoxedExpression, build_from_pretty};

    use super::LocalLookupJoinExecutorArgs;
    use crate::executor::join::JoinType;
    use crate::executor::test_utils::{
        FakeInnerSideExecutorBuilder, MockExecutor, diff_executor_output,
    };
    use crate::executor::{BoxedExecutor, SortExecutor};
    use crate::monitor::BatchSpillMetrics;
    use crate::task::ShutdownToken;

    const CHUNK_SIZE: usize = 1024;

    /// Mock outer side input with an `(i32, f32)` schema, emitted as two chunks; the second
    /// chunk ends with an all-NULL row.
    fn create_outer_side_input() -> BoxedExecutor {
        let fields = vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Float32),
        ];
        let mut executor = MockExecutor::new(Schema { fields });

        let chunks = [
            "i f
             1 6.1
             2 8.4
             3 3.9",
            "i f
             2 5.5
             5 4.1
             5 9.1
             . .",
        ];
        for chunk in chunks {
            executor.add(DataChunk::from_pretty(chunk));
        }

        Box::new(executor)
    }

    /// Lookup join over the mock outer input whose fake inner side mirrors the outer schema,
    /// joining on column 0 with a lookup prefix of one key.
    fn create_lookup_join_executor(
        join_type: JoinType,
        condition: Option<BoxedExpression>,
        null_safe: bool,
    ) -> BoxedExecutor {
        let outer_side_input = create_outer_side_input();
        let outer_fields = outer_side_input.schema().fields.clone();

        let inner_side_schema = Schema {
            fields: outer_fields.clone(),
        };
        let inner_side_data_types = inner_side_schema.data_types();
        let outer_side_data_types = outer_side_input.schema().data_types();

        // Semi and anti joins only output the outer side columns.
        let original_schema = Schema {
            fields: match join_type {
                JoinType::LeftSemi | JoinType::LeftAnti => outer_fields,
                _ => [outer_fields.clone(), outer_fields].concat(),
            },
        };

        let args = LocalLookupJoinExecutorArgs {
            join_type,
            condition,
            outer_side_input,
            outer_side_data_types,
            outer_side_key_idxs: vec![0],
            inner_side_builder: Box::new(FakeInnerSideExecutorBuilder::new(inner_side_schema)),
            inner_side_key_types: vec![inner_side_data_types[0].clone()],
            inner_side_key_idxs: vec![0],
            null_safe: vec![null_safe],
            lookup_prefix_len: 1,
            chunk_builder: DataChunkBuilder::new(original_schema.data_types(), CHUNK_SIZE),
            schema: original_schema.clone(),
            output_indices: (0..original_schema.len()).collect(),
            chunk_size: CHUNK_SIZE,
            asof_desc: None,
            identity: "TestLookupJoinExecutor".to_owned(),
            shutdown_rx: ShutdownToken::empty(),
            mem_ctx: MemoryContext::none(),
        };
        args.dispatch()
    }

    /// Sorts the output of `child` ascending by its first two columns so that join results can
    /// be compared deterministically.
    fn create_order_by_executor(child: BoxedExecutor) -> BoxedExecutor {
        let column_orders: Vec<ColumnOrder> = (0..2)
            .map(|column_index| ColumnOrder {
                column_index,
                order_type: OrderType::ascending(),
            })
            .collect();

        let sort = SortExecutor::new(
            child,
            Arc::new(column_orders),
            "SortExecutor".into(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        );
        Box::new(sort)
    }

    /// Runs the lookup join under test, sorts its output and compares it with `expected`.
    async fn do_test(
        join_type: JoinType,
        condition: Option<BoxedExpression>,
        null_safe: bool,
        expected: DataChunk,
    ) {
        let sorted_join =
            create_order_by_executor(create_lookup_join_executor(join_type, condition, null_safe));
        let mut expected_executor = MockExecutor::new(sorted_join.schema().clone());
        expected_executor.add(expected);
        diff_executor_output(sorted_join, Box::new(expected_executor)).await;
    }

    /// Extra join condition shared by the `*_with_condition` tests: `5 < $3`.
    fn float_condition() -> Option<BoxedExpression> {
        Some(build_from_pretty("(less_than:boolean 5:float4 $3:float4)"))
    }

    #[tokio::test]
    async fn test_inner_join() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 5.5 2 4.4
             2 8.4 2 5.5
             2 8.4 2 4.4
             5 4.1 5 2.3
             5 4.1 5 3.7
             5 9.1 5 2.3
             5 9.1 5 3.7",
        );
        do_test(JoinType::Inner, None, false, expected).await;
    }

    #[tokio::test]
    async fn test_null_safe_inner_join() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 5.5 2 4.4
             2 8.4 2 5.5
             2 8.4 2 4.4
             5 4.1 5 2.3
             5 4.1 5 3.7
             5 9.1 5 2.3
             5 9.1 5 3.7
             .  .  .  .",
        );
        do_test(JoinType::Inner, None, true, expected).await;
    }

    #[tokio::test]
    async fn test_left_outer_join() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 5.5 2 4.4
             2 8.4 2 5.5
             2 8.4 2 4.4
             3 3.9 . .
             5 4.1 5 2.3
             5 4.1 5 3.7
             5 9.1 5 2.3
             5 9.1 5 3.7
             . .   . .",
        );
        do_test(JoinType::LeftOuter, None, false, expected).await;
    }

    #[tokio::test]
    async fn test_left_semi_join() {
        let expected = DataChunk::from_pretty(
            "i f
             1 6.1
             2 5.5
             2 8.4
             5 4.1
             5 9.1",
        );
        do_test(JoinType::LeftSemi, None, false, expected).await;
    }

    #[tokio::test]
    async fn test_left_anti_join() {
        let expected = DataChunk::from_pretty(
            "i f
             3 3.9
             . .",
        );
        do_test(JoinType::LeftAnti, None, false, expected).await;
    }

    #[tokio::test]
    async fn test_inner_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 8.4 2 5.5",
        );
        do_test(JoinType::Inner, float_condition(), false, expected).await;
    }

    #[tokio::test]
    async fn test_left_outer_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 8.4 2 5.5
             3 3.9 . .
             5 4.1 . .
             5 9.1 . .
             . .   . .",
        );
        do_test(JoinType::LeftOuter, float_condition(), false, expected).await;
    }

    #[tokio::test]
    async fn test_left_semi_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f
             1 6.1
             2 5.5
             2 8.4",
        );
        do_test(JoinType::LeftSemi, float_condition(), false, expected).await;
    }

    #[tokio::test]
    async fn test_left_anti_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f
            3 3.9
            5 4.1
            5 9.1
            . .",
        );
        do_test(JoinType::LeftAnti, float_condition(), false, expected).await;
    }
}