1use futures_async_stream::try_stream;
16use risingwave_common::array::data_chunk_iter::RowRef;
17use risingwave_common::array::{Array, DataChunk};
18use risingwave_common::bitmap::BitmapBuilder;
19use risingwave_common::catalog::Schema;
20use risingwave_common::memory::MemoryContext;
21use risingwave_common::row::{RowExt, repeat_n};
22use risingwave_common::types::{DataType, Datum};
23use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
24use risingwave_common::util::iter_util::ZipEqDebug;
25use risingwave_common_estimate_size::EstimateSize;
26use risingwave_expr::expr::{
27 BoxedExpression, Expression, build_from_prost as expr_build_from_prost,
28};
29use risingwave_pb::batch_plan::plan_node::NodeBody;
30
31use crate::error::{BatchError, Result};
32use crate::executor::join::{JoinType, concatenate, convert_row_to_chunk};
33use crate::executor::{
34 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
35};
36use crate::task::ShutdownToken;
37
/// Batch executor that joins its two children with a nested-loop strategy.
///
/// `do_execute` buffers every chunk of the left child in memory, then streams the right child
/// chunk by chunk. Each buffered left row is paired with the current right chunk and `join_expr`
/// decides which pairs survive.
pub struct NestedLoopJoinExecutor {
    /// Join condition, evaluated on the concatenation of a left row and a right chunk.
    join_expr: BoxedExpression,
    /// Join flavour; selects which `do_*_join` routine `do_execute` runs.
    join_type: JoinType,
    /// Schema of the rows the join produces before `output_indices` is applied.
    original_schema: Schema,
    /// Schema reported to the parent: the `output_indices` columns of `original_schema`.
    schema: Schema,
    /// Indices into `original_schema` of the columns kept in the output.
    output_indices: Vec<usize>,
    /// Left input; fully buffered before joining starts.
    left_child: BoxedExecutor,
    /// Right input; consumed as a stream of chunks.
    right_child: BoxedExecutor,
    /// Name returned by `Executor::identity`.
    identity: String,
    /// Size handed to the `DataChunkBuilder` that assembles the output chunks.
    chunk_size: usize,

    /// Charged with the estimated heap size of every buffered left chunk; buffering fails with
    /// `BatchError::OutOfMemory` when `add` reports `false`.
    mem_context: MemoryContext,

