risingwave_batch_executors/executor/join/
local_lookup_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::marker::PhantomData;
17use std::sync::Arc;
18
19use anyhow::Context;
20use itertools::Itertools;
21use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
22use risingwave_common::catalog::{ColumnDesc, Field, Schema};
23use risingwave_common::hash::table_distribution::TableDistribution;
24use risingwave_common::hash::{
25    ExpandedWorkerSlotMapping, HashKey, HashKeyDispatcher, VirtualNode, VnodeCountCompat,
26    WorkerSlotId,
27};
28use risingwave_common::memory::MemoryContext;
29use risingwave_common::types::{DataType, Datum};
30use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
31use risingwave_common::util::iter_util::ZipEqFast;
32use risingwave_common::util::scan_range::ScanRange;
33use risingwave_common::util::tracing::TracingContext;
34use risingwave_expr::expr::{BoxedExpression, build_from_prost};
35use risingwave_pb::batch_plan::exchange_info::DistributionMode;
36use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan;
37use risingwave_pb::batch_plan::plan_node::NodeBody;
38use risingwave_pb::batch_plan::{
39    ExchangeInfo, ExchangeNode, LocalExecutePlan, PbExchangeSource, PbTaskId, PlanFragment,
40    PlanNode, RowSeqScanNode, TaskOutputId,
41};
42use risingwave_pb::common::{BatchQueryEpoch, WorkerNode};
43use risingwave_pb::plan_common::StorageTableDesc;
44
45use super::AsOfDesc;
46use crate::error::Result;
47use crate::executor::{
48    AsOf, BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, DummyExecutor, Executor,
49    ExecutorBuilder, JoinType, LookupJoinBase, unix_timestamp_sec_to_epoch,
50};
51use crate::task::{BatchTaskContext, ShutdownToken, TaskId};
52
53/// Inner side executor builder for the `LocalLookupJoinExecutor`
54struct InnerSideExecutorBuilder {
55    table_desc: StorageTableDesc,
56    table_distribution: TableDistribution,
57    vnode_mapping: ExpandedWorkerSlotMapping,
58    outer_side_key_types: Vec<DataType>,
59    inner_side_schema: Schema,
60    inner_side_column_ids: Vec<i32>,
61    inner_side_key_types: Vec<DataType>,
62    lookup_prefix_len: usize,
63    context: Arc<dyn BatchTaskContext>,
64    task_id: TaskId,
65    epoch: BatchQueryEpoch,
66    worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode>,
67    worker_slot_to_scan_range_mapping: HashMap<WorkerSlotId, Vec<(ScanRange, VirtualNode)>>,
68    #[expect(dead_code)]
69    chunk_size: usize,
70    shutdown_rx: ShutdownToken,
71    next_stage_id: usize,
72    as_of: Option<AsOf>,
73}
74
75/// Used to build the executor for the inner side
76pub trait LookupExecutorBuilder: Send + 'static {
77    fn reset(&mut self);
78
79    fn add_scan_range(
80        &mut self,
81        key_datums: Vec<Datum>,
82    ) -> impl Future<Output = Result<()>> + Send + '_;
83
84    fn build_executor(&mut self) -> impl Future<Output = Result<BoxedExecutor>> + Send + '_;
85}
86
87impl InnerSideExecutorBuilder {
88    /// Gets the virtual node based on the given `scan_range`
89    fn get_virtual_node(&self, scan_range: &ScanRange) -> Result<VirtualNode> {
90        let virtual_node = scan_range
91            .try_compute_vnode(&self.table_distribution)
92            .context("Could not compute vnode for lookup join")?;
93        Ok(virtual_node)
94    }
95
96    /// Creates the `RowSeqScanNode` that will be used for scanning the inner side table
97    /// based on the passed `scan_range` and virtual node.
98    fn create_row_seq_scan_node(&self, id: &WorkerSlotId) -> Result<NodeBody> {
99        let list = self.worker_slot_to_scan_range_mapping.get(id).unwrap();
100        let mut scan_ranges = vec![];
101        let mut vnode_bitmap = BitmapBuilder::zeroed(self.vnode_mapping.len());
102
103        list.iter().for_each(|(scan_range, vnode)| {
104            scan_ranges.push(scan_range.to_protobuf());
105            vnode_bitmap.set(vnode.to_index(), true);
106        });
107
108        let row_seq_scan_node = NodeBody::RowSeqScan(RowSeqScanNode {
109            table_desc: Some(self.table_desc.clone()),
110            column_ids: self.inner_side_column_ids.clone(),
111            scan_ranges,
112            ordered: false,
113            vnode_bitmap: Some(vnode_bitmap.finish().to_protobuf()),
114            limit: None,
115            as_of: self.as_of.as_ref().map(Into::into),
116        });
117
118        Ok(row_seq_scan_node)
119    }
120
121    /// Creates the `PbExchangeSource` using the given `id`.
122    fn build_prost_exchange_source(&self, id: &WorkerSlotId) -> Result<PbExchangeSource> {
123        let worker = self
124            .worker_slot_mapping
125            .get(id)
126            .context("No worker node found for the given worker slot id.")?;
127
128        let local_execute_plan = LocalExecutePlan {
129            plan: Some(PlanFragment {
130                root: Some(PlanNode {
131                    children: vec![],
132                    identity: "SeqScan".to_owned(),
133                    node_body: Some(self.create_row_seq_scan_node(id)?),
134                }),
135                exchange_info: Some(ExchangeInfo {
136                    mode: DistributionMode::Single as i32,
137                    ..Default::default()
138                }),
139            }),
140            epoch: Some(self.epoch),
141            tracing_context: TracingContext::from_current_span().to_protobuf(),
142        };
143
144        let prost_exchange_source = PbExchangeSource {
145            task_output_id: Some(TaskOutputId {
146                task_id: Some(PbTaskId {
147                    // FIXME: We should replace this random generated uuid to current query_id for
148                    // better dashboard. However, due to the lack of info of
149                    // stage_id and task_id, we can not do it now. Now just make sure it will not
150                    // conflict.
151                    query_id: self.task_id.query_id.clone(),
152                    stage_id: self.task_id.stage_id + 10000 + self.next_stage_id as u32,
153                    task_id: (*id).into(),
154                }),
155                output_id: 0,
156            }),
157            host: Some(worker.host.as_ref().unwrap().clone()),
158            local_execute_plan: Some(Plan(local_execute_plan)),
159        };
160
161        Ok(prost_exchange_source)
162    }
163}
164
165impl LookupExecutorBuilder for InnerSideExecutorBuilder {
166    fn reset(&mut self) {
167        self.worker_slot_to_scan_range_mapping = HashMap::new();
168    }
169
170    /// Adds the scan range made from the given `kwy_scalar_impls` into the worker slot id
171    /// hash map, along with the scan range's virtual node.
172    async fn add_scan_range(&mut self, key_datums: Vec<Datum>) -> Result<()> {
173        let mut scan_range = ScanRange::full_table_scan();
174
175        for ((datum, outer_type), inner_type) in key_datums
176            .into_iter()
177            .zip_eq_fast(
178                self.outer_side_key_types
179                    .iter()
180                    .take(self.lookup_prefix_len),
181            )
182            .zip_eq_fast(
183                self.inner_side_key_types
184                    .iter()
185                    .take(self.lookup_prefix_len),
186            )
187        {
188            let datum = if inner_type == outer_type {
189                datum
190            } else {
191                bail!("Join key types are not aligned: LHS: {outer_type:?}, RHS: {inner_type:?}");
192            };
193
194            scan_range.eq_conds.push(datum);
195        }
196
197        let vnode = self.get_virtual_node(&scan_range)?;
198        let worker_slot_id = self.vnode_mapping[vnode.to_index()];
199
200        let list = self
201            .worker_slot_to_scan_range_mapping
202            .entry(worker_slot_id)
203            .or_default();
204        list.push((scan_range, vnode));
205
206        Ok(())
207    }
208
209    /// Builds and returns the `ExchangeExecutor` used for the inner side of the
210    /// `LocalLookupJoinExecutor`.
211    async fn build_executor(&mut self) -> Result<BoxedExecutor> {
212        self.next_stage_id += 1;
213        let mut sources = vec![];
214        for id in self.worker_slot_to_scan_range_mapping.keys() {
215            sources.push(self.build_prost_exchange_source(id)?);
216        }
217
218        if sources.is_empty() {
219            return Ok(Box::new(DummyExecutor {
220                schema: Schema::default(),
221            }));
222        }
223
224        let exchange_node = NodeBody::Exchange(ExchangeNode {
225            sources,
226            sequential: true,
227            input_schema: self.inner_side_schema.to_prost(),
228        });
229
230        let plan_node = PlanNode {
231            children: vec![],
232            identity: "LocalLookupJoinExchangeExecutor".to_owned(),
233            node_body: Some(exchange_node),
234        };
235
236        let task_id = self.task_id.clone();
237
238        let executor_builder = ExecutorBuilder::new(
239            &plan_node,
240            &task_id,
241            self.context.clone(),
242            self.epoch,
243            self.shutdown_rx.clone(),
244        );
245
246        executor_builder.build().await
247    }
248}
249
250/// Local Lookup Join Executor.
251/// High level Execution flow:
252/// Repeat 1-3:
253///   1. Read N rows from outer side input and send keys to inner side builder after deduplication.
254///   2. Inner side input lookups inner side table with keys and builds hash map.
255///   3. Outer side rows join each inner side rows by probing the hash map.
256///
257/// Furthermore, we also want to minimize the number of RPC requests we send through the
258/// `ExchangeExecutors`. This is done by grouping rows with the same key datums together, and also
259/// by grouping together scan ranges that point to the same partition (and can thus be easily
260/// scanned by the same worker node).
261pub struct LocalLookupJoinExecutor<K, B: LookupExecutorBuilder> {
262    base: LookupJoinBase<K, B>,
263}
264
265impl<K: HashKey, B: LookupExecutorBuilder> Executor for LocalLookupJoinExecutor<K, B> {
266    fn schema(&self) -> &Schema {
267        &self.base.schema
268    }
269
270    fn identity(&self) -> &str {
271        &self.base.identity
272    }
273
274    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
275        Box::new(self.base).do_execute()
276    }
277}
278
279impl<K, B: LookupExecutorBuilder> LocalLookupJoinExecutor<K, B> {
280    pub fn new(base: LookupJoinBase<K, B>) -> Self {
281        Self { base }
282    }
283}
284
/// Stateless builder that creates a `LocalLookupJoinExecutor` from a `LocalLookupJoin` plan
/// node (see its `BoxedExecutorBuilder` implementation).
pub struct LocalLookupJoinExecutorBuilder {}
286
287impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
288    async fn new_boxed_executor(
289        source: &ExecutorBuilder<'_>,
290        inputs: Vec<BoxedExecutor>,
291    ) -> Result<BoxedExecutor> {
292        let [outer_side_input]: [_; 1] = inputs.try_into().unwrap();
293
294        let lookup_join_node = try_match_expand!(
295            source.plan_node().get_node_body().unwrap(),
296            NodeBody::LocalLookupJoin
297        )?;
298        // as_of takes precedence
299        let as_of = lookup_join_node
300            .as_of
301            .as_ref()
302            .map(AsOf::try_from)
303            .transpose()?;
304        let query_epoch = as_of
305            .as_ref()
306            .map(|a| {
307                let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0;
308                tracing::debug!(epoch, "time travel");
309                risingwave_pb::common::BatchQueryEpoch {
310                    epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel(
311                        epoch,
312                    )),
313                }
314            })
315            .unwrap_or_else(|| source.epoch());
316
317        let join_type = JoinType::from_prost(lookup_join_node.get_join_type()?);
318        let condition = match lookup_join_node.get_condition() {
319            Ok(cond_prost) => Some(build_from_prost(cond_prost)?),
320            Err(_) => None,
321        };
322
323        let output_indices: Vec<usize> = lookup_join_node
324            .get_output_indices()
325            .iter()
326            .map(|&x| x as usize)
327            .collect();
328
329        let outer_side_data_types = outer_side_input.schema().data_types();
330
331        let table_desc = lookup_join_node.get_inner_side_table_desc()?;
332        let inner_side_column_ids = lookup_join_node.get_inner_side_column_ids().to_vec();
333
334        let inner_side_schema = Schema {
335            fields: inner_side_column_ids
336                .iter()
337                .map(|&id| {
338                    let column = table_desc
339                        .columns
340                        .iter()
341                        .find(|c| c.column_id == id)
342                        .unwrap();
343                    Field::from(&ColumnDesc::from(column))
344                })
345                .collect_vec(),
346        };
347
348        let fields = if join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti {
349            outer_side_input.schema().fields.clone()
350        } else {
351            [
352                outer_side_input.schema().fields.clone(),
353                inner_side_schema.fields.clone(),
354            ]
355            .concat()
356        };
357
358        let original_schema = Schema { fields };
359        let actual_schema = output_indices
360            .iter()
361            .map(|&idx| original_schema[idx].clone())
362            .collect();
363
364        let mut outer_side_key_idxs = vec![];
365        for outer_side_key in lookup_join_node.get_outer_side_key() {
366            outer_side_key_idxs.push(*outer_side_key as usize)
367        }
368
369        let outer_side_key_types: Vec<DataType> = outer_side_key_idxs
370            .iter()
371            .map(|&i| outer_side_data_types[i].clone())
372            .collect_vec();
373
374        let lookup_prefix_len: usize = lookup_join_node.get_lookup_prefix_len() as usize;
375
376        let mut inner_side_key_idxs = vec![];
377        for inner_side_key in lookup_join_node.get_inner_side_key() {
378            inner_side_key_idxs.push(*inner_side_key as usize)
379        }
380
381        let inner_side_key_types = inner_side_key_idxs
382            .iter()
383            .map(|&i| inner_side_schema.fields[i].data_type.clone())
384            .collect_vec();
385
386        let null_safe = lookup_join_node.get_null_safe().to_vec();
387
388        let vnode_mapping = lookup_join_node
389            .get_inner_side_vnode_mapping()
390            .iter()
391            .copied()
392            .map(WorkerSlotId::from)
393            .collect_vec();
394
395        assert!(!vnode_mapping.is_empty());
396
397        let chunk_size = source.context().get_config().developer.chunk_size;
398
399        let asof_desc = lookup_join_node
400            .asof_desc
401            .map(|desc| AsOfDesc::from_protobuf(&desc))
402            .transpose()?;
403
404        let worker_nodes = lookup_join_node.get_worker_nodes();
405        let worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode> = worker_nodes
406            .iter()
407            .flat_map(|worker| {
408                (0..(worker.compute_node_parallelism()))
409                    .map(|i| (WorkerSlotId::new(worker.id, i), worker.clone()))
410            })
411            .collect();
412
413        let vnodes = Some(Bitmap::ones(table_desc.vnode_count()).into());
414
415        let inner_side_builder = InnerSideExecutorBuilder {
416            table_desc: table_desc.clone(),
417            table_distribution: TableDistribution::new_from_storage_table_desc(vnodes, table_desc),
418            vnode_mapping,
419            outer_side_key_types,
420            inner_side_schema,
421            inner_side_column_ids,
422            inner_side_key_types: inner_side_key_types.clone(),
423            lookup_prefix_len,
424            context: source.context().clone(),
425            task_id: source.task_id.clone(),
426            epoch: query_epoch,
427            worker_slot_to_scan_range_mapping: HashMap::new(),
428            chunk_size,
429            shutdown_rx: source.shutdown_rx().clone(),
430            next_stage_id: 0,
431            worker_slot_mapping,
432            as_of,
433        };
434
435        let identity = source.plan_node().get_identity().clone();
436        Ok(LocalLookupJoinExecutorArgs {
437            join_type,
438            condition,
439            outer_side_input,
440            outer_side_data_types,
441            outer_side_key_idxs,
442            inner_side_builder,
443            inner_side_key_types,
444            inner_side_key_idxs,
445            null_safe,
446            lookup_prefix_len,
447            chunk_builder: DataChunkBuilder::new(original_schema.data_types(), chunk_size),
448            schema: actual_schema,
449            output_indices,
450            chunk_size,
451            asof_desc,
452            identity: identity.clone(),
453            shutdown_rx: source.shutdown_rx().clone(),
454            mem_ctx: source.context().create_executor_mem_context(&identity),
455        }
456        .dispatch())
457    }
458}
459
460struct LocalLookupJoinExecutorArgs<B: LookupExecutorBuilder> {
461    join_type: JoinType,
462    condition: Option<BoxedExpression>,
463    outer_side_input: BoxedExecutor,
464    outer_side_data_types: Vec<DataType>,
465    outer_side_key_idxs: Vec<usize>,
466    inner_side_builder: B,
467    inner_side_key_types: Vec<DataType>,
468    inner_side_key_idxs: Vec<usize>,
469    null_safe: Vec<bool>,
470    lookup_prefix_len: usize,
471    chunk_builder: DataChunkBuilder,
472    schema: Schema,
473    output_indices: Vec<usize>,
474    chunk_size: usize,
475    asof_desc: Option<AsOfDesc>,
476    identity: String,
477    shutdown_rx: ShutdownToken,
478    mem_ctx: MemoryContext,
479}
480
481impl<B: LookupExecutorBuilder> HashKeyDispatcher for LocalLookupJoinExecutorArgs<B> {
482    type Output = BoxedExecutor;
483
484    fn dispatch_impl<K: HashKey>(self) -> Self::Output {
485        Box::new(LocalLookupJoinExecutor::<K, B>::new(LookupJoinBase {
486            join_type: self.join_type,
487            condition: self.condition,
488            outer_side_input: self.outer_side_input,
489            outer_side_data_types: self.outer_side_data_types,
490            outer_side_key_idxs: self.outer_side_key_idxs,
491            inner_side_builder: self.inner_side_builder,
492            inner_side_key_types: self.inner_side_key_types,
493            inner_side_key_idxs: self.inner_side_key_idxs,
494            null_safe: self.null_safe,
495            lookup_prefix_len: self.lookup_prefix_len,
496            chunk_builder: self.chunk_builder,
497            schema: self.schema,
498            output_indices: self.output_indices,
499            chunk_size: self.chunk_size,
500            asof_desc: self.asof_desc,
501            identity: self.identity,
502            shutdown_rx: self.shutdown_rx,
503            mem_ctx: self.mem_ctx,
504            _phantom: PhantomData,
505        }))
506    }
507
508    fn data_types(&self) -> &[DataType] {
509        &self.inner_side_key_types
510    }
511}
512
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use risingwave_common::array::{DataChunk, DataChunkTestExt};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::hash::HashKeyDispatcher;
    use risingwave_common::memory::MemoryContext;
    use risingwave_common::types::DataType;
    use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
    use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
    use risingwave_expr::expr::{BoxedExpression, build_from_pretty};

    use crate::executor::join::JoinType;
    use crate::executor::test_utils::{
        FakeInnerSideExecutorBuilder, MockExecutor, diff_executor_output,
    };
    use crate::executor::{BoxedExecutor, SortExecutor};
    use crate::local_lookup_join::LocalLookupJoinExecutorArgs;
    use crate::monitor::BatchSpillMetrics;
    use crate::task::ShutdownToken;

    /// Chunk size shared by the join executor and the sort executor under test.
    const CHUNK_SIZE: usize = 1024;

    /// Builds the join condition shared by all `*_with_condition` tests.
    fn create_condition() -> BoxedExpression {
        build_from_pretty("(less_than:boolean 5:float4 $3:float4)")
    }

    /// Creates the mocked outer side: two chunks of `(int32, float32)` rows, the last row being
    /// all NULL.
    fn create_outer_side_input() -> BoxedExecutor {
        let fields = vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Float32),
        ];
        let mut outer_side = MockExecutor::new(Schema { fields });

        outer_side.add(DataChunk::from_pretty(
            "i f
             1 6.1
             2 8.4
             3 3.9",
        ));

        outer_side.add(DataChunk::from_pretty(
            "i f
             2 5.5
             5 4.1
             5 9.1
             . .",
        ));

        Box::new(outer_side)
    }

    /// Creates a lookup join executor over the mocked outer side and the fake inner side builder,
    /// joining on the first column of both sides and outputting every column.
    fn create_lookup_join_executor(
        join_type: JoinType,
        condition: Option<BoxedExpression>,
        null_safe: bool,
    ) -> BoxedExecutor {
        let outer_side_input = create_outer_side_input();
        let outer_fields = outer_side_input.schema().fields.clone();

        // Semi and anti joins only output the outer side; the inner side reuses the outer fields.
        let outer_only = join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti;
        let original_schema = Schema {
            fields: if outer_only {
                outer_fields.clone()
            } else {
                [outer_fields.clone(), outer_fields.clone()].concat()
            },
        };

        let inner_side_schema = Schema {
            fields: outer_fields,
        };
        let inner_key_type = inner_side_schema.data_types()[0].clone();
        let outer_side_data_types = outer_side_input.schema().data_types();

        let args = LocalLookupJoinExecutorArgs {
            join_type,
            condition,
            outer_side_input,
            outer_side_data_types,
            outer_side_key_idxs: vec![0],
            inner_side_builder: FakeInnerSideExecutorBuilder::new(inner_side_schema),
            inner_side_key_types: vec![inner_key_type],
            inner_side_key_idxs: vec![0],
            null_safe: vec![null_safe],
            lookup_prefix_len: 1,
            chunk_builder: DataChunkBuilder::new(original_schema.data_types(), CHUNK_SIZE),
            schema: original_schema.clone(),
            output_indices: (0..original_schema.len()).collect(),
            chunk_size: CHUNK_SIZE,
            asof_desc: None,
            identity: "TestLookupJoinExecutor".to_owned(),
            shutdown_rx: ShutdownToken::empty(),
            mem_ctx: MemoryContext::none(),
        };
        args.dispatch()
    }

    /// Wraps `child` into a sort executor ordering by the first two columns, ascending, so the
    /// join output can be compared in a fixed order.
    fn create_order_by_executor(child: BoxedExecutor) -> BoxedExecutor {
        let ascending_on = |column_index| ColumnOrder {
            column_index,
            order_type: OrderType::ascending(),
        };
        let column_orders = vec![ascending_on(0), ascending_on(1)];

        Box::new(SortExecutor::new(
            child,
            Arc::new(column_orders),
            "SortExecutor".into(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ))
    }

    /// Runs the join described by the arguments and checks its sorted output against `expected`.
    async fn do_test(
        join_type: JoinType,
        condition: Option<BoxedExpression>,
        null_safe: bool,
        expected: DataChunk,
    ) {
        let join = create_lookup_join_executor(join_type, condition, null_safe);
        let sorted_join = create_order_by_executor(join);

        let mut expected_output = MockExecutor::new(sorted_join.schema().clone());
        expected_output.add(expected);

        diff_executor_output(sorted_join, Box::new(expected_output)).await;
    }

    #[tokio::test]
    async fn test_inner_join() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 5.5 2 4.4
             2 8.4 2 5.5
             2 8.4 2 4.4
             5 4.1 5 2.3
             5 4.1 5 3.7
             5 9.1 5 2.3
             5 9.1 5 3.7",
        );

        do_test(JoinType::Inner, None, false, expected).await;
    }

    #[tokio::test]
    async fn test_null_safe_inner_join() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 5.5 2 4.4
             2 8.4 2 5.5
             2 8.4 2 4.4
             5 4.1 5 2.3
             5 4.1 5 3.7
             5 9.1 5 2.3
             5 9.1 5 3.7
             .  .  .  .",
        );

        do_test(JoinType::Inner, None, true, expected).await;
    }

    #[tokio::test]
    async fn test_left_outer_join() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 5.5 2 4.4
             2 8.4 2 5.5
             2 8.4 2 4.4
             3 3.9 . .
             5 4.1 5 2.3
             5 4.1 5 3.7
             5 9.1 5 2.3
             5 9.1 5 3.7
             . .   . .",
        );

        do_test(JoinType::LeftOuter, None, false, expected).await;
    }

    #[tokio::test]
    async fn test_left_semi_join() {
        let expected = DataChunk::from_pretty(
            "i f
             1 6.1
             2 5.5
             2 8.4
             5 4.1
             5 9.1",
        );

        do_test(JoinType::LeftSemi, None, false, expected).await;
    }

    #[tokio::test]
    async fn test_left_anti_join() {
        let expected = DataChunk::from_pretty(
            "i f
             3 3.9
             . .",
        );

        do_test(JoinType::LeftAnti, None, false, expected).await;
    }

    #[tokio::test]
    async fn test_inner_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 8.4 2 5.5",
        );

        do_test(JoinType::Inner, Some(create_condition()), false, expected).await;
    }

    #[tokio::test]
    async fn test_left_outer_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 8.4 2 5.5
             3 3.9 . .
             5 4.1 . .
             5 9.1 . .
             . .   . .",
        );

        do_test(JoinType::LeftOuter, Some(create_condition()), false, expected).await;
    }

    #[tokio::test]
    async fn test_left_semi_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f
             1 6.1
             2 5.5
             2 8.4",
        );

        do_test(JoinType::LeftSemi, Some(create_condition()), false, expected).await;
    }

    #[tokio::test]
    async fn test_left_anti_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f
            3 3.9
            5 4.1
            5 9.1
            . .",
        );

        do_test(JoinType::LeftAnti, Some(create_condition()), false, expected).await;
    }
}