1use futures_async_stream::try_stream;
16use risingwave_common::array::data_chunk_iter::RowRef;
17use risingwave_common::array::{Array, DataChunk};
18use risingwave_common::bitmap::BitmapBuilder;
19use risingwave_common::catalog::Schema;
20use risingwave_common::memory::MemoryContext;
21use risingwave_common::row::{RowExt, repeat_n};
22use risingwave_common::types::{DataType, Datum};
23use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
24use risingwave_common::util::iter_util::ZipEqDebug;
25use risingwave_common_estimate_size::EstimateSize;
26use risingwave_expr::expr::{BoxedExpression, build_from_prost as expr_build_from_prost};
27use risingwave_pb::batch_plan::plan_node::NodeBody;
28
29use crate::error::{BatchError, Result};
30use crate::executor::join::{JoinType, concatenate, convert_row_to_chunk};
31use crate::executor::{
32 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
33};
34use crate::task::ShutdownToken;
35
36pub struct NestedLoopJoinExecutor {
44 join_expr: BoxedExpression,
46 join_type: JoinType,
48 original_schema: Schema,
50 schema: Schema,
52 output_indices: Vec<usize>,
55 left_child: BoxedExecutor,
57 right_child: BoxedExecutor,
59 identity: String,
61 chunk_size: usize,
63
64 mem_context: MemoryContext,
66
67 shutdown_rx: ShutdownToken,
68}
69
70impl Executor for NestedLoopJoinExecutor {
71 fn schema(&self) -> &Schema {
72 &self.schema
73 }
74
75 fn identity(&self) -> &str {
76 &self.identity
77 }
78
79 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
80 self.do_execute()
81 }
82}
83
84impl NestedLoopJoinExecutor {
85 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
86 async fn do_execute(self: Box<Self>) {
87 let left_data_types = self.left_child.schema().data_types();
88 let data_types = self.original_schema.data_types();
89
90 let mut chunk_builder = DataChunkBuilder::new(data_types, self.chunk_size);
91
92 let left: Vec<DataChunk> = {
94 let mut ret = Vec::with_capacity(1024);
95 #[for_await]
96 for chunk in self.left_child.execute() {
97 let c = chunk?;
98 trace!("Estimated chunk size is {:?}", c.estimated_heap_size());
99 if !self.mem_context.add(c.estimated_heap_size() as i64) {
100 Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?;
101 }
102 ret.push(c);
103 }
104 ret
105 };
106
107 let stream = match self.join_type {
109 JoinType::Inner => Self::do_inner_join,
110 JoinType::LeftOuter => Self::do_left_outer_join,
111 JoinType::LeftSemi => Self::do_left_semi_anti_join::<false>,
112 JoinType::LeftAnti => Self::do_left_semi_anti_join::<true>,
113 JoinType::RightOuter => Self::do_right_outer_join,
114 JoinType::RightSemi => Self::do_right_semi_anti_join::<false>,
115 JoinType::RightAnti => Self::do_right_semi_anti_join::<true>,
116 JoinType::FullOuter => Self::do_full_outer_join,
117 JoinType::AsOfInner | JoinType::AsOfLeftOuter => {
118 unimplemented!("AsOf join is not supported in NestedLoopJoinExecutor")
119 }
120 };
121
122 #[for_await]
123 for chunk in stream(
124 &mut chunk_builder,
125 left_data_types,
126 self.join_expr,
127 left,
128 self.right_child,
129 self.shutdown_rx.clone(),
130 ) {
131 yield chunk?.project(&self.output_indices)
132 }
133
134 if let Some(chunk) = chunk_builder.consume_all() {
136 yield chunk.project(&self.output_indices)
137 }
138 }
139}
140
141impl NestedLoopJoinExecutor {
142 async fn concatenate_and_eval(
145 expr: &BoxedExpression,
146 left_row_types: &[DataType],
147 left_row: RowRef<'_>,
148 right_chunk: &DataChunk,
149 ) -> Result<DataChunk> {
150 let left_chunk = convert_row_to_chunk(&left_row, right_chunk.capacity(), left_row_types)?;
151 let mut chunk = concatenate(&left_chunk, right_chunk)?;
152 chunk.set_visibility(expr.eval(&chunk).await?.as_bool().iter().collect());
153 Ok(chunk)
154 }
155}
156
157impl BoxedExecutorBuilder for NestedLoopJoinExecutor {
158 async fn new_boxed_executor(
159 source: &ExecutorBuilder<'_>,
160 inputs: Vec<BoxedExecutor>,
161 ) -> Result<BoxedExecutor> {
162 let [left_child, right_child]: [_; 2] = inputs.try_into().unwrap();
163
164 let nested_loop_join_node = try_match_expand!(
165 source.plan_node().get_node_body().unwrap(),
166 NodeBody::NestedLoopJoin
167 )?;
168
169 let join_type = JoinType::from_prost(nested_loop_join_node.get_join_type()?);
170 let join_expr = expr_build_from_prost(nested_loop_join_node.get_join_cond()?)?;
171
172 let output_indices = nested_loop_join_node
173 .output_indices
174 .iter()
175 .map(|&v| v as usize)
176 .collect();
177
178 let identity = source.plan_node().get_identity().clone();
179 let mem_context = source.context().create_executor_mem_context(&identity);
180
181 Ok(Box::new(NestedLoopJoinExecutor::new(
182 join_expr,
183 join_type,
184 output_indices,
185 left_child,
186 right_child,
187 identity,
188 source.context().get_config().developer.chunk_size,
189 mem_context,
190 source.shutdown_rx().clone(),
191 )))
192 }
193}
194
195impl NestedLoopJoinExecutor {
196 pub fn new(
197 join_expr: BoxedExpression,
198 join_type: JoinType,
199 output_indices: Vec<usize>,
200 left_child: BoxedExecutor,
201 right_child: BoxedExecutor,
202 identity: String,
203 chunk_size: usize,
204 mem_context: MemoryContext,
205 shutdown_rx: ShutdownToken,
206 ) -> Self {
207 let original_schema = match join_type {
209 JoinType::LeftSemi | JoinType::LeftAnti => left_child.schema().clone(),
210 JoinType::RightSemi | JoinType::RightAnti => right_child.schema().clone(),
211 _ => Schema::from_iter(
212 left_child
213 .schema()
214 .fields()
215 .iter()
216 .chain(right_child.schema().fields().iter())
217 .cloned(),
218 ),
219 };
220 let schema = Schema::from_iter(
221 output_indices
222 .iter()
223 .map(|&idx| original_schema[idx].clone()),
224 );
225 Self {
226 join_expr,
227 join_type,
228 original_schema,
229 schema,
230 output_indices,
231 left_child,
232 right_child,
233 identity,
234 chunk_size,
235 mem_context,
236 shutdown_rx,
237 }
238 }
239}
240
241impl NestedLoopJoinExecutor {
242 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
243 async fn do_inner_join(
244 chunk_builder: &mut DataChunkBuilder,
245 left_data_types: Vec<DataType>,
246 join_expr: BoxedExpression,
247 left: Vec<DataChunk>,
248 right: BoxedExecutor,
249 shutdown_rx: ShutdownToken,
250 ) {
251 #[for_await]
253 for right_chunk in right.execute() {
254 let right_chunk = right_chunk?;
255 for left_row in left.iter().flat_map(|chunk| chunk.rows()) {
257 shutdown_rx.check()?;
258 let chunk = Self::concatenate_and_eval(
261 &join_expr,
262 &left_data_types,
263 left_row,
264 &right_chunk,
265 )
266 .await?;
267 if chunk.cardinality() > 0 {
269 for spilled in chunk_builder.append_chunk(chunk) {
270 yield spilled
271 }
272 }
273 }
274 }
275 }
276
277 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
278 async fn do_left_outer_join(
279 chunk_builder: &mut DataChunkBuilder,
280 left_data_types: Vec<DataType>,
281 join_expr: BoxedExpression,
282 left: Vec<DataChunk>,
283 right: BoxedExecutor,
284 shutdown_rx: ShutdownToken,
285 ) {
286 let mut matched = BitmapBuilder::zeroed(left.iter().map(|chunk| chunk.capacity()).sum());
287 let right_data_types = right.schema().data_types();
288 #[for_await]
291 for right_chunk in right.execute() {
292 let right_chunk = right_chunk?;
293 for (left_row_idx, left_row) in left.iter().flat_map(|chunk| chunk.rows()).enumerate() {
294 shutdown_rx.check()?;
295 let chunk = Self::concatenate_and_eval(
296 &join_expr,
297 &left_data_types,
298 left_row,
299 &right_chunk,
300 )
301 .await?;
302 if chunk.cardinality() > 0 {
303 matched.set(left_row_idx, true);
304 for spilled in chunk_builder.append_chunk(chunk) {
305 yield spilled
306 }
307 }
308 }
309 }
310 for (left_row, _) in left
312 .iter()
313 .flat_map(|chunk| chunk.rows())
314 .zip_eq_debug(matched.finish().iter())
315 .filter(|(_, matched)| !*matched)
316 {
317 shutdown_rx.check()?;
318 let row = left_row.chain(repeat_n(Datum::None, right_data_types.len()));
319 if let Some(chunk) = chunk_builder.append_one_row(row) {
320 yield chunk
321 }
322 }
323 }
324
325 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
326 async fn do_left_semi_anti_join<const ANTI_JOIN: bool>(
327 chunk_builder: &mut DataChunkBuilder,
328 left_data_types: Vec<DataType>,
329 join_expr: BoxedExpression,
330 left: Vec<DataChunk>,
331 right: BoxedExecutor,
332 shutdown_rx: ShutdownToken,
333 ) {
334 let mut matched = BitmapBuilder::zeroed(left.iter().map(|chunk| chunk.capacity()).sum());
335 #[for_await]
336 for right_chunk in right.execute() {
337 let right_chunk = right_chunk?;
338 for (left_row_idx, left_row) in left.iter().flat_map(|chunk| chunk.rows()).enumerate() {
339 shutdown_rx.check()?;
340 if matched.is_set(left_row_idx) {
341 continue;
342 }
343 let chunk = Self::concatenate_and_eval(
344 &join_expr,
345 &left_data_types,
346 left_row,
347 &right_chunk,
348 )
349 .await?;
350 if chunk.cardinality() > 0 {
351 matched.set(left_row_idx, true)
352 }
353 }
354 }
355 for (left_row, _) in left
356 .iter()
357 .flat_map(|chunk| chunk.rows())
358 .zip_eq_debug(matched.finish().iter())
359 .filter(|(_, matched)| if ANTI_JOIN { !*matched } else { *matched })
360 {
361 shutdown_rx.check()?;
362 if let Some(chunk) = chunk_builder.append_one_row(left_row) {
363 yield chunk
364 }
365 }
366 }
367
368 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
369 async fn do_right_outer_join(
370 chunk_builder: &mut DataChunkBuilder,
371 left_data_types: Vec<DataType>,
372 join_expr: BoxedExpression,
373 left: Vec<DataChunk>,
374 right: BoxedExecutor,
375 shutdown_rx: ShutdownToken,
376 ) {
377 #[for_await]
378 for right_chunk in right.execute() {
379 let right_chunk = right_chunk?;
380 let mut matched = BitmapBuilder::zeroed(right_chunk.capacity()).finish();
382 for left_row in left.iter().flat_map(|chunk| chunk.rows()) {
383 shutdown_rx.check()?;
384 let chunk = Self::concatenate_and_eval(
385 &join_expr,
386 &left_data_types,
387 left_row,
388 &right_chunk,
389 )
390 .await?;
391 if chunk.cardinality() > 0 {
392 matched = &matched | chunk.visibility();
394 for spilled in chunk_builder.append_chunk(chunk) {
395 yield spilled
396 }
397 }
398 }
399 for (right_row, _) in right_chunk
400 .rows()
401 .zip_eq_debug(matched.iter())
402 .filter(|(_, matched)| !*matched)
403 {
404 shutdown_rx.check()?;
405 let row = repeat_n(Datum::None, left_data_types.len()).chain(right_row);
406 if let Some(chunk) = chunk_builder.append_one_row(row) {
407 yield chunk
408 }
409 }
410 }
411 }
412
413 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
414 async fn do_right_semi_anti_join<const ANTI_JOIN: bool>(
415 chunk_builder: &mut DataChunkBuilder,
416 left_data_types: Vec<DataType>,
417 join_expr: BoxedExpression,
418 left: Vec<DataChunk>,
419 right: BoxedExecutor,
420 shutdown_rx: ShutdownToken,
421 ) {
422 #[for_await]
423 for right_chunk in right.execute() {
424 let mut right_chunk = right_chunk?;
425 let mut matched = BitmapBuilder::zeroed(right_chunk.capacity()).finish();
426 for left_row in left.iter().flat_map(|chunk| chunk.rows()) {
427 shutdown_rx.check()?;
428 let chunk = Self::concatenate_and_eval(
429 &join_expr,
430 &left_data_types,
431 left_row,
432 &right_chunk,
433 )
434 .await?;
435 if chunk.cardinality() > 0 {
436 matched = &matched | chunk.visibility();
438 }
439 }
440 if ANTI_JOIN {
441 matched = !&matched;
442 }
443 right_chunk.set_visibility(matched);
444 if right_chunk.cardinality() > 0 {
445 for spilled in chunk_builder.append_chunk(right_chunk) {
446 yield spilled
447 }
448 }
449 }
450 }
451
452 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
453 async fn do_full_outer_join(
454 chunk_builder: &mut DataChunkBuilder,
455 left_data_types: Vec<DataType>,
456 join_expr: BoxedExpression,
457 left: Vec<DataChunk>,
458 right: BoxedExecutor,
459 shutdown_rx: ShutdownToken,
460 ) {
461 let mut left_matched =
462 BitmapBuilder::zeroed(left.iter().map(|chunk| chunk.capacity()).sum());
463 let right_data_types = right.schema().data_types();
464 #[for_await]
465 for right_chunk in right.execute() {
466 let right_chunk = right_chunk?;
467 let mut right_matched = BitmapBuilder::zeroed(right_chunk.capacity()).finish();
468 for (left_row_idx, left_row) in left.iter().flat_map(|chunk| chunk.rows()).enumerate() {
469 shutdown_rx.check()?;
470 let chunk = Self::concatenate_and_eval(
471 &join_expr,
472 &left_data_types,
473 left_row,
474 &right_chunk,
475 )
476 .await?;
477 if chunk.cardinality() > 0 {
478 left_matched.set(left_row_idx, true);
479 right_matched = &right_matched | chunk.visibility();
480 for spilled in chunk_builder.append_chunk(chunk) {
481 yield spilled
482 }
483 }
484 }
485 for (right_row, _) in right_chunk
487 .rows()
488 .zip_eq_debug(right_matched.iter())
489 .filter(|(_, matched)| !*matched)
490 {
491 shutdown_rx.check()?;
492 let row = repeat_n(Datum::None, left_data_types.len()).chain(right_row);
493 if let Some(chunk) = chunk_builder.append_one_row(row) {
494 yield chunk
495 }
496 }
497 }
498 for (left_row, _) in left
500 .iter()
501 .flat_map(|chunk| chunk.rows())
502 .zip_eq_debug(left_matched.finish().iter())
503 .filter(|(_, matched)| !*matched)
504 {
505 shutdown_rx.check()?;
506 let row = left_row.chain(repeat_n(Datum::None, right_data_types.len()));
507 if let Some(chunk) = chunk_builder.append_one_row(row) {
508 yield chunk
509 }
510 }
511 }
512}
513#[cfg(test)]
514mod tests {
515 use futures_async_stream::for_await;
516 use risingwave_common::array::*;
517 use risingwave_common::catalog::{Field, Schema};
518 use risingwave_common::memory::MemoryContext;
519 use risingwave_common::types::DataType;
520 use risingwave_expr::expr::build_from_pretty;
521
522 use crate::executor::BoxedExecutor;
523 use crate::executor::join::JoinType;
524 use crate::executor::join::nested_loop_join::NestedLoopJoinExecutor;
525 use crate::executor::test_utils::{MockExecutor, diff_executor_output};
526 use crate::task::ShutdownToken;
527
528 const CHUNK_SIZE: usize = 1024;
529
530 struct TestFixture {
531 join_type: JoinType,
532 }
533
534 impl TestFixture {
550 fn with_join_type(join_type: JoinType) -> Self {
551 Self { join_type }
552 }
553
554 fn create_left_executor(&self) -> BoxedExecutor {
555 let schema = Schema {
556 fields: vec![
557 Field::unnamed(DataType::Int32),
558 Field::unnamed(DataType::Float32),
559 ],
560 };
561 let mut executor = MockExecutor::new(schema);
562
563 executor.add(DataChunk::from_pretty(
564 "i f
565 1 6.1
566 2 8.4
567 3 3.9",
568 ));
569
570 executor.add(DataChunk::from_pretty(
571 "i f
572 3 6.6
573 4 0.7
574 6 5.5
575 6 5.6
576 8 7.0",
577 ));
578
579 Box::new(executor)
580 }
581
582 fn create_right_executor(&self) -> BoxedExecutor {
583 let schema = Schema {
584 fields: vec![
585 Field::unnamed(DataType::Int32),
586 Field::unnamed(DataType::Float64),
587 ],
588 };
589 let mut executor = MockExecutor::new(schema);
590
591 executor.add(DataChunk::from_pretty(
592 "i F
593 2 6.1
594 3 8.9
595 6 3.4
596 8 3.5",
597 ));
598
599 executor.add(DataChunk::from_pretty(
600 " i F
601 9 7.5
602 10 .
603 11 8
604 12 .",
605 ));
606
607 executor.add(DataChunk::from_pretty(
608 " i F
609 20 5.7
610 30 9.6
611 100 .
612 200 8.18",
613 ));
614
615 Box::new(executor)
616 }
617
618 fn create_join_executor(&self, shutdown_rx: ShutdownToken) -> BoxedExecutor {
619 let join_type = self.join_type;
620
621 let left_child = self.create_left_executor();
622 let right_child = self.create_right_executor();
623
624 let output_indices = match self.join_type {
625 JoinType::LeftSemi | JoinType::LeftAnti => vec![0, 1],
626 JoinType::RightSemi | JoinType::RightAnti => vec![0, 1],
627 _ => vec![0, 1, 2, 3],
628 };
629
630 Box::new(NestedLoopJoinExecutor::new(
631 build_from_pretty("(equal:boolean $0:int4 $2:int4)"),
632 join_type,
633 output_indices,
634 left_child,
635 right_child,
636 "NestedLoopJoinExecutor".into(),
637 CHUNK_SIZE,
638 MemoryContext::none(),
639 shutdown_rx,
640 ))
641 }
642
643 async fn do_test(&self, expected: DataChunk) {
644 let join_executor = self.create_join_executor(ShutdownToken::empty());
645 let mut expected_mock_exec = MockExecutor::new(join_executor.schema().clone());
646 expected_mock_exec.add(expected);
647 diff_executor_output(join_executor, Box::new(expected_mock_exec)).await;
648 }
649
650 async fn do_test_shutdown(&self) {
651 let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
652 let join_executor = self.create_join_executor(shutdown_rx);
653 shutdown_tx.cancel();
654 #[for_await]
655 for chunk in join_executor.execute() {
656 assert!(chunk.is_err());
657 break;
658 }
659
660 let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
661 let join_executor = self.create_join_executor(shutdown_rx);
662 shutdown_tx.abort("test");
663 #[for_await]
664 for chunk in join_executor.execute() {
665 assert!(chunk.is_err());
666 break;
667 }
668 }
669 }
670
671 #[tokio::test]
673 async fn test_inner_join() {
674 let test_fixture = TestFixture::with_join_type(JoinType::Inner);
675
676 let expected_chunk = DataChunk::from_pretty(
677 "i f i F
678 2 8.4 2 6.1
679 3 3.9 3 8.9
680 3 6.6 3 8.9
681 6 5.5 6 3.4
682 6 5.6 6 3.4
683 8 7.0 8 3.5",
684 );
685
686 test_fixture.do_test(expected_chunk).await;
687 }
688
689 #[tokio::test]
691 async fn test_left_outer_join() {
692 let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);
693
694 let expected_chunk = DataChunk::from_pretty(
695 "i f i F
696 2 8.4 2 6.1
697 3 3.9 3 8.9
698 3 6.6 3 8.9
699 6 5.5 6 3.4
700 6 5.6 6 3.4
701 8 7.0 8 3.5
702 1 6.1 . .
703 4 0.7 . .",
704 );
705
706 test_fixture.do_test(expected_chunk).await;
707 }
708
709 #[tokio::test]
710 async fn test_left_semi_join() {
711 let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
712
713 let expected_chunk = DataChunk::from_pretty(
714 "i f
715 2 8.4
716 3 3.9
717 3 6.6
718 6 5.5
719 6 5.6
720 8 7.0",
721 );
722
723 test_fixture.do_test(expected_chunk).await;
724 }
725
726 #[tokio::test]
727 async fn test_left_anti_join() {
728 let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);
729
730 let expected_chunk = DataChunk::from_pretty(
731 "i f
732 1 6.1
733 4 0.7",
734 );
735
736 test_fixture.do_test(expected_chunk).await;
737 }
738
739 #[tokio::test]
740 async fn test_right_outer_join() {
741 let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);
742
743 let expected_chunk = DataChunk::from_pretty(
744 "i f i F
745 2 8.4 2 6.1
746 3 3.9 3 8.9
747 3 6.6 3 8.9
748 6 5.5 6 3.4
749 6 5.6 6 3.4
750 8 7.0 8 3.5
751 . . 9 7.5
752 . . 10 .
753 . . 11 8
754 . . 12 .
755 . . 20 5.7
756 . . 30 9.6
757 . . 100 .
758 . . 200 8.18",
759 );
760
761 test_fixture.do_test(expected_chunk).await;
762 }
763
764 #[tokio::test]
765 async fn test_right_semi_join() {
766 let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);
767
768 let expected_chunk = DataChunk::from_pretty(
769 "i F
770 2 6.1
771 3 8.9
772 6 3.4
773 8 3.5",
774 );
775
776 test_fixture.do_test(expected_chunk).await;
777 }
778
779 #[tokio::test]
780 async fn test_right_anti_join() {
781 let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);
782
783 let expected_chunk = DataChunk::from_pretty(
784 " i F
785 9 7.5
786 10 .
787 11 8
788 12 .
789 20 5.7
790 30 9.6
791 100 .
792 200 8.18",
793 );
794
795 test_fixture.do_test(expected_chunk).await;
796 }
797
798 #[tokio::test]
799 async fn test_full_outer_join() {
800 let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);
801
802 let expected_chunk = DataChunk::from_pretty(
803 "i f i F
804 2 8.4 2 6.1
805 3 3.9 3 8.9
806 3 6.6 3 8.9
807 6 5.5 6 3.4
808 6 5.6 6 3.4
809 8 7.0 8 3.5
810 . . 9 7.5
811 . . 10 .
812 . . 11 8
813 . . 12 .
814 . . 20 5.7
815 . . 30 9.6
816 . . 100 .
817 . . 200 8.18
818 1 6.1 . .
819 4 0.7 . .",
820 );
821
822 test_fixture.do_test(expected_chunk).await;
823 }
824
825 #[tokio::test]
826 async fn test_shutdown_rx() {
827 let test_fixture = TestFixture::with_join_type(JoinType::Inner);
828 test_fixture.do_test_shutdown().await;
829 let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);
830 test_fixture.do_test_shutdown().await;
831 let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
832 test_fixture.do_test_shutdown().await;
833 let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);
834 test_fixture.do_test_shutdown().await;
835 let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);
836 test_fixture.do_test_shutdown().await;
837 let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);
838 test_fixture.do_test_shutdown().await;
839 let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);
840 test_fixture.do_test_shutdown().await;
841 }
842}