    /// Polled through `check()` inside the join loops; an error from it ends the stream.
    shutdown_rx: ShutdownToken,
}
71
72impl Executor for NestedLoopJoinExecutor {
73 fn schema(&self) -> &Schema {
74 &self.schema
75 }
76
77 fn identity(&self) -> &str {
78 &self.identity
79 }
80
81 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
82 self.do_execute()
83 }
84}
85
86impl NestedLoopJoinExecutor {
87 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
88 async fn do_execute(self: Box<Self>) {
89 let left_data_types = self.left_child.schema().data_types();
90 let data_types = self.original_schema.data_types();
91
92 let mut chunk_builder = DataChunkBuilder::new(data_types, self.chunk_size);
93
94 let left: Vec<DataChunk> = {
96 let mut ret = Vec::with_capacity(1024);
97 #[for_await]
98 for chunk in self.left_child.execute() {
99 let c = chunk?;
100 trace!("Estimated chunk size is {:?}", c.estimated_heap_size());
101 if !self.mem_context.add(c.estimated_heap_size() as i64) {
102 Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?;
103 }
104 ret.push(c);
105 }
106 ret
107 };
108
109 let stream = match self.join_type {
111 JoinType::Inner => Self::do_inner_join,
112 JoinType::LeftOuter => Self::do_left_outer_join,
113 JoinType::LeftSemi => Self::do_left_semi_anti_join::<false>,
114 JoinType::LeftAnti => Self::do_left_semi_anti_join::<true>,
115 JoinType::RightOuter => Self::do_right_outer_join,
116 JoinType::RightSemi => Self::do_right_semi_anti_join::<false>,
117 JoinType::RightAnti => Self::do_right_semi_anti_join::<true>,
118 JoinType::FullOuter => Self::do_full_outer_join,
119 JoinType::AsOfInner | JoinType::AsOfLeftOuter => {
120 unimplemented!("AsOf join is not supported in NestedLoopJoinExecutor")
121 }
122 };
123
124 #[for_await]
125 for chunk in stream(
126 &mut chunk_builder,
127 left_data_types,
128 self.join_expr,
129 left,
130 self.right_child,
131 self.shutdown_rx.clone(),
132 ) {
133 yield chunk?.project(&self.output_indices)
134 }
135
136 if let Some(chunk) = chunk_builder.consume_all() {
138 yield chunk.project(&self.output_indices)
139 }
140 }
141}
142
143impl NestedLoopJoinExecutor {
144 async fn concatenate_and_eval(
147 expr: &dyn Expression,
148 left_row_types: &[DataType],
149 left_row: RowRef<'_>,
150 right_chunk: &DataChunk,
151 ) -> Result<DataChunk> {
152 let left_chunk = convert_row_to_chunk(&left_row, right_chunk.capacity(), left_row_types)?;
153 let mut chunk = concatenate(&left_chunk, right_chunk)?;
154 chunk.set_visibility(expr.eval(&chunk).await?.as_bool().iter().collect());
155 Ok(chunk)
156 }
157}
158
159impl BoxedExecutorBuilder for NestedLoopJoinExecutor {
160 async fn new_boxed_executor(
161 source: &ExecutorBuilder<'_>,
162 inputs: Vec<BoxedExecutor>,
163 ) -> Result<BoxedExecutor> {
164 let [left_child, right_child]: [_; 2] = inputs.try_into().unwrap();
165
166 let nested_loop_join_node = try_match_expand!(
167 source.plan_node().get_node_body().unwrap(),
168 NodeBody::NestedLoopJoin
169 )?;
170
171 let join_type = JoinType::from_prost(nested_loop_join_node.get_join_type()?);
172 let join_expr = expr_build_from_prost(nested_loop_join_node.get_join_cond()?)?;
173
174 let output_indices = nested_loop_join_node
175 .output_indices
176 .iter()
177 .map(|&v| v as usize)
178 .collect();
179
180 let identity = source.plan_node().get_identity().clone();
181 let mem_context = source.context().create_executor_mem_context(&identity);
182
183 Ok(Box::new(NestedLoopJoinExecutor::new(
184 join_expr,
185 join_type,
186 output_indices,
187 left_child,
188 right_child,
189 identity,
190 source.context().get_config().developer.chunk_size,
191 mem_context,
192 source.shutdown_rx().clone(),
193 )))
194 }
195}
196
197impl NestedLoopJoinExecutor {
198 #[allow(clippy::too_many_arguments)]
199 pub fn new(
200 join_expr: BoxedExpression,
201 join_type: JoinType,
202 output_indices: Vec<usize>,
203 left_child: BoxedExecutor,
204 right_child: BoxedExecutor,
205 identity: String,
206 chunk_size: usize,
207 mem_context: MemoryContext,
208 shutdown_rx: ShutdownToken,
209 ) -> Self {
210 let original_schema = match join_type {
212 JoinType::LeftSemi | JoinType::LeftAnti => left_child.schema().clone(),
213 JoinType::RightSemi | JoinType::RightAnti => right_child.schema().clone(),
214 _ => Schema::from_iter(
215 left_child
216 .schema()
217 .fields()
218 .iter()
219 .chain(right_child.schema().fields().iter())
220 .cloned(),
221 ),
222 };
223 let schema = Schema::from_iter(
224 output_indices
225 .iter()
226 .map(|&idx| original_schema[idx].clone()),
227 );
228 Self {
229 join_expr,
230 join_type,
231 original_schema,
232 schema,
233 output_indices,
234 left_child,
235 right_child,
236 identity,
237 chunk_size,
238 mem_context,
239 shutdown_rx,
240 }
241 }
242}
243
244impl NestedLoopJoinExecutor {
245 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
246 async fn do_inner_join(
247 chunk_builder: &mut DataChunkBuilder,
248 left_data_types: Vec<DataType>,
249 join_expr: BoxedExpression,
250 left: Vec<DataChunk>,
251 right: BoxedExecutor,
252 shutdown_rx: ShutdownToken,
253 ) {
254 #[for_await]
256 for right_chunk in right.execute() {
257 let right_chunk = right_chunk?;
258 for left_row in left.iter().flat_map(|chunk| chunk.rows()) {
260 shutdown_rx.check()?;
261 let chunk = Self::concatenate_and_eval(
264 join_expr.as_ref(),
265 &left_data_types,
266 left_row,
267 &right_chunk,
268 )
269 .await?;
270 if chunk.cardinality() > 0 {
272 for spilled in chunk_builder.append_chunk(chunk) {
273 yield spilled
274 }
275 }
276 }
277 }
278 }
279
280 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
281 async fn do_left_outer_join(
282 chunk_builder: &mut DataChunkBuilder,
283 left_data_types: Vec<DataType>,
284 join_expr: BoxedExpression,
285 left: Vec<DataChunk>,
286 right: BoxedExecutor,
287 shutdown_rx: ShutdownToken,
288 ) {
289 let mut matched = BitmapBuilder::zeroed(left.iter().map(|chunk| chunk.capacity()).sum());
290 let right_data_types = right.schema().data_types();
291 #[for_await]
294 for right_chunk in right.execute() {
295 let right_chunk = right_chunk?;
296 for (left_row_idx, left_row) in left.iter().flat_map(|chunk| chunk.rows()).enumerate() {
297 shutdown_rx.check()?;
298 let chunk = Self::concatenate_and_eval(
299 join_expr.as_ref(),
300 &left_data_types,
301 left_row,
302 &right_chunk,
303 )
304 .await?;
305 if chunk.cardinality() > 0 {
306 matched.set(left_row_idx, true);
307 for spilled in chunk_builder.append_chunk(chunk) {
308 yield spilled
309 }
310 }
311 }
312 }
313 for (left_row, _) in left
315 .iter()
316 .flat_map(|chunk| chunk.rows())
317 .zip_eq_debug(matched.finish().iter())
318 .filter(|(_, matched)| !*matched)
319 {
320 shutdown_rx.check()?;
321 let row = left_row.chain(repeat_n(Datum::None, right_data_types.len()));
322 if let Some(chunk) = chunk_builder.append_one_row(row) {
323 yield chunk
324 }
325 }
326 }
327
328 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
329 async fn do_left_semi_anti_join<const ANTI_JOIN: bool>(
330 chunk_builder: &mut DataChunkBuilder,
331 left_data_types: Vec<DataType>,
332 join_expr: BoxedExpression,
333 left: Vec<DataChunk>,
334 right: BoxedExecutor,
335 shutdown_rx: ShutdownToken,
336 ) {
337 let mut matched = BitmapBuilder::zeroed(left.iter().map(|chunk| chunk.capacity()).sum());
338 #[for_await]
339 for right_chunk in right.execute() {
340 let right_chunk = right_chunk?;
341 for (left_row_idx, left_row) in left.iter().flat_map(|chunk| chunk.rows()).enumerate() {
342 shutdown_rx.check()?;
343 if matched.is_set(left_row_idx) {
344 continue;
345 }
346 let chunk = Self::concatenate_and_eval(
347 join_expr.as_ref(),
348 &left_data_types,
349 left_row,
350 &right_chunk,
351 )
352 .await?;
353 if chunk.cardinality() > 0 {
354 matched.set(left_row_idx, true)
355 }
356 }
357 }
358 for (left_row, _) in left
359 .iter()
360 .flat_map(|chunk| chunk.rows())
361 .zip_eq_debug(matched.finish().iter())
362 .filter(|(_, matched)| if ANTI_JOIN { !*matched } else { *matched })
363 {
364 shutdown_rx.check()?;
365 if let Some(chunk) = chunk_builder.append_one_row(left_row) {
366 yield chunk
367 }
368 }
369 }
370
371 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
372 async fn do_right_outer_join(
373 chunk_builder: &mut DataChunkBuilder,
374 left_data_types: Vec<DataType>,
375 join_expr: BoxedExpression,
376 left: Vec<DataChunk>,
377 right: BoxedExecutor,
378 shutdown_rx: ShutdownToken,
379 ) {
380 #[for_await]
381 for right_chunk in right.execute() {
382 let right_chunk = right_chunk?;
383 let mut matched = BitmapBuilder::zeroed(right_chunk.capacity()).finish();
385 for left_row in left.iter().flat_map(|chunk| chunk.rows()) {
386 shutdown_rx.check()?;
387 let chunk = Self::concatenate_and_eval(
388 join_expr.as_ref(),
389 &left_data_types,
390 left_row,
391 &right_chunk,
392 )
393 .await?;
394 if chunk.cardinality() > 0 {
395 matched = &matched | chunk.visibility();
397 for spilled in chunk_builder.append_chunk(chunk) {
398 yield spilled
399 }
400 }
401 }
402 for (right_row, _) in right_chunk
403 .rows()
404 .zip_eq_debug(matched.iter())
405 .filter(|(_, matched)| !*matched)
406 {
407 shutdown_rx.check()?;
408 let row = repeat_n(Datum::None, left_data_types.len()).chain(right_row);
409 if let Some(chunk) = chunk_builder.append_one_row(row) {
410 yield chunk
411 }
412 }
413 }
414 }
415
416 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
417 async fn do_right_semi_anti_join<const ANTI_JOIN: bool>(
418 chunk_builder: &mut DataChunkBuilder,
419 left_data_types: Vec<DataType>,
420 join_expr: BoxedExpression,
421 left: Vec<DataChunk>,
422 right: BoxedExecutor,
423 shutdown_rx: ShutdownToken,
424 ) {
425 #[for_await]
426 for right_chunk in right.execute() {
427 let mut right_chunk = right_chunk?;
428 let mut matched = BitmapBuilder::zeroed(right_chunk.capacity()).finish();
429 for left_row in left.iter().flat_map(|chunk| chunk.rows()) {
430 shutdown_rx.check()?;
431 let chunk = Self::concatenate_and_eval(
432 join_expr.as_ref(),
433 &left_data_types,
434 left_row,
435 &right_chunk,
436 )
437 .await?;
438 if chunk.cardinality() > 0 {
439 matched = &matched | chunk.visibility();
441 }
442 }
443 if ANTI_JOIN {
444 matched = !&matched;
445 }
446 right_chunk.set_visibility(matched);
447 if right_chunk.cardinality() > 0 {
448 for spilled in chunk_builder.append_chunk(right_chunk) {
449 yield spilled
450 }
451 }
452 }
453 }
454
455 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
456 async fn do_full_outer_join(
457 chunk_builder: &mut DataChunkBuilder,
458 left_data_types: Vec<DataType>,
459 join_expr: BoxedExpression,
460 left: Vec<DataChunk>,
461 right: BoxedExecutor,
462 shutdown_rx: ShutdownToken,
463 ) {
464 let mut left_matched =
465 BitmapBuilder::zeroed(left.iter().map(|chunk| chunk.capacity()).sum());
466 let right_data_types = right.schema().data_types();
467 #[for_await]
468 for right_chunk in right.execute() {
469 let right_chunk = right_chunk?;
470 let mut right_matched = BitmapBuilder::zeroed(right_chunk.capacity()).finish();
471 for (left_row_idx, left_row) in left.iter().flat_map(|chunk| chunk.rows()).enumerate() {
472 shutdown_rx.check()?;
473 let chunk = Self::concatenate_and_eval(
474 join_expr.as_ref(),
475 &left_data_types,
476 left_row,
477 &right_chunk,
478 )
479 .await?;
480 if chunk.cardinality() > 0 {
481 left_matched.set(left_row_idx, true);
482 right_matched = &right_matched | chunk.visibility();
483 for spilled in chunk_builder.append_chunk(chunk) {
484 yield spilled
485 }
486 }
487 }
488 for (right_row, _) in right_chunk
490 .rows()
491 .zip_eq_debug(right_matched.iter())
492 .filter(|(_, matched)| !*matched)
493 {
494 shutdown_rx.check()?;
495 let row = repeat_n(Datum::None, left_data_types.len()).chain(right_row);
496 if let Some(chunk) = chunk_builder.append_one_row(row) {
497 yield chunk
498 }
499 }
500 }
501 for (left_row, _) in left
503 .iter()
504 .flat_map(|chunk| chunk.rows())
505 .zip_eq_debug(left_matched.finish().iter())
506 .filter(|(_, matched)| !*matched)
507 {
508 shutdown_rx.check()?;
509 let row = left_row.chain(repeat_n(Datum::None, right_data_types.len()));
510 if let Some(chunk) = chunk_builder.append_one_row(row) {
511 yield chunk
512 }
513 }
514 }
515}
#[cfg(test)]
mod tests {
    use futures_async_stream::for_await;
    use risingwave_common::array::*;
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::memory::MemoryContext;
    use risingwave_common::types::DataType;
    use risingwave_expr::expr::build_from_pretty;

