risingwave_batch_executors/executor/join/
local_lookup_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::marker::PhantomData;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use itertools::Itertools;
21use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
22use risingwave_common::catalog::{ColumnDesc, Field, Schema};
23use risingwave_common::hash::table_distribution::TableDistribution;
24use risingwave_common::hash::{
25    ExpandedWorkerSlotMapping, HashKey, HashKeyDispatcher, VirtualNode, VnodeCountCompat,
26    WorkerSlotId,
27};
28use risingwave_common::memory::MemoryContext;
29use risingwave_common::types::{DataType, Datum};
30use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
31use risingwave_common::util::iter_util::ZipEqFast;
32use risingwave_common::util::scan_range::ScanRange;
33use risingwave_common::util::tracing::TracingContext;
34use risingwave_expr::expr::{BoxedExpression, build_from_prost};
35use risingwave_pb::batch_plan::exchange_info::DistributionMode;
36use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan;
37use risingwave_pb::batch_plan::plan_node::NodeBody;
38use risingwave_pb::batch_plan::{
39    ExchangeInfo, ExchangeNode, LocalExecutePlan, PbExchangeSource, PbTaskId, PlanFragment,
40    PlanNode, RowSeqScanNode, TaskOutputId,
41};
42use risingwave_pb::common::{BatchQueryEpoch, WorkerNode};
43use risingwave_pb::plan_common::StorageTableDesc;
44
45use super::AsOfDesc;
46use crate::error::Result;
47use crate::executor::{
48    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, DummyExecutor, Executor,
49    ExecutorBuilder, JoinType, LookupJoinBase,
50};
51use crate::task::{BatchTaskContext, ShutdownToken, TaskId};
52
/// Inner side executor builder for the `LocalLookupJoinExecutor`
///
/// Collects scan ranges per worker slot (see `add_scan_range`) and turns them into an
/// exchange executor that scans the inner side table (see `build_executor`).
struct InnerSideExecutorBuilder {
    /// Descriptor of the inner side table; cloned into every generated `RowSeqScanNode`.
    table_desc: StorageTableDesc,
    /// Distribution of the inner side table, used to compute the vnode of a scan range.
    table_distribution: TableDistribution,
    /// Indexed by vnode index to find the worker slot responsible for that vnode.
    /// Its length is also used as the size of the vnode bitmap sent with each scan.
    vnode_mapping: ExpandedWorkerSlotMapping,
    /// Types of the outer side join keys; compared against `inner_side_key_types` for the
    /// first `lookup_prefix_len` keys.
    outer_side_key_types: Vec<DataType>,
    /// Schema of the scanned inner side columns; used as the exchange node's input schema.
    inner_side_schema: Schema,
    /// Column ids requested from the inner side table scan.
    inner_side_column_ids: Vec<i32>,
    /// Types of the inner side join keys.
    inner_side_key_types: Vec<DataType>,
    /// Number of leading join keys that are turned into equality conditions of a scan range.
    lookup_prefix_len: usize,
    /// Task context handed to the `ExecutorBuilder` that builds the exchange executor.
    context: Arc<dyn BatchTaskContext>,
    /// Id of the current task; its query id and stage id seed the ids of generated scan tasks.
    task_id: TaskId,
    /// Epoch written into every generated `RowSeqScanNode`.
    query_epoch: BatchQueryEpoch,
    /// Maps a worker slot to the worker node that hosts it.
    worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode>,
    /// Scan ranges (with their vnode) accumulated per worker slot since the last `reset`.
    worker_slot_to_scan_range_mapping: HashMap<WorkerSlotId, Vec<(ScanRange, VirtualNode)>>,
    /// Currently not read anywhere in this builder (hence the `dead_code` expectation).
    #[expect(dead_code)]
    chunk_size: usize,
    /// Shutdown token handed to the `ExecutorBuilder` that builds the exchange executor.
    shutdown_rx: ShutdownToken,
    /// Incremented on every `build_executor` call and added to the stage id of the generated
    /// scan tasks.
    next_stage_id: usize,
}
73
/// Used to build the executor for the inner side
///
/// NOTE(review): the per-method descriptions below are derived from the
/// `InnerSideExecutorBuilder` implementation in this file; other implementors should be
/// checked against them.
pub trait LookupExecutorBuilder: Send + 'static {
    /// Discards the scan ranges accumulated so far.
    fn reset(&mut self);

    /// Registers one lookup key (`key_datums`) whose matching inner side rows should be
    /// fetched by the next built executor.
    fn add_scan_range(
        &mut self,
        key_datums: Vec<Datum>,
    ) -> impl Future<Output = Result<()>> + Send + '_;

    /// Builds the executor that produces the inner side rows for the registered scan ranges.
    fn build_executor(&mut self) -> impl Future<Output = Result<BoxedExecutor>> + Send + '_;
}
85
86impl InnerSideExecutorBuilder {
87    /// Gets the virtual node based on the given `scan_range`
88    fn get_virtual_node(&self, scan_range: &ScanRange) -> Result<VirtualNode> {
89        let virtual_node = scan_range
90            .try_compute_vnode(&self.table_distribution)
91            .context("Could not compute vnode for lookup join")?;
92        Ok(virtual_node)
93    }
94
95    /// Creates the `RowSeqScanNode` that will be used for scanning the inner side table
96    /// based on the passed `scan_range` and virtual node.
97    fn create_row_seq_scan_node(&self, id: &WorkerSlotId) -> Result<NodeBody> {
98        let list = self.worker_slot_to_scan_range_mapping.get(id).unwrap();
99        let mut scan_ranges = vec![];
100        let mut vnode_bitmap = BitmapBuilder::zeroed(self.vnode_mapping.len());
101
102        list.iter().for_each(|(scan_range, vnode)| {
103            scan_ranges.push(scan_range.to_protobuf());
104            vnode_bitmap.set(vnode.to_index(), true);
105        });
106
107        let row_seq_scan_node = NodeBody::RowSeqScan(RowSeqScanNode {
108            table_desc: Some(self.table_desc.clone()),
109            column_ids: self.inner_side_column_ids.clone(),
110            scan_ranges,
111            ordered: false,
112            vnode_bitmap: Some(vnode_bitmap.finish().to_protobuf()),
113            limit: None,
114            query_epoch: Some(self.query_epoch),
115        });
116
117        Ok(row_seq_scan_node)
118    }
119
120    /// Creates the `PbExchangeSource` using the given `id`.
121    fn build_prost_exchange_source(&self, id: &WorkerSlotId) -> Result<PbExchangeSource> {
122        let worker = self
123            .worker_slot_mapping
124            .get(id)
125            .context("No worker node found for the given worker slot id.")?;
126
127        let local_execute_plan = LocalExecutePlan {
128            plan: Some(PlanFragment {
129                root: Some(PlanNode {
130                    children: vec![],
131                    identity: "SeqScan".to_owned(),
132                    node_body: Some(self.create_row_seq_scan_node(id)?),
133                }),
134                exchange_info: Some(ExchangeInfo {
135                    mode: DistributionMode::Single as i32,
136                    ..Default::default()
137                }),
138            }),
139            tracing_context: TracingContext::from_current_span().to_protobuf(),
140        };
141
142        let prost_exchange_source = PbExchangeSource {
143            task_output_id: Some(TaskOutputId {
144                task_id: Some(PbTaskId {
145                    // FIXME: We should replace this random generated uuid to current query_id for
146                    // better dashboard. However, due to the lack of info of
147                    // stage_id and task_id, we can not do it now. Now just make sure it will not
148                    // conflict.
149                    query_id: self.task_id.query_id.clone(),
150                    stage_id: self.task_id.stage_id + 10000 + self.next_stage_id as u32,
151                    task_id: (*id).into(),
152                }),
153                output_id: 0,
154            }),
155            host: Some(worker.host.as_ref().unwrap().clone()),
156            local_execute_plan: Some(Plan(local_execute_plan)),
157        };
158
159        Ok(prost_exchange_source)
160    }
161}
162
163impl LookupExecutorBuilder for InnerSideExecutorBuilder {
164    fn reset(&mut self) {
165        self.worker_slot_to_scan_range_mapping = HashMap::new();
166    }
167
168    /// Adds the scan range made from the given `kwy_scalar_impls` into the worker slot id
169    /// hash map, along with the scan range's virtual node.
170    async fn add_scan_range(&mut self, key_datums: Vec<Datum>) -> Result<()> {
171        let mut scan_range = ScanRange::full_table_scan();
172
173        for ((datum, outer_type), inner_type) in key_datums
174            .into_iter()
175            .zip_eq_fast(
176                self.outer_side_key_types
177                    .iter()
178                    .take(self.lookup_prefix_len),
179            )
180            .zip_eq_fast(
181                self.inner_side_key_types
182                    .iter()
183                    .take(self.lookup_prefix_len),
184            )
185        {
186            let datum = if inner_type == outer_type {
187                datum
188            } else {
189                bail!("Join key types are not aligned: LHS: {outer_type:?}, RHS: {inner_type:?}");
190            };
191
192            scan_range.eq_conds.push(datum);
193        }
194
195        let vnode = self.get_virtual_node(&scan_range)?;
196        let worker_slot_id = self.vnode_mapping[vnode.to_index()];
197
198        let list = self
199            .worker_slot_to_scan_range_mapping
200            .entry(worker_slot_id)
201            .or_default();
202        list.push((scan_range, vnode));
203
204        Ok(())
205    }
206
207    /// Builds and returns the `ExchangeExecutor` used for the inner side of the
208    /// `LocalLookupJoinExecutor`.
209    async fn build_executor(&mut self) -> Result<BoxedExecutor> {
210        self.next_stage_id += 1;
211        let mut sources = vec![];
212        for id in self.worker_slot_to_scan_range_mapping.keys() {
213            sources.push(self.build_prost_exchange_source(id)?);
214        }
215
216        if sources.is_empty() {
217            return Ok(Box::new(DummyExecutor {
218                schema: Schema::default(),
219            }));
220        }
221
222        let exchange_node = NodeBody::Exchange(ExchangeNode {
223            sources,
224            sequential: true,
225            input_schema: self.inner_side_schema.to_prost(),
226        });
227
228        let plan_node = PlanNode {
229            children: vec![],
230            identity: "LocalLookupJoinExchangeExecutor".to_owned(),
231            node_body: Some(exchange_node),
232        };
233
234        let task_id = self.task_id.clone();
235
236        let executor_builder = ExecutorBuilder::new(
237            &plan_node,
238            &task_id,
239            self.context.clone(),
240            self.shutdown_rx.clone(),
241        );
242
243        executor_builder.build().await
244    }
245}
246
/// Local Lookup Join Executor.
/// High level Execution flow:
/// Repeat 1-3:
///   1. Read N rows from outer side input and send keys to inner side builder after deduplication.
///   2. Inner side input lookups inner side table with keys and builds hash map.
///   3. Outer side rows join each inner side rows by probing the hash map.
///
/// Furthermore, we also want to minimize the number of RPC requests we send through the
/// `ExchangeExecutors`. This is done by grouping rows with the same key datums together, and also
/// by grouping together scan ranges that point to the same partition (and can thus be easily
/// scanned by the same worker node).
pub struct LocalLookupJoinExecutor<K, B: LookupExecutorBuilder> {
    /// Shared lookup join state; schema, identity and execution are all delegated to it.
    base: LookupJoinBase<K, B>,
}
261
262impl<K: HashKey, B: LookupExecutorBuilder> Executor for LocalLookupJoinExecutor<K, B> {
263    fn schema(&self) -> &Schema {
264        &self.base.schema
265    }
266
267    fn identity(&self) -> &str {
268        &self.base.identity
269    }
270
271    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
272        Box::new(self.base).do_execute()
273    }
274}
275
276impl<K, B: LookupExecutorBuilder> LocalLookupJoinExecutor<K, B> {
277    pub fn new(base: LookupJoinBase<K, B>) -> Self {
278        Self { base }
279    }
280}
281
/// Builds a boxed `LocalLookupJoinExecutor` from a `NodeBody::LocalLookupJoin` plan node
/// (see its `BoxedExecutorBuilder` implementation).
pub struct LocalLookupJoinExecutorBuilder {}
283
284impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
285    async fn new_boxed_executor(
286        source: &ExecutorBuilder<'_>,
287        inputs: Vec<BoxedExecutor>,
288    ) -> Result<BoxedExecutor> {
289        let [outer_side_input]: [_; 1] = inputs.try_into().unwrap();
290
291        let lookup_join_node = try_match_expand!(
292            source.plan_node().get_node_body().unwrap(),
293            NodeBody::LocalLookupJoin
294        )?;
295
296        let join_type = JoinType::from_prost(lookup_join_node.get_join_type()?);
297        let condition = match lookup_join_node.get_condition() {
298            Ok(cond_prost) => Some(build_from_prost(cond_prost)?),
299            Err(_) => None,
300        };
301
302        let output_indices: Vec<usize> = lookup_join_node
303            .get_output_indices()
304            .iter()
305            .map(|&x| x as usize)
306            .collect();
307
308        let outer_side_data_types = outer_side_input.schema().data_types();
309
310        let table_desc = lookup_join_node.get_inner_side_table_desc()?;
311        let inner_side_column_ids = lookup_join_node.get_inner_side_column_ids().clone();
312
313        let inner_side_schema = Schema {
314            fields: inner_side_column_ids
315                .iter()
316                .map(|&id| {
317                    let column = table_desc
318                        .columns
319                        .iter()
320                        .find(|c| c.column_id == id)
321                        .unwrap();
322                    Field::from(&ColumnDesc::from(column))
323                })
324                .collect_vec(),
325        };
326
327        let fields = if join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti {
328            outer_side_input.schema().fields.clone()
329        } else {
330            [
331                outer_side_input.schema().fields.clone(),
332                inner_side_schema.fields.clone(),
333            ]
334            .concat()
335        };
336
337        let original_schema = Schema { fields };
338        let actual_schema = output_indices
339            .iter()
340            .map(|&idx| original_schema[idx].clone())
341            .collect();
342
343        let mut outer_side_key_idxs = vec![];
344        for outer_side_key in lookup_join_node.get_outer_side_key() {
345            outer_side_key_idxs.push(*outer_side_key as usize)
346        }
347
348        let outer_side_key_types: Vec<DataType> = outer_side_key_idxs
349            .iter()
350            .map(|&i| outer_side_data_types[i].clone())
351            .collect_vec();
352
353        let lookup_prefix_len: usize = lookup_join_node.get_lookup_prefix_len() as usize;
354
355        let mut inner_side_key_idxs = vec![];
356        for inner_side_key in lookup_join_node.get_inner_side_key() {
357            inner_side_key_idxs.push(*inner_side_key as usize)
358        }
359
360        let inner_side_key_types = inner_side_key_idxs
361            .iter()
362            .map(|&i| inner_side_schema.fields[i].data_type.clone())
363            .collect_vec();
364
365        let null_safe = lookup_join_node.get_null_safe().clone();
366
367        let vnode_mapping = lookup_join_node
368            .get_inner_side_vnode_mapping()
369            .iter()
370            .copied()
371            .map(WorkerSlotId::from)
372            .collect_vec();
373
374        assert!(!vnode_mapping.is_empty());
375
376        let chunk_size = source.context().get_config().developer.chunk_size;
377
378        let asof_desc = lookup_join_node
379            .asof_desc
380            .map(|desc| AsOfDesc::from_protobuf(&desc))
381            .transpose()?;
382
383        let worker_nodes = lookup_join_node.get_worker_nodes();
384        let worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode> = worker_nodes
385            .iter()
386            .flat_map(|worker| {
387                (0..(worker.compute_node_parallelism()))
388                    .map(|i| (WorkerSlotId::new(worker.id, i), worker.clone()))
389            })
390            .collect();
391
392        let vnodes = Some(Bitmap::ones(table_desc.vnode_count()).into());
393
394        let inner_side_builder = InnerSideExecutorBuilder {
395            table_desc: table_desc.clone(),
396            table_distribution: TableDistribution::new_from_storage_table_desc(vnodes, table_desc),
397            vnode_mapping,
398            outer_side_key_types,
399            inner_side_schema,
400            inner_side_column_ids,
401            inner_side_key_types: inner_side_key_types.clone(),
402            lookup_prefix_len,
403            context: source.context().clone(),
404            task_id: source.task_id.clone(),
405            query_epoch: lookup_join_node
406                .query_epoch
407                .ok_or_else(|| anyhow!("query_epoch not set in local lookup join"))?,
408            worker_slot_to_scan_range_mapping: HashMap::new(),
409            chunk_size,
410            shutdown_rx: source.shutdown_rx().clone(),
411            next_stage_id: 0,
412            worker_slot_mapping,
413        };
414
415        let identity = source.plan_node().get_identity().clone();
416        Ok(LocalLookupJoinExecutorArgs {
417            join_type,
418            condition,
419            outer_side_input,
420            outer_side_data_types,
421            outer_side_key_idxs,
422            inner_side_builder,
423            inner_side_key_types,
424            inner_side_key_idxs,
425            null_safe,
426            lookup_prefix_len,
427            chunk_builder: DataChunkBuilder::new(original_schema.data_types(), chunk_size),
428            schema: actual_schema,
429            output_indices,
430            chunk_size,
431            asof_desc,
432            identity: identity.clone(),
433            shutdown_rx: source.shutdown_rx().clone(),
434            mem_ctx: source.context().create_executor_mem_context(&identity),
435        }
436        .dispatch())
437    }
438}
439
/// Arguments collected for a `LocalLookupJoinExecutor` before the concrete hash key type is
/// known. `dispatch_impl` moves every field into a `LookupJoinBase` once the key type `K`
/// has been chosen.
struct LocalLookupJoinExecutorArgs<B: LookupExecutorBuilder> {
    /// Kind of join to perform.
    join_type: JoinType,
    /// Optional non-equi join condition.
    condition: Option<BoxedExpression>,
    /// Executor producing the outer side rows.
    outer_side_input: BoxedExecutor,
    /// Data types of all outer side columns.
    outer_side_data_types: Vec<DataType>,
    /// Indices of the join key columns in the outer side input.
    outer_side_key_idxs: Vec<usize>,
    /// Builder used to create the inner side executor.
    inner_side_builder: B,
    /// Data types of the inner side join keys; also what `data_types()` reports for hash key
    /// dispatching.
    inner_side_key_types: Vec<DataType>,
    /// Indices of the join key columns in the inner side schema.
    inner_side_key_idxs: Vec<usize>,
    /// Per-key null-safe flags.
    null_safe: Vec<bool>,
    /// Number of leading join keys used for the inner side lookup.
    lookup_prefix_len: usize,
    /// Chunk builder created over the data types of the full (pre-projection) join schema.
    chunk_builder: DataChunkBuilder,
    /// Output schema after applying `output_indices`.
    schema: Schema,
    /// Indices into the full join schema that make up the output.
    output_indices: Vec<usize>,
    /// Chunk size forwarded to the join base.
    chunk_size: usize,
    /// Optional as-of join description.
    asof_desc: Option<AsOfDesc>,
    /// Identity string of the executor.
    identity: String,
    /// Shutdown token forwarded to the join base.
    shutdown_rx: ShutdownToken,
    /// Memory context forwarded to the join base.
    mem_ctx: MemoryContext,
}
460
461impl<B: LookupExecutorBuilder> HashKeyDispatcher for LocalLookupJoinExecutorArgs<B> {
462    type Output = BoxedExecutor;
463
464    fn dispatch_impl<K: HashKey>(self) -> Self::Output {
465        Box::new(LocalLookupJoinExecutor::<K, B>::new(LookupJoinBase {
466            join_type: self.join_type,
467            condition: self.condition,
468            outer_side_input: self.outer_side_input,
469            outer_side_data_types: self.outer_side_data_types,
470            outer_side_key_idxs: self.outer_side_key_idxs,
471            inner_side_builder: self.inner_side_builder,
472            inner_side_key_types: self.inner_side_key_types,
473            inner_side_key_idxs: self.inner_side_key_idxs,
474            null_safe: self.null_safe,
475            lookup_prefix_len: self.lookup_prefix_len,
476            chunk_builder: self.chunk_builder,
477            schema: self.schema,
478            output_indices: self.output_indices,
479            chunk_size: self.chunk_size,
480            asof_desc: self.asof_desc,
481            identity: self.identity,
482            shutdown_rx: self.shutdown_rx,
483            mem_ctx: self.mem_ctx,
484            _phantom: PhantomData,
485        }))
486    }
487
488    fn data_types(&self) -> &[DataType] {
489        &self.inner_side_key_types
490    }
491}
492
493#[cfg(test)]
494mod tests {
495    use std::sync::Arc;
496
497    use risingwave_common::array::{DataChunk, DataChunkTestExt};
498    use risingwave_common::catalog::{Field, Schema};
499    use risingwave_common::hash::HashKeyDispatcher;
500    use risingwave_common::memory::MemoryContext;
501    use risingwave_common::types::DataType;
502    use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
503    use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
504    use risingwave_expr::expr::{BoxedExpression, build_from_pretty};
505
506    use crate::executor::join::JoinType;
507    use crate::executor::test_utils::{
508        FakeInnerSideExecutorBuilder, MockExecutor, diff_executor_output,
509    };
510    use crate::executor::{BoxedExecutor, SortExecutor};
511    use crate::local_lookup_join::LocalLookupJoinExecutorArgs;
512    use crate::monitor::BatchSpillMetrics;
513    use crate::task::ShutdownToken;
514
515    const CHUNK_SIZE: usize = 1024;
516
517    fn create_outer_side_input() -> BoxedExecutor {
518        let schema = Schema {
519            fields: vec![
520                Field::unnamed(DataType::Int32),
521                Field::unnamed(DataType::Float32),
522            ],
523        };
524        let mut executor = MockExecutor::new(schema);
525
526        executor.add(DataChunk::from_pretty(
527            "i f
528             1 6.1
529             2 8.4
530             3 3.9",
531        ));
532
533        executor.add(DataChunk::from_pretty(
534            "i f
535             2 5.5
536             5 4.1
537             5 9.1
538             . .",
539        ));
540
541        Box::new(executor)
542    }
543
544    fn create_lookup_join_executor(
545        join_type: JoinType,
546        condition: Option<BoxedExpression>,
547        null_safe: bool,
548    ) -> BoxedExecutor {
549        let outer_side_input = create_outer_side_input();
550
551        let fields = if join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti {
552            outer_side_input.schema().fields.clone()
553        } else {
554            [
555                outer_side_input.schema().fields.clone(),
556                outer_side_input.schema().fields.clone(),
557            ]
558            .concat()
559        };
560        let original_schema = Schema { fields };
561
562        let inner_side_schema = Schema {
563            fields: outer_side_input.schema().fields.clone(),
564        };
565
566        let inner_side_data_types = inner_side_schema.data_types();
567        let outer_side_data_types = outer_side_input.schema().data_types();
568
569        LocalLookupJoinExecutorArgs {
570            join_type,
571            condition,
572            outer_side_input,
573            outer_side_data_types,
574            outer_side_key_idxs: vec![0],
575            inner_side_builder: FakeInnerSideExecutorBuilder::new(inner_side_schema),
576            inner_side_key_types: vec![inner_side_data_types[0].clone()],
577            inner_side_key_idxs: vec![0],
578            null_safe: vec![null_safe],
579            lookup_prefix_len: 1,
580            chunk_builder: DataChunkBuilder::new(original_schema.data_types(), CHUNK_SIZE),
581            schema: original_schema.clone(),
582            output_indices: (0..original_schema.len()).collect(),
583            chunk_size: CHUNK_SIZE,
584            asof_desc: None,
585            identity: "TestLookupJoinExecutor".to_owned(),
586            shutdown_rx: ShutdownToken::empty(),
587            mem_ctx: MemoryContext::none(),
588        }
589        .dispatch()
590    }
591
592    fn create_order_by_executor(child: BoxedExecutor) -> BoxedExecutor {
593        let column_orders = vec![
594            ColumnOrder {
595                column_index: 0,
596                order_type: OrderType::ascending(),
597            },
598            ColumnOrder {
599                column_index: 1,
600                order_type: OrderType::ascending(),
601            },
602        ];
603
604        Box::new(SortExecutor::new(
605            child,
606            Arc::new(column_orders),
607            "SortExecutor".into(),
608            CHUNK_SIZE,
609            MemoryContext::none(),
610            None,
611            BatchSpillMetrics::for_test(),
612        ))
613    }
614
615    async fn do_test(
616        join_type: JoinType,
617        condition: Option<BoxedExpression>,
618        null_safe: bool,
619        expected: DataChunk,
620    ) {
621        let lookup_join_executor = create_lookup_join_executor(join_type, condition, null_safe);
622        let order_by_executor = create_order_by_executor(lookup_join_executor);
623        let mut expected_mock_exec = MockExecutor::new(order_by_executor.schema().clone());
624        expected_mock_exec.add(expected);
625        diff_executor_output(order_by_executor, Box::new(expected_mock_exec)).await;
626    }
627
    /// Inner join on the first column without an extra condition and without null-safe
    /// equality. The expected output contains no row for the outer keys `3` and NULL.
    #[tokio::test]
    async fn test_inner_join() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 5.5 2 4.4
             2 8.4 2 5.5
             2 8.4 2 4.4
             5 4.1 5 2.3
             5 4.1 5 3.7
             5 9.1 5 2.3
             5 9.1 5 3.7",
        );

        do_test(JoinType::Inner, None, false, expected).await;
    }
