risingwave_batch_executors/executor/join/
nested_loop_join.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures_async_stream::try_stream;
16use risingwave_common::array::data_chunk_iter::RowRef;
17use risingwave_common::array::{Array, DataChunk};
18use risingwave_common::bitmap::BitmapBuilder;
19use risingwave_common::catalog::Schema;
20use risingwave_common::memory::MemoryContext;
21use risingwave_common::row::{RowExt, repeat_n};
22use risingwave_common::types::{DataType, Datum};
23use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
24use risingwave_common::util::iter_util::ZipEqDebug;
25use risingwave_common_estimate_size::EstimateSize;
26use risingwave_expr::expr::{
27    BoxedExpression, Expression, build_from_prost as expr_build_from_prost,
28};
29use risingwave_pb::batch_plan::plan_node::NodeBody;
30
31use crate::error::{BatchError, Result};
32use crate::executor::join::{JoinType, concatenate, convert_row_to_chunk};
33use crate::executor::{
34    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
35};
36use crate::task::ShutdownToken;
37
38/// Nested loop join executor.
39///
40///
41/// High Level Idea:
42/// 1. Iterate tuple from left child.
43/// 2. Concatenated with right chunk, eval expression and get visibility bitmap
44/// 3. Create new chunk with visibility bitmap and yield to upper.
45pub struct NestedLoopJoinExecutor {
46    /// Expression to eval join condition
47    join_expr: BoxedExpression,
48    /// Executor should handle different join type.
49    join_type: JoinType,
50    /// Original output schema
51    original_schema: Schema,
52    /// Actual output schema
53    schema: Schema,
54    /// We may only need certain columns.
55    /// `output_indices` are the indices of the columns that we needed.
56    output_indices: Vec<usize>,
57    /// Left child executor
58    left_child: BoxedExecutor,
59    /// Right child executor
60    right_child: BoxedExecutor,
61    /// Identity string of the executor
62    identity: String,
63    /// The maximum size of the chunk produced by executor at a time.
64    chunk_size: usize,
65
66    /// Memory context used for recording memory usage of executor.
67    mem_context: MemoryContext,
68
69    shutdown_rx: ShutdownToken,
70}
71
72impl Executor for NestedLoopJoinExecutor {
73    fn schema(&self) -> &Schema {
74        &self.schema
75    }
76
77    fn identity(&self) -> &str {
78        &self.identity
79    }
80
81    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
82        self.do_execute()
83    }
84}
85
86impl NestedLoopJoinExecutor {
87    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
88    async fn do_execute(self: Box<Self>) {
89        let left_data_types = self.left_child.schema().data_types();
90        let data_types = self.original_schema.data_types();
91
92        let mut chunk_builder = DataChunkBuilder::new(data_types, self.chunk_size);
93
94        // Cache the outputs of left child
95        let left: Vec<DataChunk> = {
96            let mut ret = Vec::with_capacity(1024);
97            #[for_await]
98            for chunk in self.left_child.execute() {
99                let c = chunk?;
100                trace!("Estimated chunk size is {:?}", c.estimated_heap_size());
101                if !self.mem_context.add(c.estimated_heap_size() as i64) {
102                    Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?;
103                }
104                ret.push(c);
105            }
106            ret
107        };
108
109        // Get the joined stream
110        let stream = match self.join_type {
111            JoinType::Inner => Self::do_inner_join,
112            JoinType::LeftOuter => Self::do_left_outer_join,
113            JoinType::LeftSemi => Self::do_left_semi_anti_join::<false>,
114            JoinType::LeftAnti => Self::do_left_semi_anti_join::<true>,
115            JoinType::RightOuter => Self::do_right_outer_join,
116            JoinType::RightSemi => Self::do_right_semi_anti_join::<false>,
117            JoinType::RightAnti => Self::do_right_semi_anti_join::<true>,
118            JoinType::FullOuter => Self::do_full_outer_join,
119            JoinType::AsOfInner | JoinType::AsOfLeftOuter => {
120                unimplemented!("AsOf join is not supported in NestedLoopJoinExecutor")
121            }
122        };
123
124        #[for_await]
125        for chunk in stream(
126            &mut chunk_builder,
127            left_data_types,
128            self.join_expr,
129            left,
130            self.right_child,
131            self.shutdown_rx.clone(),
132        ) {
133            yield chunk?.project(&self.output_indices)
134        }
135
136        // Handle remaining chunk
137        if let Some(chunk) = chunk_builder.consume_all() {
138            yield chunk.project(&self.output_indices)
139        }
140    }
141}
142
143impl NestedLoopJoinExecutor {
144    /// Create a chunk by concatenating a row with a chunk and set its visibility according to the
145    /// evaluation result of the expression.
146    async fn concatenate_and_eval(
147        expr: &dyn Expression,
148        left_row_types: &[DataType],
149        left_row: RowRef<'_>,
150        right_chunk: &DataChunk,
151    ) -> Result<DataChunk> {
152        let left_chunk = convert_row_to_chunk(&left_row, right_chunk.capacity(), left_row_types)?;
153        let mut chunk = concatenate(&left_chunk, right_chunk)?;
154        chunk.set_visibility(expr.eval(&chunk).await?.as_bool().iter().collect());
155        Ok(chunk)
156    }
157}
158
159impl BoxedExecutorBuilder for NestedLoopJoinExecutor {
160    async fn new_boxed_executor(
161        source: &ExecutorBuilder<'_>,
162        inputs: Vec<BoxedExecutor>,
163    ) -> Result<BoxedExecutor> {
164        let [left_child, right_child]: [_; 2] = inputs.try_into().unwrap();
165
166        let nested_loop_join_node = try_match_expand!(
167            source.plan_node().get_node_body().unwrap(),
168            NodeBody::NestedLoopJoin
169        )?;
170
171        let join_type = JoinType::from_prost(nested_loop_join_node.get_join_type()?);
172        let join_expr = expr_build_from_prost(nested_loop_join_node.get_join_cond()?)?;
173
174        let output_indices = nested_loop_join_node
175            .output_indices
176            .iter()
177            .map(|&v| v as usize)
178            .collect();
179
180        let identity = source.plan_node().get_identity().clone();
181        let mem_context = source.context().create_executor_mem_context(&identity);
182
183        Ok(Box::new(NestedLoopJoinExecutor::new(
184            join_expr,
185            join_type,
186            output_indices,
187            left_child,
188            right_child,
189            identity,
190            source.context().get_config().developer.chunk_size,
191            mem_context,
192            source.shutdown_rx().clone(),
193        )))
194    }
195}
196
197impl NestedLoopJoinExecutor {
198    pub fn new(
199        join_expr: BoxedExpression,
200        join_type: JoinType,
201        output_indices: Vec<usize>,
202        left_child: BoxedExecutor,
203        right_child: BoxedExecutor,
204        identity: String,
205        chunk_size: usize,
206        mem_context: MemoryContext,
207        shutdown_rx: ShutdownToken,
208    ) -> Self {
209        // TODO(Bowen): Merge this with derive schema in Logical Join (#790).
210        let original_schema = match join_type {
211            JoinType::LeftSemi | JoinType::LeftAnti => left_child.schema().clone(),
212            JoinType::RightSemi | JoinType::RightAnti => right_child.schema().clone(),
213            _ => Schema::from_iter(
214                left_child
215                    .schema()
216                    .fields()
217                    .iter()
218                    .chain(right_child.schema().fields().iter())
219                    .cloned(),
220            ),
221        };
222        let schema = Schema::from_iter(
223            output_indices
224                .iter()
225                .map(|&idx| original_schema[idx].clone()),
226        );
227        Self {
228            join_expr,
229            join_type,
230            original_schema,
231            schema,
232            output_indices,
233            left_child,
234            right_child,
235            identity,
236            chunk_size,
237            mem_context,
238            shutdown_rx,
239        }
240    }
241}
242
243impl NestedLoopJoinExecutor {
244    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
245    async fn do_inner_join(
246        chunk_builder: &mut DataChunkBuilder,
247        left_data_types: Vec<DataType>,
248        join_expr: BoxedExpression,
249        left: Vec<DataChunk>,
250        right: BoxedExecutor,
251        shutdown_rx: ShutdownToken,
252    ) {
253        // 1. Iterate over the right table by chunks.
254        #[for_await]
255        for right_chunk in right.execute() {
256            let right_chunk = right_chunk?;
257            // 2. Iterator over the left table by rows.
258            for left_row in left.iter().flat_map(|chunk| chunk.rows()) {
259                shutdown_rx.check()?;
260                // 3. Concatenate the left row and right chunk into a single chunk and evaluate the
261                // expression on it.
262                let chunk = Self::concatenate_and_eval(
263                    join_expr.as_ref(),
264                    &left_data_types,
265                    left_row,
266                    &right_chunk,
267                )
268                .await?;
269                // 4. Yield the concatenated chunk.
270                if chunk.cardinality() > 0 {
271                    for spilled in chunk_builder.append_chunk(chunk) {
272                        yield spilled
273                    }
274                }
275            }
276        }
277    }
278
279    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
280    async fn do_left_outer_join(
281        chunk_builder: &mut DataChunkBuilder,
282        left_data_types: Vec<DataType>,
283        join_expr: BoxedExpression,
284        left: Vec<DataChunk>,
285        right: BoxedExecutor,
286        shutdown_rx: ShutdownToken,
287    ) {
288        let mut matched = BitmapBuilder::zeroed(left.iter().map(|chunk| chunk.capacity()).sum());
289        let right_data_types = right.schema().data_types();
290        // Same as inner join except that a bitmap is used to track which row of the left table is
291        // matched.
292        #[for_await]
293        for right_chunk in right.execute() {
294            let right_chunk = right_chunk?;
295            for (left_row_idx, left_row) in left.iter().flat_map(|chunk| chunk.rows()).enumerate() {
296                shutdown_rx.check()?;
297                let chunk = Self::concatenate_and_eval(
298                    join_expr.as_ref(),
299                    &left_data_types,
300                    left_row,
301                    &right_chunk,
302                )
303                .await?;
304                if chunk.cardinality() > 0 {
305                    matched.set(left_row_idx, true);
306                    for spilled in chunk_builder.append_chunk(chunk) {
307                        yield spilled
308                    }
309                }
310            }
311        }
312        // Yield unmatched rows in the left table.
313        for (left_row, _) in left
314            .iter()
315            .flat_map(|chunk| chunk.rows())
316            .zip_eq_debug(matched.finish().iter())
317            .filter(|(_, matched)| !*matched)
318        {
319            shutdown_rx.check()?;
320            let row = left_row.chain(repeat_n(Datum::None, right_data_types.len()));
321            if let Some(chunk) = chunk_builder.append_one_row(row) {
322                yield chunk
323            }
324        }
325    }
326
327    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
328    async fn do_left_semi_anti_join<const ANTI_JOIN: bool>(
329        chunk_builder: &mut DataChunkBuilder,
330        left_data_types: Vec<DataType>,
331        join_expr: BoxedExpression,
332        left: Vec<DataChunk>,
333        right: BoxedExecutor,
334        shutdown_rx: ShutdownToken,
335    ) {
336        let mut matched = BitmapBuilder::zeroed(left.iter().map(|chunk| chunk.capacity()).sum());
337        #[for_await]
338        for right_chunk in right.execute() {
339            let right_chunk = right_chunk?;
340            for (left_row_idx, left_row) in left.iter().flat_map(|chunk| chunk.rows()).enumerate() {
341                shutdown_rx.check()?;
342                if matched.is_set(left_row_idx) {
343                    continue;
344                }
345                let chunk = Self::concatenate_and_eval(
346                    join_expr.as_ref(),
347                    &left_data_types,
348                    left_row,
349                    &right_chunk,
350                )
351                .await?;
352                if chunk.cardinality() > 0 {
353                    matched.set(left_row_idx, true)
354                }
355            }
356        }
357        for (left_row, _) in left
358            .iter()
359            .flat_map(|chunk| chunk.rows())
360            .zip_eq_debug(matched.finish().iter())
361            .filter(|(_, matched)| if ANTI_JOIN { !*matched } else { *matched })
362        {
363            shutdown_rx.check()?;
364            if let Some(chunk) = chunk_builder.append_one_row(left_row) {
365                yield chunk
366            }
367        }
368    }
369
370    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
371    async fn do_right_outer_join(
372        chunk_builder: &mut DataChunkBuilder,
373        left_data_types: Vec<DataType>,
374        join_expr: BoxedExpression,
375        left: Vec<DataChunk>,
376        right: BoxedExecutor,
377        shutdown_rx: ShutdownToken,
378    ) {
379        #[for_await]
380        for right_chunk in right.execute() {
381            let right_chunk = right_chunk?;
382            // Use a bitmap to track which row of the current right chunk is matched.
383            let mut matched = BitmapBuilder::zeroed(right_chunk.capacity()).finish();
384            for left_row in left.iter().flat_map(|chunk| chunk.rows()) {
385                shutdown_rx.check()?;
386                let chunk = Self::concatenate_and_eval(
387                    join_expr.as_ref(),
388                    &left_data_types,
389                    left_row,
390                    &right_chunk,
391                )
392                .await?;
393                if chunk.cardinality() > 0 {
394                    // chunk.visibility() must be Some(_)
395                    matched = &matched | chunk.visibility();
396                    for spilled in chunk_builder.append_chunk(chunk) {
397                        yield spilled
398                    }
399                }
400            }
401            for (right_row, _) in right_chunk
402                .rows()
403                .zip_eq_debug(matched.iter())
404                .filter(|(_, matched)| !*matched)
405            {
406                shutdown_rx.check()?;
407                let row = repeat_n(Datum::None, left_data_types.len()).chain(right_row);
408                if let Some(chunk) = chunk_builder.append_one_row(row) {
409                    yield chunk
410                }
411            }
412        }
413    }
414
415    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
416    async fn do_right_semi_anti_join<const ANTI_JOIN: bool>(
417        chunk_builder: &mut DataChunkBuilder,
418        left_data_types: Vec<DataType>,
419        join_expr: BoxedExpression,
420        left: Vec<DataChunk>,
421        right: BoxedExecutor,
422        shutdown_rx: ShutdownToken,
423    ) {
424        #[for_await]
425        for right_chunk in right.execute() {
426            let mut right_chunk = right_chunk?;
427            let mut matched = BitmapBuilder::zeroed(right_chunk.capacity()).finish();
428            for left_row in left.iter().flat_map(|chunk| chunk.rows()) {
429                shutdown_rx.check()?;
430                let chunk = Self::concatenate_and_eval(
431                    join_expr.as_ref(),
432                    &left_data_types,
433                    left_row,
434                    &right_chunk,
435                )
436                .await?;
437                if chunk.cardinality() > 0 {
438                    // chunk.visibility() must be Some(_)
439                    matched = &matched | chunk.visibility();
440                }
441            }
442            if ANTI_JOIN {
443                matched = !&matched;
444            }
445            right_chunk.set_visibility(matched);
446            if right_chunk.cardinality() > 0 {
447                for spilled in chunk_builder.append_chunk(right_chunk) {
448                    yield spilled
449                }
450            }
451        }
452    }
453
454    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
455    async fn do_full_outer_join(
456        chunk_builder: &mut DataChunkBuilder,
457        left_data_types: Vec<DataType>,
458        join_expr: BoxedExpression,
459        left: Vec<DataChunk>,
460        right: BoxedExecutor,
461        shutdown_rx: ShutdownToken,
462    ) {
463        let mut left_matched =
464            BitmapBuilder::zeroed(left.iter().map(|chunk| chunk.capacity()).sum());
465        let right_data_types = right.schema().data_types();
466        #[for_await]
467        for right_chunk in right.execute() {
468            let right_chunk = right_chunk?;
469            let mut right_matched = BitmapBuilder::zeroed(right_chunk.capacity()).finish();
470            for (left_row_idx, left_row) in left.iter().flat_map(|chunk| chunk.rows()).enumerate() {
471                shutdown_rx.check()?;
472                let chunk = Self::concatenate_and_eval(
473                    join_expr.as_ref(),
474                    &left_data_types,
475                    left_row,
476                    &right_chunk,
477                )
478                .await?;
479                if chunk.cardinality() > 0 {
480                    left_matched.set(left_row_idx, true);
481                    right_matched = &right_matched | chunk.visibility();
482                    for spilled in chunk_builder.append_chunk(chunk) {
483                        yield spilled
484                    }
485                }
486            }
487            // Yield unmatched rows in the right table
488            for (right_row, _) in right_chunk
489                .rows()
490                .zip_eq_debug(right_matched.iter())
491                .filter(|(_, matched)| !*matched)
492            {
493                shutdown_rx.check()?;
494                let row = repeat_n(Datum::None, left_data_types.len()).chain(right_row);
495                if let Some(chunk) = chunk_builder.append_one_row(row) {
496                    yield chunk
497                }
498            }
499        }
500        // Yield unmatched rows in the left table.
501        for (left_row, _) in left
502            .iter()
503            .flat_map(|chunk| chunk.rows())
504            .zip_eq_debug(left_matched.finish().iter())
505            .filter(|(_, matched)| !*matched)
506        {
507            shutdown_rx.check()?;
508            let row = left_row.chain(repeat_n(Datum::None, right_data_types.len()));
509            if let Some(chunk) = chunk_builder.append_one_row(row) {
510                yield chunk
511            }
512        }
513    }
514}
515#[cfg(test)]
516mod tests {
517    use futures_async_stream::for_await;
518    use risingwave_common::array::*;
519    use risingwave_common::catalog::{Field, Schema};
520    use risingwave_common::memory::MemoryContext;
521    use risingwave_common::types::DataType;
522    use risingwave_expr::expr::build_from_pretty;
523
524    use crate::executor::BoxedExecutor;
525    use crate::executor::join::JoinType;
526    use crate::executor::join::nested_loop_join::NestedLoopJoinExecutor;
527    use crate::executor::test_utils::{MockExecutor, diff_executor_output};
528    use crate::task::ShutdownToken;
529
530    const CHUNK_SIZE: usize = 1024;
531
532    struct TestFixture {
533        join_type: JoinType,
534    }
535
536    /// Sql for creating test data:
537    /// ```sql
538    /// drop table t1 if exists;
539    /// create table t1(v1 int, v2 float);
540    /// insert into t1 values
541    /// (1, 6.1::FLOAT), (2, 8.4::FLOAT), (3, 3.9::FLOAT), (3, 6.6::FLOAT), (4, 0.7::FLOAT),
542    /// (6, 5.5::FLOAT), (6, 5.6::FLOAT), (8, 7.0::FLOAT);
543    ///
544    /// drop table t2 if exists;
545    /// create table t2(v1 int, v2 real);
546    /// insert into t2 values
547    /// (2, 6.1::REAL), (3, 8.9::REAL), (6, 3.4::REAL), (8, 3.5::REAL), (9, 7.5::REAL),
548    /// (10, null), (11, 8::REAL), (12, null), (20, 5.7::REAL), (30, 9.6::REAL),
549    /// (100, null), (200, 8.18::REAL);
550    /// ```
551    impl TestFixture {
552        fn with_join_type(join_type: JoinType) -> Self {
553            Self { join_type }
554        }
555
556        fn create_left_executor(&self) -> BoxedExecutor {
557            let schema = Schema {
558                fields: vec![
559                    Field::unnamed(DataType::Int32),
560                    Field::unnamed(DataType::Float32),
561                ],
562            };
563            let mut executor = MockExecutor::new(schema);
564
565            executor.add(DataChunk::from_pretty(
566                "i f
567                 1 6.1
568                 2 8.4
569                 3 3.9",
570            ));
571
572            executor.add(DataChunk::from_pretty(
573                "i f
574                 3 6.6
575                 4 0.7
576                 6 5.5
577                 6 5.6
578                 8 7.0",
579            ));
580
581            Box::new(executor)
582        }
583
584        fn create_right_executor(&self) -> BoxedExecutor {
585            let schema = Schema {
586                fields: vec![
587                    Field::unnamed(DataType::Int32),
588                    Field::unnamed(DataType::Float64),
589                ],
590            };
591            let mut executor = MockExecutor::new(schema);
592
593            executor.add(DataChunk::from_pretty(
594                "i F
595                 2 6.1
596                 3 8.9
597                 6 3.4
598                 8 3.5",
599            ));
600
601            executor.add(DataChunk::from_pretty(
602                " i F
603                  9 7.5
604                 10 .
605                 11 8
606                 12 .",
607            ));
608
609            executor.add(DataChunk::from_pretty(
610                "  i F
611                  20 5.7
612                  30 9.6
613                 100 .
614                 200 8.18",
615            ));
616
617            Box::new(executor)
618        }
619
620        fn create_join_executor(&self, shutdown_rx: ShutdownToken) -> BoxedExecutor {
621            let join_type = self.join_type;
622
623            let left_child = self.create_left_executor();
624            let right_child = self.create_right_executor();
625
626            let output_indices = match self.join_type {
627                JoinType::LeftSemi | JoinType::LeftAnti => vec![0, 1],
628                JoinType::RightSemi | JoinType::RightAnti => vec![0, 1],
629                _ => vec![0, 1, 2, 3],
630            };
631
632            Box::new(NestedLoopJoinExecutor::new(
633                build_from_pretty("(equal:boolean $0:int4 $2:int4)"),
634                join_type,
635                output_indices,
636                left_child,
637                right_child,
638                "NestedLoopJoinExecutor".into(),
639                CHUNK_SIZE,
640                MemoryContext::none(),
641                shutdown_rx,
642            ))
643        }
644
645        async fn do_test(&self, expected: DataChunk) {
646            let join_executor = self.create_join_executor(ShutdownToken::empty());
647            let mut expected_mock_exec = MockExecutor::new(join_executor.schema().clone());
648            expected_mock_exec.add(expected);
649            diff_executor_output(join_executor, Box::new(expected_mock_exec)).await;
650        }
651
652        async fn do_test_shutdown(&self) {
653            let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
654            let join_executor = self.create_join_executor(shutdown_rx);
655            shutdown_tx.cancel();
656            #[for_await]
657            for chunk in join_executor.execute() {
658                assert!(chunk.is_err());
659                break;
660            }
661
662            let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
663            let join_executor = self.create_join_executor(shutdown_rx);
664            shutdown_tx.abort("test");
665            #[for_await]
666            for chunk in join_executor.execute() {
667                assert!(chunk.is_err());
668                break;
669            }
670        }
671    }
672
673    /// sql: select * from t1, t2 where t1.v1 = t2.v1
674    #[tokio::test]
675    async fn test_inner_join() {
676        let test_fixture = TestFixture::with_join_type(JoinType::Inner);
677
678        let expected_chunk = DataChunk::from_pretty(
679            "i f   i F
680             2 8.4 2 6.1
681             3 3.9 3 8.9
682             3 6.6 3 8.9
683             6 5.5 6 3.4
684             6 5.6 6 3.4
685             8 7.0 8 3.5",
686        );
687
688        test_fixture.do_test(expected_chunk).await;
689    }
690
691    /// sql: select * from t1 left outer join t2 on t1.v1 = t2.v1
692    #[tokio::test]
693    async fn test_left_outer_join() {
694        let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);
695
696        let expected_chunk = DataChunk::from_pretty(
697            "i f   i F
698             2 8.4 2 6.1
699             3 3.9 3 8.9
700             3 6.6 3 8.9
701             6 5.5 6 3.4
702             6 5.6 6 3.4
703             8 7.0 8 3.5
704             1 6.1 . .
705             4 0.7 . .",
706        );
707
708        test_fixture.do_test(expected_chunk).await;
709    }
710
711    #[tokio::test]
712    async fn test_left_semi_join() {
713        let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
714
715        let expected_chunk = DataChunk::from_pretty(
716            "i f
717             2 8.4
718             3 3.9
719             3 6.6
720             6 5.5
721             6 5.6
722             8 7.0",
723        );
724
725        test_fixture.do_test(expected_chunk).await;
726    }
727
728    #[tokio::test]
729    async fn test_left_anti_join() {
730        let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);
731
732        let expected_chunk = DataChunk::from_pretty(
733            "i f
734             1 6.1
735             4 0.7",
736        );
737
738        test_fixture.do_test(expected_chunk).await;
739    }
740
741    #[tokio::test]
742    async fn test_right_outer_join() {
743        let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);
744
745        let expected_chunk = DataChunk::from_pretty(
746            "i f   i F
747             2 8.4 2 6.1
748             3 3.9 3 8.9
749             3 6.6 3 8.9
750             6 5.5 6 3.4
751             6 5.6 6 3.4
752             8 7.0 8 3.5
753             . .   9 7.5
754             . .   10 .
755             . .   11 8
756             . .   12 .
757             . .   20 5.7
758             . .   30 9.6
759             . .   100 .
760             . .   200 8.18",
761        );
762
763        test_fixture.do_test(expected_chunk).await;
764    }
765
766    #[tokio::test]
767    async fn test_right_semi_join() {
768        let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);
769
770        let expected_chunk = DataChunk::from_pretty(
771            "i F
772             2 6.1
773             3 8.9
774             6 3.4
775             8 3.5",
776        );
777
778        test_fixture.do_test(expected_chunk).await;
779    }
780
781    #[tokio::test]
782    async fn test_right_anti_join() {
783        let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);
784
785        let expected_chunk = DataChunk::from_pretty(
786            "  i F
787               9 7.5
788              10 .
789              11 8
790              12 .
791              20 5.7
792              30 9.6
793             100 .
794             200 8.18",
795        );
796
797        test_fixture.do_test(expected_chunk).await;
798    }
799
800    #[tokio::test]
801    async fn test_full_outer_join() {
802        let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);
803
804        let expected_chunk = DataChunk::from_pretty(
805            "i f   i F
806             2 8.4 2 6.1
807             3 3.9 3 8.9
808             3 6.6 3 8.9
809             6 5.5 6 3.4
810             6 5.6 6 3.4
811             8 7.0 8 3.5
812             . .   9 7.5
813             . .   10 .
814             . .   11 8
815             . .   12 .
816             . .   20 5.7
817             . .   30 9.6
818             . .   100 .
819             . .   200 8.18
820             1 6.1 . .
821             4 0.7 . .",
822        );
823
824        test_fixture.do_test(expected_chunk).await;
825    }
826
827    #[tokio::test]
828    async fn test_shutdown_rx() {
829        let test_fixture = TestFixture::with_join_type(JoinType::Inner);
830        test_fixture.do_test_shutdown().await;
831        let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);
832        test_fixture.do_test_shutdown().await;
833        let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
834        test_fixture.do_test_shutdown().await;
835        let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);
836        test_fixture.do_test_shutdown().await;
837        let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);
838        test_fixture.do_test_shutdown().await;
839        let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);
840        test_fixture.do_test_shutdown().await;
841        let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);
842        test_fixture.do_test_shutdown().await;
843    }
844}