    use crate::executor::BoxedExecutor;
    use crate::executor::join::JoinType;
    use crate::executor::join::nested_loop_join::NestedLoopJoinExecutor;
    use crate::executor::test_utils::{MockExecutor, diff_executor_output};
    use crate::task::ShutdownToken;

    const CHUNK_SIZE: usize = 1024;

    /// Builds a nested-loop join over two fixed mock inputs for one join type.
    ///
    /// The left input has columns `(int32, float32)`, the right input `(int32, float64)`, and the
    /// join condition compares column 0 (left int) with column 2 (right int) for equality.
    struct TestFixture {
        join_type: JoinType,
    }

    impl TestFixture {
        fn with_join_type(join_type: JoinType) -> Self {
            Self { join_type }
        }

        /// Left input: two chunks, eight rows in total.
        fn create_left_executor(&self) -> BoxedExecutor {
            let schema = Schema {
                fields: vec![
                    Field::unnamed(DataType::Int32),
                    Field::unnamed(DataType::Float32),
                ],
            };
            let mut executor = MockExecutor::new(schema);

            executor.add(DataChunk::from_pretty(
                "i f
                 1 6.1
                 2 8.4
                 3 3.9",
            ));

            executor.add(DataChunk::from_pretty(
                "i f
                 3 6.6
                 4 0.7
                 6 5.5
                 6 5.6
                 8 7.0",
            ));

            Box::new(executor)
        }