645
    /// Same as the plain inner join, but with null-safe equality enabled: the expected
    /// output additionally contains an all-NULL row.
    #[tokio::test]
    async fn test_null_safe_inner_join() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 5.5 2 4.4
             2 8.4 2 5.5
             2 8.4 2 4.4
             5 4.1 5 2.3
             5 4.1 5 3.7
             5 9.1 5 2.3
             5 9.1 5 3.7
             .  .  .  .",
        );

        do_test(JoinType::Inner, None, true, expected).await;
    }
664
    /// Left outer join without an extra condition: outer rows without a match (key `3` and
    /// the NULL key) are expected to be kept and padded with NULLs on the inner side.
    #[tokio::test]
    async fn test_left_outer_join() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 5.5 2 4.4
             2 8.4 2 5.5
             2 8.4 2 4.4
             3 3.9 . .
             5 4.1 5 2.3
             5 4.1 5 3.7
             5 9.1 5 2.3
             5 9.1 5 3.7
             . .   . .",
        );

        do_test(JoinType::LeftOuter, None, false, expected).await;
    }
684
    /// Left semi join without an extra condition: only outer side columns are output, one
    /// row per outer row listed in the expected chunk (keys `1`, `2` and `5`).
    #[tokio::test]
    async fn test_left_semi_join() {
        let expected = DataChunk::from_pretty(
            "i f
             1 6.1
             2 5.5
             2 8.4
             5 4.1
             5 9.1",
        );

        do_test(JoinType::LeftSemi, None, false, expected).await;
    }
