// Source: risingwave_batch_executors/executor/join/nested_loop_join.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures_async_stream::try_stream;
16use risingwave_common::array::data_chunk_iter::RowRef;
17use risingwave_common::array::{Array, DataChunk};
18use risingwave_common::bitmap::BitmapBuilder;
19use risingwave_common::catalog::Schema;
20use risingwave_common::memory::MemoryContext;
21use risingwave_common::row::{RowExt, repeat_n};
22use risingwave_common::types::{DataType, Datum};
23use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
24use risingwave_common::util::iter_util::ZipEqDebug;
25use risingwave_common_estimate_size::EstimateSize;
26use risingwave_expr::expr::{BoxedExpression, build_from_prost as expr_build_from_prost};
27use risingwave_pb::batch_plan::plan_node::NodeBody;
28
29use crate::error::{BatchError, Result};
30use crate::executor::join::{JoinType, concatenate, convert_row_to_chunk};
31use crate::executor::{
32    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
33};
34use crate::task::ShutdownToken;
35
36/// Nested loop join executor.
37///
38///
39/// High Level Idea:
40/// 1. Iterate tuple from left child.
41/// 2. Concatenated with right chunk, eval expression and get visibility bitmap
42/// 3. Create new chunk with visibility bitmap and yield to upper.
43pub struct NestedLoopJoinExecutor {
44    /// Expression to eval join condition
45    join_expr: BoxedExpression,
46    /// Executor should handle different join type.
47    join_type: JoinType,
48    /// Original output schema
49    original_schema: Schema,
50    /// Actual output schema
51    schema: Schema,
52    /// We may only need certain columns.
53    /// `output_indices` are the indices of the columns that we needed.
54    output_indices: Vec<usize>,
55    /// Left child executor
56    left_child: BoxedExecutor,
57    /// Right child executor
58    right_child: BoxedExecutor,
59    /// Identity string of the executor
60    identity: String,
61    /// The maximum size of the chunk produced by executor at a time.
62    chunk_size: usize,
63
64    /// Memory context used for recording memory usage of executor.
65    mem_context: MemoryContext,
66
67    shutdown_rx: ShutdownToken,
68}
69
70impl Executor for NestedLoopJoinExecutor {
71    fn schema(&self) -> &Schema {
72        &self.schema
73    }
74
75    fn identity(&self) -> &str {
76        &self.identity
77    }
78
79    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
80        self.do_execute()
81    }
82}
83
84impl NestedLoopJoinExecutor {
85    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
86    async fn do_execute(self: Box<Self>) {
87        let left_data_types = self.left_child.schema().data_types();
88        let data_types = self.original_schema.data_types();
89
90        let mut chunk_builder = DataChunkBuilder::new(data_types, self.chunk_size);
91
92        // Cache the outputs of left child
93        let left: Vec<DataChunk> = {
94            let mut ret = Vec::with_capacity(1024);
95            #[for_await]
96            for chunk in self.left_child.execute() {
97                let c = chunk?;
98                trace!("Estimated chunk size is {:?}", c.estimated_heap_size());
99                if !self.mem_context.add(c.estimated_heap_size() as i64) {
100                    Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?;
101                }
102                ret.push(c);
103            }
104            ret
105        };
106
107        // Get the joined stream
108        let stream = match self.join_type {
109            JoinType::Inner => Self::do_inner_join,
110            JoinType::LeftOuter => Self::do_left_outer_join,
111            JoinType::LeftSemi => Self::do_left_semi_anti_join::<false>,
112            JoinType::LeftAnti => Self::do_left_semi_anti_join::<true>,
113            JoinType::RightOuter => Self::do_right_outer_join,
114            JoinType::RightSemi => Self::do_right_semi_anti_join::<false>,
115            JoinType::RightAnti => Self::do_right_semi_anti_join::<true>,
116            JoinType::FullOuter => Self::do_full_outer_join,
117            JoinType::AsOfInner | JoinType::AsOfLeftOuter => {
118                unimplemented!("AsOf join is not supported in NestedLoopJoinExecutor")
119            }
120        };
121
122        #[for_await]
123        for chunk in stream(
124            &mut chunk_builder,
125            left_data_types,
126            self.join_expr,
127            left,
128            self.right_child,
129            self.shutdown_rx.clone(),
130        ) {
131            yield chunk?.project(&self.output_indices)
132        }
133
134        // Handle remaining chunk
135        if let Some(chunk) = chunk_builder.consume_all() {
136            yield chunk.project(&self.output_indices)
137        }
138    }
139}
140
141impl NestedLoopJoinExecutor {
142    /// Create a chunk by concatenating a row with a chunk and set its visibility according to the
143    /// evaluation result of the expression.
144    async fn concatenate_and_eval(
145        expr: &BoxedExpression,
146        left_row_types: &[DataType],
147        left_row: RowRef<'_>,
148        right_chunk: &DataChunk,
149    ) -> Result<DataChunk> {
150        let left_chunk = convert_row_to_chunk(&left_row, right_chunk.capacity(), left_row_types)?;
151        let mut chunk = concatenate(&left_chunk, right_chunk)?;
152        chunk.set_visibility(expr.eval(&chunk).await?.as_bool().iter().collect());
153        Ok(chunk)
154    }
155}
156
157impl BoxedExecutorBuilder for NestedLoopJoinExecutor {
158    async fn new_boxed_executor(
159        source: &ExecutorBuilder<'_>,
160        inputs: Vec<BoxedExecutor>,
161    ) -> Result<BoxedExecutor> {
162        let [left_child, right_child]: [_; 2] = inputs.try_into().unwrap();
163
164        let nested_loop_join_node = try_match_expand!(
165            source.plan_node().get_node_body().unwrap(),
166            NodeBody::NestedLoopJoin
167        )?;
168
169        let join_type = JoinType::from_prost(nested_loop_join_node.get_join_type()?);
170        let join_expr = expr_build_from_prost(nested_loop_join_node.get_join_cond()?)?;
171
172        let output_indices = nested_loop_join_node
173            .output_indices
174            .iter()
175            .map(|&v| v as usize)
176            .collect();
177
178        let identity = source.plan_node().get_identity().clone();
179        let mem_context = source.context().create_executor_mem_context(&identity);
180
181        Ok(Box::new(NestedLoopJoinExecutor::new(
182            join_expr,
183            join_type,
184            output_indices,
185            left_child,
186            right_child,
187            identity,
188            source.context().get_config().developer.chunk_size,
189            mem_context,
190            source.shutdown_rx().clone(),
191        )))
192    }
193}
194
195impl NestedLoopJoinExecutor {
196    pub fn new(
197        join_expr: BoxedExpression,
198        join_type: JoinType,
199        output_indices: Vec<usize>,
200        left_child: BoxedExecutor,
201        right_child: BoxedExecutor,
202        identity: String,
203        chunk_size: usize,
204        mem_context: MemoryContext,
205        shutdown_rx: ShutdownToken,
206    ) -> Self {
207        // TODO(Bowen): Merge this with derive schema in Logical Join (#790).
208        let original_schema = match join_type {
209            JoinType::LeftSemi | JoinType::LeftAnti => left_child.schema().clone(),
210            JoinType::RightSemi | JoinType::RightAnti => right_child.schema().clone(),
211            _ => Schema::from_iter(
212                left_child
213                    .schema()
214                    .fields()
215                    .iter()
216                    .chain(right_child.schema().fields().iter())
217                    .cloned(),
218            ),
219        };
220        let schema = Schema::from_iter(
221            output_indices
222                .iter()
223                .map(|&idx| original_schema[idx].clone()),
224        );
225        Self {
226            join_expr,
227            join_type,
228            original_schema,
229            schema,
230            output_indices,
231            left_child,
232            right_child,
233            identity,
234            chunk_size,
235            mem_context,
236            shutdown_rx,
237        }
238    }
239}
240
// Join-type-specific routines. All of them share one signature so that `do_execute` can select
// one with a `match` on the join type. Each routine:
// - consumes the right child chunk by chunk and compares every right chunk against every
//   cached left row;
// - pushes output rows into the shared `chunk_builder` and yields only the chunks the builder
//   spills. Rows still buffered in the builder when a routine returns are left for the caller
//   to flush.
impl NestedLoopJoinExecutor {
    /// Inner join: yields every (left row, right row) pair that satisfies `join_expr`.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    async fn do_inner_join(
        chunk_builder: &mut DataChunkBuilder,
        left_data_types: Vec<DataType>,
        join_expr: BoxedExpression,
        left: Vec<DataChunk>,
        right: BoxedExecutor,
        shutdown_rx: ShutdownToken,
    ) {
        // 1. Iterate over the right table by chunks.
        #[for_await]
        for right_chunk in right.execute() {
            let right_chunk = right_chunk?;
            // 2. Iterate over the left table by rows.
            for left_row in left.iter().flat_map(|chunk| chunk.rows()) {
                // Stop with an error as soon as the task is cancelled or aborted.
                shutdown_rx.check()?;
                // 3. Concatenate the left row and right chunk into a single chunk and evaluate the
                // expression on it; only pairs satisfying the condition remain visible.
                let chunk = Self::concatenate_and_eval(
                    &join_expr,
                    &left_data_types,
                    left_row,
                    &right_chunk,
                )
                .await?;
                // 4. Buffer the matched rows (if any) and yield whatever the builder spills.
                if chunk.cardinality() > 0 {
                    for spilled in chunk_builder.append_chunk(chunk) {
                        yield spilled
                    }
                }
            }
        }
    }

    /// Left outer join. Yields the matched pairs exactly like the inner join, then every left
    /// row that matched no right row at all, padded with NULLs for the right columns.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    async fn do_left_outer_join(
        chunk_builder: &mut DataChunkBuilder,
        left_data_types: Vec<DataType>,
        join_expr: BoxedExpression,
        left: Vec<DataChunk>,
        right: BoxedExecutor,
        shutdown_rx: ShutdownToken,
    ) {
        // One bit per cached left row, addressed by the row's position across all left chunks.
        // NOTE(review): sized by `capacity()` but addressed by the positions produced by
        // `rows()`. The two agree only if the cached left chunks have no hidden rows — confirm
        // that the left child always emits compact chunks.
        let mut matched = BitmapBuilder::zeroed(left.iter().map(|chunk| chunk.capacity()).sum());
        // Captured up front because `right.execute()` below consumes the right child.
        let right_data_types = right.schema().data_types();
        // Same as inner join except that a bitmap is used to track which row of the left table is
        // matched.
        #[for_await]
        for right_chunk in right.execute() {
            let right_chunk = right_chunk?;
            for (left_row_idx, left_row) in left.iter().flat_map(|chunk| chunk.rows()).enumerate() {
                shutdown_rx.check()?;
                let chunk = Self::concatenate_and_eval(
                    &join_expr,
                    &left_data_types,
                    left_row,
                    &right_chunk,
                )
                .await?;
                if chunk.cardinality() > 0 {
                    matched.set(left_row_idx, true);
                    for spilled in chunk_builder.append_chunk(chunk) {
                        yield spilled
                    }
                }
            }
        }
        // Yield unmatched rows in the left table. This has to wait until the right side is
        // exhausted, because any later right chunk could still match a left row.
        for (left_row, _) in left
            .iter()
            .flat_map(|chunk| chunk.rows())
            .zip_eq_debug(matched.finish().iter())
            .filter(|(_, matched)| !*matched)
        {
            shutdown_rx.check()?;
            let row = left_row.chain(repeat_n(Datum::None, right_data_types.len()));
            if let Some(chunk) = chunk_builder.append_one_row(row) {
                yield chunk
            }
        }
    }

    /// Left semi join (`ANTI_JOIN = false`) yields each left row that matches at least one right
    /// row. Left anti join (`ANTI_JOIN = true`) yields each left row that matches none.
    /// Only left columns are output, each qualifying left row exactly once.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    async fn do_left_semi_anti_join<const ANTI_JOIN: bool>(
        chunk_builder: &mut DataChunkBuilder,
        left_data_types: Vec<DataType>,
        join_expr: BoxedExpression,
        left: Vec<DataChunk>,
        right: BoxedExecutor,
        shutdown_rx: ShutdownToken,
    ) {
        // One bit per cached left row: set once the row has matched some right row.
        let mut matched = BitmapBuilder::zeroed(left.iter().map(|chunk| chunk.capacity()).sum());
        #[for_await]
        for right_chunk in right.execute() {
            let right_chunk = right_chunk?;
            for (left_row_idx, left_row) in left.iter().flat_map(|chunk| chunk.rows()).enumerate() {
                shutdown_rx.check()?;
                // A row already known to match cannot change the outcome; skip re-evaluating it.
                if matched.is_set(left_row_idx) {
                    continue;
                }
                let chunk = Self::concatenate_and_eval(
                    &join_expr,
                    &left_data_types,
                    left_row,
                    &right_chunk,
                )
                .await?;
                if chunk.cardinality() > 0 {
                    matched.set(left_row_idx, true)
                }
            }
        }
        // Emit the matched (semi) or never-matched (anti) left rows.
        for (left_row, _) in left
            .iter()
            .flat_map(|chunk| chunk.rows())
            .zip_eq_debug(matched.finish().iter())
            .filter(|(_, matched)| if ANTI_JOIN { !*matched } else { *matched })
        {
            shutdown_rx.check()?;
            if let Some(chunk) = chunk_builder.append_one_row(left_row) {
                yield chunk
            }
        }
    }

    /// Right outer join. For each right chunk, yields the matched pairs and then the rows of
    /// that chunk that matched no left row, padded with NULLs for the left columns.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    async fn do_right_outer_join(
        chunk_builder: &mut DataChunkBuilder,
        left_data_types: Vec<DataType>,
        join_expr: BoxedExpression,
        left: Vec<DataChunk>,
        right: BoxedExecutor,
        shutdown_rx: ShutdownToken,
    ) {
        #[for_await]
        for right_chunk in right.execute() {
            let right_chunk = right_chunk?;
            // Use a bitmap to track which row of the current right chunk is matched. Unmatched
            // right rows can be emitted per chunk because every cached left row is compared
            // against this chunk before moving on.
            let mut matched = BitmapBuilder::zeroed(right_chunk.capacity()).finish();
            for left_row in left.iter().flat_map(|chunk| chunk.rows()) {
                shutdown_rx.check()?;
                let chunk = Self::concatenate_and_eval(
                    &join_expr,
                    &left_data_types,
                    left_row,
                    &right_chunk,
                )
                .await?;
                if chunk.cardinality() > 0 {
                    // The joined chunk's visibility marks exactly the right rows matched by this
                    // left row; accumulate it into `matched`.
                    matched = &matched | chunk.visibility();
                    for spilled in chunk_builder.append_chunk(chunk) {
                        yield spilled
                    }
                }
            }
            for (right_row, _) in right_chunk
                .rows()
                .zip_eq_debug(matched.iter())
                .filter(|(_, matched)| !*matched)
            {
                shutdown_rx.check()?;
                let row = repeat_n(Datum::None, left_data_types.len()).chain(right_row);
                if let Some(chunk) = chunk_builder.append_one_row(row) {
                    yield chunk
                }
            }
        }
    }

    /// Right semi join (`ANTI_JOIN = false`) emits the right rows that match at least one left
    /// row. Right anti join (`ANTI_JOIN = true`) emits the right rows that match none. Only
    /// right columns are output.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    async fn do_right_semi_anti_join<const ANTI_JOIN: bool>(
        chunk_builder: &mut DataChunkBuilder,
        left_data_types: Vec<DataType>,
        join_expr: BoxedExpression,
        left: Vec<DataChunk>,
        right: BoxedExecutor,
        shutdown_rx: ShutdownToken,
    ) {
        #[for_await]
        for right_chunk in right.execute() {
            let mut right_chunk = right_chunk?;
            // One bit per row of the current right chunk: set once it matches any left row.
            let mut matched = BitmapBuilder::zeroed(right_chunk.capacity()).finish();
            for left_row in left.iter().flat_map(|chunk| chunk.rows()) {
                shutdown_rx.check()?;
                let chunk = Self::concatenate_and_eval(
                    &join_expr,
                    &left_data_types,
                    left_row,
                    &right_chunk,
                )
                .await?;
                if chunk.cardinality() > 0 {
                    // The joined chunk's visibility marks exactly the right rows matched by this
                    // left row; accumulate it into `matched`.
                    matched = &matched | chunk.visibility();
                }
            }
            // Anti join keeps exactly the right rows that matched nothing.
            if ANTI_JOIN {
                matched = !&matched;
            }
            // Emit the right chunk itself, with visibility restricted to the selected rows.
            right_chunk.set_visibility(matched);
            if right_chunk.cardinality() > 0 {
                for spilled in chunk_builder.append_chunk(right_chunk) {
                    yield spilled
                }
            }
        }
    }

    /// Full outer join. Combines the right outer join's per-chunk emission of unmatched right
    /// rows (NULL-padded on the left) with the left outer join's final pass over unmatched left
    /// rows (NULL-padded on the right).
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    async fn do_full_outer_join(
        chunk_builder: &mut DataChunkBuilder,
        left_data_types: Vec<DataType>,
        join_expr: BoxedExpression,
        left: Vec<DataChunk>,
        right: BoxedExecutor,
        shutdown_rx: ShutdownToken,
    ) {
        // One bit per cached left row, kept across all right chunks.
        let mut left_matched =
            BitmapBuilder::zeroed(left.iter().map(|chunk| chunk.capacity()).sum());
        // Captured up front because `right.execute()` below consumes the right child.
        let right_data_types = right.schema().data_types();
        #[for_await]
        for right_chunk in right.execute() {
            let right_chunk = right_chunk?;
            // One bit per row of the current right chunk only.
            let mut right_matched = BitmapBuilder::zeroed(right_chunk.capacity()).finish();
            for (left_row_idx, left_row) in left.iter().flat_map(|chunk| chunk.rows()).enumerate() {
                shutdown_rx.check()?;
                let chunk = Self::concatenate_and_eval(
                    &join_expr,
                    &left_data_types,
                    left_row,
                    &right_chunk,
                )
                .await?;
                if chunk.cardinality() > 0 {
                    left_matched.set(left_row_idx, true);
                    right_matched = &right_matched | chunk.visibility();
                    for spilled in chunk_builder.append_chunk(chunk) {
                        yield spilled
                    }
                }
            }
            // Yield unmatched rows in the right table
            for (right_row, _) in right_chunk
                .rows()
                .zip_eq_debug(right_matched.iter())
                .filter(|(_, matched)| !*matched)
            {
                shutdown_rx.check()?;
                let row = repeat_n(Datum::None, left_data_types.len()).chain(right_row);
                if let Some(chunk) = chunk_builder.append_one_row(row) {
                    yield chunk
                }
            }
        }
        // Yield unmatched rows in the left table.
        for (left_row, _) in left
            .iter()
            .flat_map(|chunk| chunk.rows())
            .zip_eq_debug(left_matched.finish().iter())
            .filter(|(_, matched)| !*matched)
        {
            shutdown_rx.check()?;
            let row = left_row.chain(repeat_n(Datum::None, right_data_types.len()));
            if let Some(chunk) = chunk_builder.append_one_row(row) {
                yield chunk
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use futures_async_stream::for_await;
    use risingwave_common::array::*;
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::memory::MemoryContext;
    use risingwave_common::types::DataType;
    use risingwave_expr::expr::build_from_pretty;

    use crate::executor::BoxedExecutor;
    use crate::executor::join::JoinType;
    use crate::executor::join::nested_loop_join::NestedLoopJoinExecutor;
    use crate::executor::test_utils::{MockExecutor, diff_executor_output};
    use crate::task::ShutdownToken;

    const CHUNK_SIZE: usize = 1024;

    /// Builds fixed left/right mock inputs and a nested loop join executor over them, joined on
    /// `left.v1 = right.v1`, for the given join type.
    struct TestFixture {
        join_type: JoinType,
    }

    /// Sql for creating test data. Column types match the mock executors below: the left `v2`
    /// is `real` (float32) and the right `v2` is `double precision` (float64).
    /// ```sql
    /// drop table if exists t1;
    /// create table t1(v1 int, v2 real);
    /// insert into t1 values
    /// (1, 6.1::REAL), (2, 8.4::REAL), (3, 3.9::REAL), (3, 6.6::REAL), (4, 0.7::REAL),
    /// (6, 5.5::REAL), (6, 5.6::REAL), (8, 7.0::REAL);
    ///
    /// drop table if exists t2;
    /// create table t2(v1 int, v2 double precision);
    /// insert into t2 values
    /// (2, 6.1::FLOAT), (3, 8.9::FLOAT), (6, 3.4::FLOAT), (8, 3.5::FLOAT), (9, 7.5::FLOAT),
    /// (10, null), (11, 8::FLOAT), (12, null), (20, 5.7::FLOAT), (30, 9.6::FLOAT),
    /// (100, null), (200, 8.18::FLOAT);
    /// ```
    impl TestFixture {
        fn with_join_type(join_type: JoinType) -> Self {
            Self { join_type }
        }

        fn create_left_executor(&self) -> BoxedExecutor {
            let schema = Schema {
                fields: vec![
                    Field::unnamed(DataType::Int32),
                    Field::unnamed(DataType::Float32),
                ],
            };
            let mut executor = MockExecutor::new(schema);

            executor.add(DataChunk::from_pretty(
                "i f
                 1 6.1
                 2 8.4
                 3 3.9",
            ));

            executor.add(DataChunk::from_pretty(
                "i f
                 3 6.6
                 4 0.7
                 6 5.5
                 6 5.6
                 8 7.0",
            ));

            Box::new(executor)
        }

        fn create_right_executor(&self) -> BoxedExecutor {
            let schema = Schema {
                fields: vec![
                    Field::unnamed(DataType::Int32),
                    Field::unnamed(DataType::Float64),
                ],
            };
            let mut executor = MockExecutor::new(schema);

            executor.add(DataChunk::from_pretty(
                "i F
                 2 6.1
                 3 8.9
                 6 3.4
                 8 3.5",
            ));

            executor.add(DataChunk::from_pretty(
                " i F
                  9 7.5
                 10 .
                 11 8
                 12 .",
            ));

            executor.add(DataChunk::from_pretty(
                "  i F
                  20 5.7
                  30 9.6
                 100 .
                 200 8.18",
            ));

            Box::new(executor)
        }

        fn create_join_executor(&self, shutdown_rx: ShutdownToken) -> BoxedExecutor {
            let join_type = self.join_type;

            let left_child = self.create_left_executor();
            let right_child = self.create_right_executor();

            let output_indices = match self.join_type {
                JoinType::LeftSemi | JoinType::LeftAnti => vec![0, 1],
                JoinType::RightSemi | JoinType::RightAnti => vec![0, 1],
                _ => vec![0, 1, 2, 3],
            };

            Box::new(NestedLoopJoinExecutor::new(
                build_from_pretty("(equal:boolean $0:int4 $2:int4)"),
                join_type,
                output_indices,
                left_child,
                right_child,
                "NestedLoopJoinExecutor".into(),
                CHUNK_SIZE,
                MemoryContext::none(),
                shutdown_rx,
            ))
        }

        async fn do_test(&self, expected: DataChunk) {
            let join_executor = self.create_join_executor(ShutdownToken::empty());
            let mut expected_mock_exec = MockExecutor::new(join_executor.schema().clone());
            expected_mock_exec.add(expected);
            diff_executor_output(join_executor, Box::new(expected_mock_exec)).await;
        }

        /// Cancels, and in a second run aborts, the shutdown token before running the join.
        /// Both runs must produce an error as their first item.
        async fn do_test_shutdown(&self) {
            let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
            let join_executor = self.create_join_executor(shutdown_rx);
            shutdown_tx.cancel();
            Self::assert_first_item_is_err(join_executor).await;

            let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
            let join_executor = self.create_join_executor(shutdown_rx);
            shutdown_tx.abort("test");
            Self::assert_first_item_is_err(join_executor).await;
        }

        /// Asserts that `executor` yields at least one item and that the first one is an error.
        /// Without the `saw_item` check, an executor yielding nothing would pass vacuously.
        async fn assert_first_item_is_err(executor: BoxedExecutor) {
            let mut saw_item = false;
            #[for_await]
            for chunk in executor.execute() {
                assert!(chunk.is_err());
                saw_item = true;
                break;
            }
            assert!(saw_item, "executor produced no item after shutdown");
        }
    }

    /// sql: select * from t1, t2 where t1.v1 = t2.v1
    #[tokio::test]
    async fn test_inner_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::Inner);

        let expected_chunk = DataChunk::from_pretty(
            "i f   i F
             2 8.4 2 6.1
             3 3.9 3 8.9
             3 6.6 3 8.9
             6 5.5 6 3.4
             6 5.6 6 3.4
             8 7.0 8 3.5",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    /// sql: select * from t1 left outer join t2 on t1.v1 = t2.v1
    #[tokio::test]
    async fn test_left_outer_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);

        let expected_chunk = DataChunk::from_pretty(
            "i f   i F
             2 8.4 2 6.1
             3 3.9 3 8.9
             3 6.6 3 8.9
             6 5.5 6 3.4
             6 5.6 6 3.4
             8 7.0 8 3.5
             1 6.1 . .
             4 0.7 . .",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    #[tokio::test]
    async fn test_left_semi_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);

        let expected_chunk = DataChunk::from_pretty(
            "i f
             2 8.4
             3 3.9
             3 6.6
             6 5.5
             6 5.6
             8 7.0",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    #[tokio::test]
    async fn test_left_anti_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);

        let expected_chunk = DataChunk::from_pretty(
            "i f
             1 6.1
             4 0.7",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    #[tokio::test]
    async fn test_right_outer_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);

        let expected_chunk = DataChunk::from_pretty(
            "i f   i F
             2 8.4 2 6.1
             3 3.9 3 8.9
             3 6.6 3 8.9
             6 5.5 6 3.4
             6 5.6 6 3.4
             8 7.0 8 3.5
             . .   9 7.5
             . .   10 .
             . .   11 8
             . .   12 .
             . .   20 5.7
             . .   30 9.6
             . .   100 .
             . .   200 8.18",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    #[tokio::test]
    async fn test_right_semi_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);

        let expected_chunk = DataChunk::from_pretty(
            "i F
             2 6.1
             3 8.9
             6 3.4
             8 3.5",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    #[tokio::test]
    async fn test_right_anti_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);

        let expected_chunk = DataChunk::from_pretty(
            "  i F
               9 7.5
              10 .
              11 8
              12 .
              20 5.7
              30 9.6
             100 .
             200 8.18",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    #[tokio::test]
    async fn test_full_outer_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);

        let expected_chunk = DataChunk::from_pretty(
            "i f   i F
             2 8.4 2 6.1
             3 3.9 3 8.9
             3 6.6 3 8.9
             6 5.5 6 3.4
             6 5.6 6 3.4
             8 7.0 8 3.5
             . .   9 7.5
             . .   10 .
             . .   11 8
             . .   12 .
             . .   20 5.7
             . .   30 9.6
             . .   100 .
             . .   200 8.18
             1 6.1 . .
             4 0.7 . .",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    /// Every supported join type must stop with an error once its shutdown token is cancelled
    /// or aborted.
    #[tokio::test]
    async fn test_shutdown_rx() {
        for join_type in [
            JoinType::Inner,
            JoinType::LeftOuter,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::RightOuter,
            JoinType::RightSemi,
            JoinType::RightAnti,
            JoinType::FullOuter,
        ] {
            TestFixture::with_join_type(join_type)
                .do_test_shutdown()
                .await;
        }
    }
}