        /// Right input: three chunks; only the first one holds keys present on the left.
        fn create_right_executor(&self) -> BoxedExecutor {
            let schema = Schema {
                fields: vec![
                    Field::unnamed(DataType::Int32),
                    Field::unnamed(DataType::Float64),
                ],
            };
            let mut executor = MockExecutor::new(schema);

            executor.add(DataChunk::from_pretty(
                "i F
                 2 6.1
                 3 8.9
                 6 3.4
                 8 3.5",
            ));

            executor.add(DataChunk::from_pretty(
                " i F
                  9 7.5
                 10 .
                 11 8
                 12 .",
            ));

            executor.add(DataChunk::from_pretty(
                "  i F
                  20 5.7
                  30 9.6
                 100 .
                 200 8.18",
            ));

            Box::new(executor)
        }

        /// Wires both mock inputs into a join executor that outputs every available column.
        fn create_join_executor(&self, shutdown_rx: ShutdownToken) -> BoxedExecutor {
            let join_type = self.join_type;

            let left_child = self.create_left_executor();
            let right_child = self.create_right_executor();

            // Semi and anti joins expose the two columns of one side only.
            let output_indices = match self.join_type {
                JoinType::LeftSemi
                | JoinType::LeftAnti
                | JoinType::RightSemi
                | JoinType::RightAnti => vec![0, 1],
                _ => vec![0, 1, 2, 3],
            };

            Box::new(NestedLoopJoinExecutor::new(
                build_from_pretty("(equal:boolean $0:int4 $2:int4)"),
                join_type,
                output_indices,
                left_child,
                right_child,
                "NestedLoopJoinExecutor".into(),
                CHUNK_SIZE,
                MemoryContext::none(),
                shutdown_rx,
            ))
        }