698
    /// Left anti join without an extra condition: only the outer rows with key `3` and the
    /// NULL key are expected in the output.
    #[tokio::test]
    async fn test_left_anti_join() {
        let expected = DataChunk::from_pretty(
            "i f
             3 3.9
             . .",
        );

        do_test(JoinType::LeftAnti, None, false, expected).await;
    }
709
    /// Inner join with the additional condition `5 < $3`, where `$3` is the fourth column of
    /// the joined row (the inner side float column).
    #[tokio::test]
    async fn test_inner_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 8.4 2 5.5",
        );
        let condition = build_from_pretty("(less_than:boolean 5:float4 $3:float4)");

        do_test(JoinType::Inner, Some(condition), false, expected).await;
    }
722
    /// Left outer join with the additional condition `5 < $3`: outer rows listed without an
    /// inner match in the expected chunk (keys `3`, `5` and NULL) are padded with NULLs.
    #[tokio::test]
    async fn test_left_outer_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 8.4 2 5.5
             3 3.9 . .
             5 4.1 . .
             5 9.1 . .
             . .   . .",
        );
        let condition = build_from_pretty("(less_than:boolean 5:float4 $3:float4)");

        do_test(JoinType::LeftOuter, Some(condition), false, expected).await;
    }
739
    /// Left semi join with the additional condition `5 < $3`: only the outer rows with keys
    /// `1` and `2` are expected in the output.
    // NOTE(review): `$3` is outside the two output columns of a semi join; presumably the
    // condition is evaluated on the concatenated outer+inner row — confirm in `LookupJoinBase`.
    #[tokio::test]
    async fn test_left_semi_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f
             1 6.1
             2 5.5
             2 8.4",
        );
        let condition = build_from_pretty("(less_than:boolean 5:float4 $3:float4)");

        do_test(JoinType::LeftSemi, Some(condition), false, expected).await;
    }
752
    /// Left anti join with the additional condition `5 < $3`: the outer rows with keys `3`,
    /// `5` and the NULL key are expected in the output.
    // NOTE(review): `$3` is outside the two output columns of an anti join; presumably the
    // condition is evaluated on the concatenated outer+inner row — confirm in `LookupJoinBase`.
    #[tokio::test]
    async fn test_left_anti_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f
            3 3.9
            5 4.1
            5 9.1
            . .",
        );
        let condition = build_from_pretty("(less_than:boolean 5:float4 $3:float4)");

        do_test(JoinType::LeftAnti, Some(condition), false, expected).await;
    }
766}