risingwave_batch_executors/executor/join/
nested_loop_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures_async_stream::try_stream;
16use risingwave_common::array::data_chunk_iter::RowRef;
17use risingwave_common::array::{Array, DataChunk};
18use risingwave_common::bitmap::BitmapBuilder;
19use risingwave_common::catalog::Schema;
20use risingwave_common::memory::MemoryContext;
21use risingwave_common::row::{RowExt, repeat_n};
22use risingwave_common::types::{DataType, Datum};
23use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
24use risingwave_common::util::iter_util::ZipEqDebug;
25use risingwave_common_estimate_size::EstimateSize;
26use risingwave_expr::expr::{
27    BoxedExpression, Expression, build_from_prost as expr_build_from_prost,
28};
29use risingwave_pb::batch_plan::plan_node::NodeBody;
30
31use crate::error::{BatchError, Result};
32use crate::executor::join::{JoinType, concatenate, convert_row_to_chunk};
33use crate::executor::{
34    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
35};
36use crate::task::ShutdownToken;
37
38/// Nested loop join executor.
39///
40///
41/// High Level Idea:
42/// 1. Iterate tuple from left child.
43/// 2. Concatenated with right chunk, eval expression and get visibility bitmap
44/// 3. Create new chunk with visibility bitmap and yield to upper.
45pub struct NestedLoopJoinExecutor {
46    /// Expression to eval join condition
47    join_expr: BoxedExpression,
48    /// Executor should handle different join type.
49    join_type: JoinType,
50    /// Original output schema
51    original_schema: Schema,
52    /// Actual output schema
53    schema: Schema,
54    /// We may only need certain columns.
55    /// `output_indices` are the indices of the columns that we needed.
56    output_indices: Vec<usize>,
57    /// Left child executor
58    left_child: BoxedExecutor,
59    /// Right child executor
60    right_child: BoxedExecutor,
61    /// Identity string of the executor
62    identity: String,
63    /// The maximum size of the chunk produced by executor at a time.
64    chunk_size: usize,
65
66    /// Memory context used for recording memory usage of executor.
67    mem_context: MemoryContext,
68
69    shutdown_rx: ShutdownToken,
70}
71
72impl Executor for NestedLoopJoinExecutor {
73    fn schema(&self) -> &Schema {
74        &self.schema
75    }
76
77    fn identity(&self) -> &str {
78        &self.identity
79    }
80
81    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
82        self.do_execute()
83    }
84}
85
86impl NestedLoopJoinExecutor {
87    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
88    async fn do_execute(self: Box<Self>) {
89        let left_data_types = self.left_child.schema().data_types();
90        let data_types = self.original_schema.data_types();
91
92        let mut chunk_builder = DataChunkBuilder::new(data_types, self.chunk_size);
93
94        // Cache the outputs of left child
95        let left: Vec<DataChunk> = {
96            let mut ret = Vec::with_capacity(1024);
97            #[for_await]
98            for chunk in self.left_child.execute() {
99                let c = chunk?;
100                trace!("Estimated chunk size is {:?}", c.estimated_heap_size());
101                if !self.mem_context.add(c.estimated_heap_size() as i64) {
102                    Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?;
103                }
104                ret.push(c);
105            }
106            ret
107        };
108
109        // Get the joined stream
110        let stream = match self.join_type {
111            JoinType::Inner => Self::do_inner_join,
112            JoinType::LeftOuter => Self::do_left_outer_join,
113            JoinType::LeftSemi => Self::do_left_semi_anti_join::<false>,
114            JoinType::LeftAnti => Self::do_left_semi_anti_join::<true>,
115            JoinType::RightOuter => Self::do_right_outer_join,
116            JoinType::RightSemi => Self::do_right_semi_anti_join::<false>,
117            JoinType::RightAnti => Self::do_right_semi_anti_join::<true>,
118            JoinType::FullOuter => Self::do_full_outer_join,
119            JoinType::AsOfInner | JoinType::AsOfLeftOuter => {
120                unimplemented!("AsOf join is not supported in NestedLoopJoinExecutor")
121            }
122        };
123
124        #[for_await]
125        for chunk in stream(
126            &mut chunk_builder,
127            left_data_types,
128            self.join_expr,
129            left,
130            self.right_child,
131            self.shutdown_rx.clone(),
132        ) {
133            yield chunk?.project(&self.output_indices)
134        }
135
136        // Handle remaining chunk
137        if let Some(chunk) = chunk_builder.consume_all() {
138            yield chunk.project(&self.output_indices)
139        }
140    }
141}
142
143impl NestedLoopJoinExecutor {
144    /// Create a chunk by concatenating a row with a chunk and set its visibility according to the
145    /// evaluation result of the expression.
146    async fn concatenate_and_eval(
147        expr: &dyn Expression,
148        left_row_types: &[DataType],
149        left_row: RowRef<'_>,
150        right_chunk: &DataChunk,
151    ) -> Result<DataChunk> {
152        let left_chunk = convert_row_to_chunk(&left_row, right_chunk.capacity(), left_row_types)?;
153        let mut chunk = concatenate(&left_chunk, right_chunk)?;
154        chunk.set_visibility(expr.eval(&chunk).await?.as_bool().iter().collect());
155        Ok(chunk)
156    }
157}
158
159impl BoxedExecutorBuilder for NestedLoopJoinExecutor {
160    async fn new_boxed_executor(
161        source: &ExecutorBuilder<'_>,
162        inputs: Vec<BoxedExecutor>,
163    ) -> Result<BoxedExecutor> {
164        let [left_child, right_child]: [_; 2] = inputs.try_into().unwrap();
165
166        let nested_loop_join_node = try_match_expand!(
167            source.plan_node().get_node_body().unwrap(),
168            NodeBody::NestedLoopJoin
169        )?;
170
171        let join_type = JoinType::from_prost(nested_loop_join_node.get_join_type()?);
172        let join_expr = expr_build_from_prost(nested_loop_join_node.get_join_cond()?)?;
173
174        let output_indices = nested_loop_join_node
175            .output_indices
176            .iter()
177            .map(|&v| v as usize)
178            .collect();
179
180        let identity = source.plan_node().get_identity().clone();
181        let mem_context = source.context().create_executor_mem_context(&identity);
182
183        Ok(Box::new(NestedLoopJoinExecutor::new(
184            join_expr,
185            join_type,
186            output_indices,
187            left_child,
188            right_child,
189            identity,
190            source.context().get_config().developer.chunk_size,
191            mem_context,
192            source.shutdown_rx().clone(),
193        )))
194    }
195}
196
197impl NestedLoopJoinExecutor {
198    #[allow(clippy::too_many_arguments)]
199    pub fn new(
200        join_expr: BoxedExpression,
201        join_type: JoinType,
202        output_indices: Vec<usize>,
203        left_child: BoxedExecutor,
204        right_child: BoxedExecutor,
205        identity: String,
206        chunk_size: usize,
207        mem_context: MemoryContext,
208        shutdown_rx: ShutdownToken,
209    ) -> Self {
210        // TODO(Bowen): Merge this with derive schema in Logical Join (#790).
211        let original_schema = match join_type {
212            JoinType::LeftSemi | JoinType::LeftAnti => left_child.schema().clone(),
213            JoinType::RightSemi | JoinType::RightAnti => right_child.schema().clone(),
214            _ => Schema::from_iter(
215                left_child
216                    .schema()
217                    .fields()
218                    .iter()
219                    .chain(right_child.schema().fields().iter())
220                    .cloned(),
221            ),
222        };
223        let schema = Schema::from_iter(
224            output_indices
225                .iter()
226                .map(|&idx| original_schema[idx].clone()),
227        );
228        Self {
229            join_expr,
230            join_type,
231            original_schema,
232            schema,
233            output_indices,
234            left_child,
235            right_child,
236            identity,
237            chunk_size,
238            mem_context,
239            shutdown_rx,
240        }
241    }
242}
243
244impl NestedLoopJoinExecutor {
245    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
246    async fn do_inner_join(
247        chunk_builder: &mut DataChunkBuilder,
248        left_data_types: Vec<DataType>,
249        join_expr: BoxedExpression,
250        left: Vec<DataChunk>,
251        right: BoxedExecutor,
252        shutdown_rx: ShutdownToken,
253    ) {
254        // 1. Iterate over the right table by chunks.
255        #[for_await]
256        for right_chunk in right.execute() {
257            let right_chunk = right_chunk?;
258            // 2. Iterator over the left table by rows.
259            for left_row in left.iter().flat_map(|chunk| chunk.rows()) {
260                shutdown_rx.check()?;
261                // 3. Concatenate the left row and right chunk into a single chunk and evaluate the
262                // expression on it.
263                let chunk = Self::concatenate_and_eval(
264                    join_expr.as_ref(),
265                    &left_data_types,
266                    left_row,
267                    &right_chunk,
268                )
269                .await?;
270                // 4. Yield the concatenated chunk.
271                if chunk.cardinality() > 0 {
272                    for spilled in chunk_builder.append_chunk(chunk) {
273                        yield spilled
274                    }
275                }
276            }
277        }
278    }
279
280    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
281    async fn do_left_outer_join(
282        chunk_builder: &mut DataChunkBuilder,
283        left_data_types: Vec<DataType>,
284        join_expr: BoxedExpression,
285        left: Vec<DataChunk>,
286        right: BoxedExecutor,
287        shutdown_rx: ShutdownToken,
288    ) {
289        let mut matched = BitmapBuilder::zeroed(left.iter().map(|chunk| chunk.capacity()).sum());
290        let right_data_types = right.schema().data_types();
291        // Same as inner join except that a bitmap is used to track which row of the left table is
292        // matched.
293        #[for_await]
294        for right_chunk in right.execute() {
295            let right_chunk = right_chunk?;
296            for (left_row_idx, left_row) in left.iter().flat_map(|chunk| chunk.rows()).enumerate() {
297                shutdown_rx.check()?;
298                let chunk = Self::concatenate_and_eval(
299                    join_expr.as_ref(),
300                    &left_data_types,
301                    left_row,
302                    &right_chunk,
303                )
304                .await?;
305                if chunk.cardinality() > 0 {
306                    matched.set(left_row_idx, true);
307                    for spilled in chunk_builder.append_chunk(chunk) {
308                        yield spilled
309                    }
310                }
311            }
312        }
313        // Yield unmatched rows in the left table.
314        for (left_row, _) in left
315            .iter()
316            .flat_map(|chunk| chunk.rows())
317            .zip_eq_debug(matched.finish().iter())
318            .filter(|(_, matched)| !*matched)
319        {
320            shutdown_rx.check()?;
321            let row = left_row.chain(repeat_n(Datum::None, right_data_types.len()));
322            if let Some(chunk) = chunk_builder.append_one_row(row) {
323                yield chunk
324            }
325        }
326    }
327
328    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
329    async fn do_left_semi_anti_join<const ANTI_JOIN: bool>(
330        chunk_builder: &mut DataChunkBuilder,
331        left_data_types: Vec<DataType>,
332        join_expr: BoxedExpression,
333        left: Vec<DataChunk>,
334        right: BoxedExecutor,
335        shutdown_rx: ShutdownToken,
336    ) {
337        let mut matched = BitmapBuilder::zeroed(left.iter().map(|chunk| chunk.capacity()).sum());
338        #[for_await]
339        for right_chunk in right.execute() {
340            let right_chunk = right_chunk?;
341            for (left_row_idx, left_row) in left.iter().flat_map(|chunk| chunk.rows()).enumerate() {
342                shutdown_rx.check()?;
343                if matched.is_set(left_row_idx) {
344                    continue;
345                }
346                let chunk = Self::concatenate_and_eval(
347                    join_expr.as_ref(),
348                    &left_data_types,
349                    left_row,
350                    &right_chunk,
351                )
352                .await?;
353                if chunk.cardinality() > 0 {
354                    matched.set(left_row_idx, true)
355                }
356            }
357        }
358        for (left_row, _) in left
359            .iter()
360            .flat_map(|chunk| chunk.rows())
361            .zip_eq_debug(matched.finish().iter())
362            .filter(|(_, matched)| if ANTI_JOIN { !*matched } else { *matched })
363        {
364            shutdown_rx.check()?;
365            if let Some(chunk) = chunk_builder.append_one_row(left_row) {
366                yield chunk
367            }
368        }
369    }
370
371    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
372    async fn do_right_outer_join(
373        chunk_builder: &mut DataChunkBuilder,
374        left_data_types: Vec<DataType>,
375        join_expr: BoxedExpression,
376        left: Vec<DataChunk>,
377        right: BoxedExecutor,
378        shutdown_rx: ShutdownToken,
379    ) {
380        #[for_await]
381        for right_chunk in right.execute() {
382            let right_chunk = right_chunk?;
383            // Use a bitmap to track which row of the current right chunk is matched.
384            let mut matched = BitmapBuilder::zeroed(right_chunk.capacity()).finish();
385            for left_row in left.iter().flat_map(|chunk| chunk.rows()) {
386                shutdown_rx.check()?;
387                let chunk = Self::concatenate_and_eval(
388                    join_expr.as_ref(),
389                    &left_data_types,
390                    left_row,
391                    &right_chunk,
392                )
393                .await?;
394                if chunk.cardinality() > 0 {
395                    // chunk.visibility() must be Some(_)
396                    matched = &matched | chunk.visibility();
397                    for spilled in chunk_builder.append_chunk(chunk) {
398                        yield spilled
399                    }
400                }
401            }
402            for (right_row, _) in right_chunk
403                .rows()
404                .zip_eq_debug(matched.iter())
405                .filter(|(_, matched)| !*matched)
406            {
407                shutdown_rx.check()?;
408                let row = repeat_n(Datum::None, left_data_types.len()).chain(right_row);
409                if let Some(chunk) = chunk_builder.append_one_row(row) {
410                    yield chunk
411                }
412            }
413        }
414    }
415
416    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
417    async fn do_right_semi_anti_join<const ANTI_JOIN: bool>(
418        chunk_builder: &mut DataChunkBuilder,
419        left_data_types: Vec<DataType>,
420        join_expr: BoxedExpression,
421        left: Vec<DataChunk>,
422        right: BoxedExecutor,
423        shutdown_rx: ShutdownToken,
424    ) {
425        #[for_await]
426        for right_chunk in right.execute() {
427            let mut right_chunk = right_chunk?;
428            let mut matched = BitmapBuilder::zeroed(right_chunk.capacity()).finish();
429            for left_row in left.iter().flat_map(|chunk| chunk.rows()) {
430                shutdown_rx.check()?;
431                let chunk = Self::concatenate_and_eval(
432                    join_expr.as_ref(),
433                    &left_data_types,
434                    left_row,
435                    &right_chunk,
436                )
437                .await?;
438                if chunk.cardinality() > 0 {
439                    // chunk.visibility() must be Some(_)
440                    matched = &matched | chunk.visibility();
441                }
442            }
443            if ANTI_JOIN {
444                matched = !&matched;
445            }
446            right_chunk.set_visibility(matched);
447            if right_chunk.cardinality() > 0 {
448                for spilled in chunk_builder.append_chunk(right_chunk) {
449                    yield spilled
450                }
451            }
452        }
453    }
454
455    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
456    async fn do_full_outer_join(
457        chunk_builder: &mut DataChunkBuilder,
458        left_data_types: Vec<DataType>,
459        join_expr: BoxedExpression,
460        left: Vec<DataChunk>,
461        right: BoxedExecutor,
462        shutdown_rx: ShutdownToken,
463    ) {
464        let mut left_matched =
465            BitmapBuilder::zeroed(left.iter().map(|chunk| chunk.capacity()).sum());
466        let right_data_types = right.schema().data_types();
467        #[for_await]
468        for right_chunk in right.execute() {
469            let right_chunk = right_chunk?;
470            let mut right_matched = BitmapBuilder::zeroed(right_chunk.capacity()).finish();
471            for (left_row_idx, left_row) in left.iter().flat_map(|chunk| chunk.rows()).enumerate() {
472                shutdown_rx.check()?;
473                let chunk = Self::concatenate_and_eval(
474                    join_expr.as_ref(),
475                    &left_data_types,
476                    left_row,
477                    &right_chunk,
478                )
479                .await?;
480                if chunk.cardinality() > 0 {
481                    left_matched.set(left_row_idx, true);
482                    right_matched = &right_matched | chunk.visibility();
483                    for spilled in chunk_builder.append_chunk(chunk) {
484                        yield spilled
485                    }
486                }
487            }
488            // Yield unmatched rows in the right table
489            for (right_row, _) in right_chunk
490                .rows()
491                .zip_eq_debug(right_matched.iter())
492                .filter(|(_, matched)| !*matched)
493            {
494                shutdown_rx.check()?;
495                let row = repeat_n(Datum::None, left_data_types.len()).chain(right_row);
496                if let Some(chunk) = chunk_builder.append_one_row(row) {
497                    yield chunk
498                }
499            }
500        }
501        // Yield unmatched rows in the left table.
502        for (left_row, _) in left
503            .iter()
504            .flat_map(|chunk| chunk.rows())
505            .zip_eq_debug(left_matched.finish().iter())
506            .filter(|(_, matched)| !*matched)
507        {
508            shutdown_rx.check()?;
509            let row = left_row.chain(repeat_n(Datum::None, right_data_types.len()));
510            if let Some(chunk) = chunk_builder.append_one_row(row) {
511                yield chunk
512            }
513        }
514    }
515}
516#[cfg(test)]
517mod tests {
518    use futures_async_stream::for_await;
519    use risingwave_common::array::*;
520    use risingwave_common::catalog::{Field, Schema};
521    use risingwave_common::memory::MemoryContext;
522    use risingwave_common::types::DataType;
523    use risingwave_expr::expr::build_from_pretty;
524
525    use crate::executor::BoxedExecutor;
526    use crate::executor::join::JoinType;
527    use crate::executor::join::nested_loop_join::NestedLoopJoinExecutor;
528    use crate::executor::test_utils::{MockExecutor, diff_executor_output};
529    use crate::task::ShutdownToken;
530
531    const CHUNK_SIZE: usize = 1024;
532
533    struct TestFixture {
534        join_type: JoinType,
535    }
536
537    /// Sql for creating test data:
538    /// ```sql
539    /// drop table t1 if exists;
540    /// create table t1(v1 int, v2 float);
541    /// insert into t1 values
542    /// (1, 6.1::FLOAT), (2, 8.4::FLOAT), (3, 3.9::FLOAT), (3, 6.6::FLOAT), (4, 0.7::FLOAT),
543    /// (6, 5.5::FLOAT), (6, 5.6::FLOAT), (8, 7.0::FLOAT);
544    ///
545    /// drop table t2 if exists;
546    /// create table t2(v1 int, v2 real);
547    /// insert into t2 values
548    /// (2, 6.1::REAL), (3, 8.9::REAL), (6, 3.4::REAL), (8, 3.5::REAL), (9, 7.5::REAL),
549    /// (10, null), (11, 8::REAL), (12, null), (20, 5.7::REAL), (30, 9.6::REAL),
550    /// (100, null), (200, 8.18::REAL);
551    /// ```
552    impl TestFixture {
553        fn with_join_type(join_type: JoinType) -> Self {
554            Self { join_type }
555        }
556
557        fn create_left_executor(&self) -> BoxedExecutor {
558            let schema = Schema {
559                fields: vec![
560                    Field::unnamed(DataType::Int32),
561                    Field::unnamed(DataType::Float32),
562                ],
563            };
564            let mut executor = MockExecutor::new(schema);
565
566            executor.add(DataChunk::from_pretty(
567                "i f
568                 1 6.1
569                 2 8.4
570                 3 3.9",
571            ));
572
573            executor.add(DataChunk::from_pretty(
574                "i f
575                 3 6.6
576                 4 0.7
577                 6 5.5
578                 6 5.6
579                 8 7.0",
580            ));
581
582            Box::new(executor)
583        }
584
585        fn create_right_executor(&self) -> BoxedExecutor {
586            let schema = Schema {
587                fields: vec![
588                    Field::unnamed(DataType::Int32),
589                    Field::unnamed(DataType::Float64),
590                ],
591            };
592            let mut executor = MockExecutor::new(schema);
593
594            executor.add(DataChunk::from_pretty(
595                "i F
596                 2 6.1
597                 3 8.9
598                 6 3.4
599                 8 3.5",
600            ));
601
602            executor.add(DataChunk::from_pretty(
603                " i F
604                  9 7.5
605                 10 .
606                 11 8
607                 12 .",
608            ));
609
610            executor.add(DataChunk::from_pretty(
611                "  i F
612                  20 5.7
613                  30 9.6
614                 100 .
615                 200 8.18",
616            ));
617
618            Box::new(executor)
619        }
620
621        fn create_join_executor(&self, shutdown_rx: ShutdownToken) -> BoxedExecutor {
622            let join_type = self.join_type;
623
624            let left_child = self.create_left_executor();
625            let right_child = self.create_right_executor();
626
627            let output_indices = match self.join_type {
628                JoinType::LeftSemi | JoinType::LeftAnti => vec![0, 1],
629                JoinType::RightSemi | JoinType::RightAnti => vec![0, 1],
630                _ => vec![0, 1, 2, 3],
631            };
632
633            Box::new(NestedLoopJoinExecutor::new(
634                build_from_pretty("(equal:boolean $0:int4 $2:int4)"),
635                join_type,
636                output_indices,
637                left_child,
638                right_child,
639                "NestedLoopJoinExecutor".into(),
640                CHUNK_SIZE,
641                MemoryContext::none(),
642                shutdown_rx,
643            ))
644        }
645
646        async fn do_test(&self, expected: DataChunk) {
647            let join_executor = self.create_join_executor(ShutdownToken::empty());
648            let mut expected_mock_exec = MockExecutor::new(join_executor.schema().clone());
649            expected_mock_exec.add(expected);
650            diff_executor_output(join_executor, Box::new(expected_mock_exec)).await;
651        }
652
653        async fn do_test_shutdown(&self) {
654            let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
655            let join_executor = self.create_join_executor(shutdown_rx);
656            shutdown_tx.cancel();
657            #[for_await]
658            for chunk in join_executor.execute() {
659                assert!(chunk.is_err());
660                break;
661            }
662
663            let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
664            let join_executor = self.create_join_executor(shutdown_rx);
665            shutdown_tx.abort("test");
666            #[for_await]
667            for chunk in join_executor.execute() {
668                assert!(chunk.is_err());
669                break;
670            }
671        }
672    }
673
674    /// sql: select * from t1, t2 where t1.v1 = t2.v1
675    #[tokio::test]
676    async fn test_inner_join() {
677        let test_fixture = TestFixture::with_join_type(JoinType::Inner);
678
679        let expected_chunk = DataChunk::from_pretty(
680            "i f   i F
681             2 8.4 2 6.1
682             3 3.9 3 8.9
683             3 6.6 3 8.9
684             6 5.5 6 3.4
685             6 5.6 6 3.4
686             8 7.0 8 3.5",
687        );
688
689        test_fixture.do_test(expected_chunk).await;
690    }
691
692    /// sql: select * from t1 left outer join t2 on t1.v1 = t2.v1
693    #[tokio::test]
694    async fn test_left_outer_join() {
695        let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);
696
697        let expected_chunk = DataChunk::from_pretty(
698            "i f   i F
699             2 8.4 2 6.1
700             3 3.9 3 8.9
701             3 6.6 3 8.9
702             6 5.5 6 3.4
703             6 5.6 6 3.4
704             8 7.0 8 3.5
705             1 6.1 . .
706             4 0.7 . .",
707        );
708
709        test_fixture.do_test(expected_chunk).await;
710    }
711
712    #[tokio::test]
713    async fn test_left_semi_join() {
714        let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
715
716        let expected_chunk = DataChunk::from_pretty(
717            "i f
718             2 8.4
719             3 3.9
720             3 6.6
721             6 5.5
722             6 5.6
723             8 7.0",
724        );
725
726        test_fixture.do_test(expected_chunk).await;
727    }
728
729    #[tokio::test]
730    async fn test_left_anti_join() {
731        let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);
732
733        let expected_chunk = DataChunk::from_pretty(
734            "i f
735             1 6.1
736             4 0.7",
737        );
738
739        test_fixture.do_test(expected_chunk).await;
740    }
741
742    #[tokio::test]
743    async fn test_right_outer_join() {
744        let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);
745
746        let expected_chunk = DataChunk::from_pretty(
747            "i f   i F
748             2 8.4 2 6.1
749             3 3.9 3 8.9
750             3 6.6 3 8.9
751             6 5.5 6 3.4
752             6 5.6 6 3.4
753             8 7.0 8 3.5
754             . .   9 7.5
755             . .   10 .
756             . .   11 8
757             . .   12 .
758             . .   20 5.7
759             . .   30 9.6
760             . .   100 .
761             . .   200 8.18",
762        );
763
764        test_fixture.do_test(expected_chunk).await;
765    }
766
767    #[tokio::test]
768    async fn test_right_semi_join() {
769        let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);
770
771        let expected_chunk = DataChunk::from_pretty(
772            "i F
773             2 6.1
774             3 8.9
775             6 3.4
776             8 3.5",
777        );
778
779        test_fixture.do_test(expected_chunk).await;
780    }
781
782    #[tokio::test]
783    async fn test_right_anti_join() {
784        let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);
785
786        let expected_chunk = DataChunk::from_pretty(
787            "  i F
788               9 7.5
789              10 .
790              11 8
791              12 .
792              20 5.7
793              30 9.6
794             100 .
795             200 8.18",
796        );
797
798        test_fixture.do_test(expected_chunk).await;
799    }
800
801    #[tokio::test]
802    async fn test_full_outer_join() {
803        let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);
804
805        let expected_chunk = DataChunk::from_pretty(
806            "i f   i F
807             2 8.4 2 6.1
808             3 3.9 3 8.9
809             3 6.6 3 8.9
810             6 5.5 6 3.4
811             6 5.6 6 3.4
812             8 7.0 8 3.5
813             . .   9 7.5
814             . .   10 .
815             . .   11 8
816             . .   12 .
817             . .   20 5.7
818             . .   30 9.6
819             . .   100 .
820             . .   200 8.18
821             1 6.1 . .
822             4 0.7 . .",
823        );
824
825        test_fixture.do_test(expected_chunk).await;
826    }
827
828    #[tokio::test]
829    async fn test_shutdown_rx() {
830        let test_fixture = TestFixture::with_join_type(JoinType::Inner);
831        test_fixture.do_test_shutdown().await;
832        let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);
833        test_fixture.do_test_shutdown().await;
834        let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
835        test_fixture.do_test_shutdown().await;
836        let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);
837        test_fixture.do_test_shutdown().await;
838        let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);
839        test_fixture.do_test_shutdown().await;
840        let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);
841        test_fixture.do_test_shutdown().await;
842        let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);
843        test_fixture.do_test_shutdown().await;
844    }
845}