        /// Runs the join and compares its output with `expected`.
        async fn do_test(&self, expected: DataChunk) {
            let join_executor = self.create_join_executor(ShutdownToken::empty());
            let mut expected_mock_exec = MockExecutor::new(join_executor.schema().clone());
            expected_mock_exec.add(expected);
            diff_executor_output(join_executor, Box::new(expected_mock_exec)).await;
        }

        /// Asserts that the first item produced by `executor` is an error.
        ///
        /// Also fails when the stream ends without producing anything, so that the check cannot
        /// pass vacuously.
        async fn assert_first_item_is_err(executor: BoxedExecutor) {
            let mut saw_item = false;
            #[for_await]
            for chunk in executor.execute() {
                saw_item = true;
                assert!(chunk.is_err());
                break;
            }
            assert!(saw_item, "executor finished without producing any item");
        }

        /// Checks that the join fails once shutdown was requested, via both `cancel` and `abort`.
        async fn do_test_shutdown(&self) {
            let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
            let join_executor = self.create_join_executor(shutdown_rx);
            shutdown_tx.cancel();
            Self::assert_first_item_is_err(join_executor).await;

            let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
            let join_executor = self.create_join_executor(shutdown_rx);
            shutdown_tx.abort("test");
            Self::assert_first_item_is_err(join_executor).await;
        }
    }

    #[tokio::test]
    async fn test_inner_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::Inner);

        let expected_chunk = DataChunk::from_pretty(
            "i f   i F
             2 8.4 2 6.1
             3 3.9 3 8.9
             3 6.6 3 8.9
             6 5.5 6 3.4
             6 5.6 6 3.4
             8 7.0 8 3.5",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    #[tokio::test]
    async fn test_left_outer_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);

        let expected_chunk = DataChunk::from_pretty(
            "i f   i F
             2 8.4 2 6.1
             3 3.9 3 8.9
             3 6.6 3 8.9
             6 5.5 6 3.4
             6 5.6 6 3.4
             8 7.0 8 3.5
             1 6.1 . .
             4 0.7 . .",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    #[tokio::test]
    async fn test_left_semi_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);

        let expected_chunk = DataChunk::from_pretty(
            "i f
             2 8.4
             3 3.9
             3 6.6
             6 5.5
             6 5.6
             8 7.0",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    #[tokio::test]
    async fn test_left_anti_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);

        let expected_chunk = DataChunk::from_pretty(
            "i f
             1 6.1
             4 0.7",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    #[tokio::test]
    async fn test_right_outer_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);

        let expected_chunk = DataChunk::from_pretty(
            "i f   i F
             2 8.4 2 6.1
             3 3.9 3 8.9
             3 6.6 3 8.9
             6 5.5 6 3.4
             6 5.6 6 3.4
             8 7.0 8 3.5
             . .   9 7.5
             . .   10 .
             . .   11 8
             . .   12 .
             . .   20 5.7
             . .   30 9.6
             . .   100 .
             . .   200 8.18",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    #[tokio::test]
    async fn test_right_semi_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);

        let expected_chunk = DataChunk::from_pretty(
            "i F
             2 6.1
             3 8.9
             6 3.4
             8 3.5",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    #[tokio::test]
    async fn test_right_anti_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);

        let expected_chunk = DataChunk::from_pretty(
            "  i F
               9 7.5
              10 .
              11 8
              12 .
              20 5.7
              30 9.6
             100 .
             200 8.18",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    #[tokio::test]
    async fn test_full_outer_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);

        let expected_chunk = DataChunk::from_pretty(
            "i f   i F
             2 8.4 2 6.1
             3 3.9 3 8.9
             3 6.6 3 8.9
             6 5.5 6 3.4
             6 5.6 6 3.4
             8 7.0 8 3.5
             . .   9 7.5
             . .   10 .
             . .   11 8
             . .   12 .
             . .   20 5.7
             . .   30 9.6
             . .   100 .
             . .   200 8.18
             1 6.1 . .
             4 0.7 . .",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    /// Every join routine, `FullOuter` included, must stop with an error after shutdown.
    #[tokio::test]
    async fn test_shutdown_rx() {
        let join_types = [
            JoinType::Inner,
            JoinType::LeftOuter,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::RightOuter,
            JoinType::RightSemi,
            JoinType::RightAnti,
            JoinType::FullOuter,
        ];
        for join_type in join_types {
            TestFixture::with_join_type(join_type)
                .do_test_shutdown()
                .await;
        }
    }
}