1use futures_async_stream::try_stream;
16use risingwave_common::array::data_chunk_iter::RowRef;
17use risingwave_common::array::{Array, DataChunk};
18use risingwave_common::bitmap::BitmapBuilder;
19use risingwave_common::catalog::Schema;
20use risingwave_common::memory::MemoryContext;
21use risingwave_common::row::{RowExt, repeat_n};
22use risingwave_common::types::{DataType, Datum};
23use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
24use risingwave_common::util::iter_util::ZipEqDebug;
25use risingwave_common_estimate_size::EstimateSize;
26use risingwave_expr::expr::{
27 BoxedExpression, Expression, build_from_prost as expr_build_from_prost,
28};
29use risingwave_pb::batch_plan::plan_node::NodeBody;
30
31use crate::error::{BatchError, Result};
32use crate::executor::join::{JoinType, concatenate, convert_row_to_chunk};
33use crate::executor::{
34 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
35};
36use crate::task::ShutdownToken;
37
/// Batch executor that joins two inputs with a nested-loop strategy.
///
/// The left input is buffered in full; the right input is then consumed chunk by chunk and
/// every buffered left row is evaluated against each right chunk with `join_expr`.
pub struct NestedLoopJoinExecutor {
    /// Boolean join condition, evaluated on one left row concatenated with a right chunk.
    join_expr: BoxedExpression,
    /// Join variant; selects which `do_*_join` routine `do_execute` runs.
    join_type: JoinType,
    /// Schema of the rows the join routines produce, before `output_indices` is applied.
    original_schema: Schema,
    /// Schema exposed to the parent: `original_schema` projected through `output_indices`.
    schema: Schema,
    /// Positions in `original_schema` of the columns kept in the output, in output order.
    output_indices: Vec<usize>,
    /// Left input; fully buffered before the right input is read.
    left_child: BoxedExecutor,
    /// Right input; consumed one chunk at a time.
    right_child: BoxedExecutor,
    /// Identity string returned by `Executor::identity`.
    identity: String,
    /// Size handed to the `DataChunkBuilder` that assembles the output chunks.
    chunk_size: usize,

    /// Memory accounting for the buffered left chunks.
    mem_context: MemoryContext,

    /// Checked inside the join loops so that a shutdown interrupts the join.
    shutdown_rx: ShutdownToken,
}
71
impl Executor for NestedLoopJoinExecutor {
    /// Returns the output schema, i.e. the one already projected through `output_indices`.
    fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Returns the identity string this executor was constructed with.
    fn identity(&self) -> &str {
        &self.identity
    }

    /// Consumes the executor and returns the stream produced by `do_execute`.
    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        self.do_execute()
    }
}
85
86impl NestedLoopJoinExecutor {
87 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
88 async fn do_execute(self: Box<Self>) {
89 let left_data_types = self.left_child.schema().data_types();
90 let data_types = self.original_schema.data_types();
91
92 let mut chunk_builder = DataChunkBuilder::new(data_types, self.chunk_size);
93
94 let left: Vec<DataChunk> = {
96 let mut ret = Vec::with_capacity(1024);
97 #[for_await]
98 for chunk in self.left_child.execute() {
99 let c = chunk?;
100 trace!("Estimated chunk size is {:?}", c.estimated_heap_size());
101 if !self.mem_context.add(c.estimated_heap_size() as i64) {
102 Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?;
103 }
104 ret.push(c);
105 }
106 ret
107 };
108
109 let stream = match self.join_type {
111 JoinType::Inner => Self::do_inner_join,
112 JoinType::LeftOuter => Self::do_left_outer_join,
113 JoinType::LeftSemi => Self::do_left_semi_anti_join::<false>,
114 JoinType::LeftAnti => Self::do_left_semi_anti_join::<true>,
115 JoinType::RightOuter => Self::do_right_outer_join,
116 JoinType::RightSemi => Self::do_right_semi_anti_join::<false>,
117 JoinType::RightAnti => Self::do_right_semi_anti_join::<true>,
118 JoinType::FullOuter => Self::do_full_outer_join,
119 JoinType::AsOfInner | JoinType::AsOfLeftOuter => {
120 unimplemented!("AsOf join is not supported in NestedLoopJoinExecutor")
121 }
122 };
123
124 #[for_await]
125 for chunk in stream(
126 &mut chunk_builder,
127 left_data_types,
128 self.join_expr,
129 left,
130 self.right_child,
131 self.shutdown_rx.clone(),
132 ) {
133 yield chunk?.project(&self.output_indices)
134 }
135
136 if let Some(chunk) = chunk_builder.consume_all() {
138 yield chunk.project(&self.output_indices)
139 }
140 }
141}
142
143impl NestedLoopJoinExecutor {
144 async fn concatenate_and_eval(
147 expr: &dyn Expression,
148 left_row_types: &[DataType],
149 left_row: RowRef<'_>,
150 right_chunk: &DataChunk,
151 ) -> Result<DataChunk> {
152 let left_chunk = convert_row_to_chunk(&left_row, right_chunk.capacity(), left_row_types)?;
153 let mut chunk = concatenate(&left_chunk, right_chunk)?;
154 chunk.set_visibility(expr.eval(&chunk).await?.as_bool().iter().collect());
155 Ok(chunk)
156 }
157}
158
159impl BoxedExecutorBuilder for NestedLoopJoinExecutor {
160 async fn new_boxed_executor(
161 source: &ExecutorBuilder<'_>,
162 inputs: Vec<BoxedExecutor>,
163 ) -> Result<BoxedExecutor> {
164 let [left_child, right_child]: [_; 2] = inputs.try_into().unwrap();
165
166 let nested_loop_join_node = try_match_expand!(
167 source.plan_node().get_node_body().unwrap(),
168 NodeBody::NestedLoopJoin
169 )?;
170
171 let join_type = JoinType::from_prost(nested_loop_join_node.get_join_type()?);
172 let join_expr = expr_build_from_prost(nested_loop_join_node.get_join_cond()?)?;
173
174 let output_indices = nested_loop_join_node
175 .output_indices
176 .iter()
177 .map(|&v| v as usize)
178 .collect();
179
180 let identity = source.plan_node().get_identity().clone();
181 let mem_context = source.context().create_executor_mem_context(&identity);
182
183 Ok(Box::new(NestedLoopJoinExecutor::new(
184 join_expr,
185 join_type,
186 output_indices,
187 left_child,
188 right_child,
189 identity,
190 source.context().get_config().developer.chunk_size,
191 mem_context,
192 source.shutdown_rx().clone(),
193 )))
194 }
195}
196
197impl NestedLoopJoinExecutor {
198 pub fn new(
199 join_expr: BoxedExpression,
200 join_type: JoinType,
201 output_indices: Vec<usize>,
202 left_child: BoxedExecutor,
203 right_child: BoxedExecutor,
204 identity: String,
205 chunk_size: usize,
206 mem_context: MemoryContext,
207 shutdown_rx: ShutdownToken,
208 ) -> Self {
209 let original_schema = match join_type {
211 JoinType::LeftSemi | JoinType::LeftAnti => left_child.schema().clone(),
212 JoinType::RightSemi | JoinType::RightAnti => right_child.schema().clone(),
213 _ => Schema::from_iter(
214 left_child
215 .schema()
216 .fields()
217 .iter()
218 .chain(right_child.schema().fields().iter())
219 .cloned(),
220 ),
221 };
222 let schema = Schema::from_iter(
223 output_indices
224 .iter()
225 .map(|&idx| original_schema[idx].clone()),
226 );
227 Self {
228 join_expr,
229 join_type,
230 original_schema,
231 schema,
232 output_indices,
233 left_child,
234 right_child,
235 identity,
236 chunk_size,
237 mem_context,
238 shutdown_rx,
239 }
240 }
241}
242
243impl NestedLoopJoinExecutor {
244 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
245 async fn do_inner_join(
246 chunk_builder: &mut DataChunkBuilder,
247 left_data_types: Vec<DataType>,
248 join_expr: BoxedExpression,
249 left: Vec<DataChunk>,
250 right: BoxedExecutor,
251 shutdown_rx: ShutdownToken,
252 ) {
253 #[for_await]
255 for right_chunk in right.execute() {
256 let right_chunk = right_chunk?;
257 for left_row in left.iter().flat_map(|chunk| chunk.rows()) {
259 shutdown_rx.check()?;
260 let chunk = Self::concatenate_and_eval(
263 join_expr.as_ref(),
264 &left_data_types,
265 left_row,
266 &right_chunk,
267 )
268 .await?;
269 if chunk.cardinality() > 0 {
271 for spilled in chunk_builder.append_chunk(chunk) {
272 yield spilled
273 }
274 }
275 }
276 }
277 }
278
279 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
280 async fn do_left_outer_join(
281 chunk_builder: &mut DataChunkBuilder,
282 left_data_types: Vec<DataType>,
283 join_expr: BoxedExpression,
284 left: Vec<DataChunk>,
285 right: BoxedExecutor,
286 shutdown_rx: ShutdownToken,
287 ) {
288 let mut matched = BitmapBuilder::zeroed(left.iter().map(|chunk| chunk.capacity()).sum());
289 let right_data_types = right.schema().data_types();
290 #[for_await]
293 for right_chunk in right.execute() {
294 let right_chunk = right_chunk?;
295 for (left_row_idx, left_row) in left.iter().flat_map(|chunk| chunk.rows()).enumerate() {
296 shutdown_rx.check()?;
297 let chunk = Self::concatenate_and_eval(
298 join_expr.as_ref(),
299 &left_data_types,
300 left_row,
301 &right_chunk,
302 )
303 .await?;
304 if chunk.cardinality() > 0 {
305 matched.set(left_row_idx, true);
306 for spilled in chunk_builder.append_chunk(chunk) {
307 yield spilled
308 }
309 }
310 }
311 }
312 for (left_row, _) in left
314 .iter()
315 .flat_map(|chunk| chunk.rows())
316 .zip_eq_debug(matched.finish().iter())
317 .filter(|(_, matched)| !*matched)
318 {
319 shutdown_rx.check()?;
320 let row = left_row.chain(repeat_n(Datum::None, right_data_types.len()));
321 if let Some(chunk) = chunk_builder.append_one_row(row) {
322 yield chunk
323 }
324 }
325 }
326
327 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
328 async fn do_left_semi_anti_join<const ANTI_JOIN: bool>(
329 chunk_builder: &mut DataChunkBuilder,
330 left_data_types: Vec<DataType>,
331 join_expr: BoxedExpression,
332 left: Vec<DataChunk>,
333 right: BoxedExecutor,
334 shutdown_rx: ShutdownToken,
335 ) {
336 let mut matched = BitmapBuilder::zeroed(left.iter().map(|chunk| chunk.capacity()).sum());
337 #[for_await]
338 for right_chunk in right.execute() {
339 let right_chunk = right_chunk?;
340 for (left_row_idx, left_row) in left.iter().flat_map(|chunk| chunk.rows()).enumerate() {
341 shutdown_rx.check()?;
342 if matched.is_set(left_row_idx) {
343 continue;
344 }
345 let chunk = Self::concatenate_and_eval(
346 join_expr.as_ref(),
347 &left_data_types,
348 left_row,
349 &right_chunk,
350 )
351 .await?;
352 if chunk.cardinality() > 0 {
353 matched.set(left_row_idx, true)
354 }
355 }
356 }
357 for (left_row, _) in left
358 .iter()
359 .flat_map(|chunk| chunk.rows())
360 .zip_eq_debug(matched.finish().iter())
361 .filter(|(_, matched)| if ANTI_JOIN { !*matched } else { *matched })
362 {
363 shutdown_rx.check()?;
364 if let Some(chunk) = chunk_builder.append_one_row(left_row) {
365 yield chunk
366 }
367 }
368 }
369
370 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
371 async fn do_right_outer_join(
372 chunk_builder: &mut DataChunkBuilder,
373 left_data_types: Vec<DataType>,
374 join_expr: BoxedExpression,
375 left: Vec<DataChunk>,
376 right: BoxedExecutor,
377 shutdown_rx: ShutdownToken,
378 ) {
379 #[for_await]
380 for right_chunk in right.execute() {
381 let right_chunk = right_chunk?;
382 let mut matched = BitmapBuilder::zeroed(right_chunk.capacity()).finish();
384 for left_row in left.iter().flat_map(|chunk| chunk.rows()) {
385 shutdown_rx.check()?;
386 let chunk = Self::concatenate_and_eval(
387 join_expr.as_ref(),
388 &left_data_types,
389 left_row,
390 &right_chunk,
391 )
392 .await?;
393 if chunk.cardinality() > 0 {
394 matched = &matched | chunk.visibility();
396 for spilled in chunk_builder.append_chunk(chunk) {
397 yield spilled
398 }
399 }
400 }
401 for (right_row, _) in right_chunk
402 .rows()
403 .zip_eq_debug(matched.iter())
404 .filter(|(_, matched)| !*matched)
405 {
406 shutdown_rx.check()?;
407 let row = repeat_n(Datum::None, left_data_types.len()).chain(right_row);
408 if let Some(chunk) = chunk_builder.append_one_row(row) {
409 yield chunk
410 }
411 }
412 }
413 }
414
415 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
416 async fn do_right_semi_anti_join<const ANTI_JOIN: bool>(
417 chunk_builder: &mut DataChunkBuilder,
418 left_data_types: Vec<DataType>,
419 join_expr: BoxedExpression,
420 left: Vec<DataChunk>,
421 right: BoxedExecutor,
422 shutdown_rx: ShutdownToken,
423 ) {
424 #[for_await]
425 for right_chunk in right.execute() {
426 let mut right_chunk = right_chunk?;
427 let mut matched = BitmapBuilder::zeroed(right_chunk.capacity()).finish();
428 for left_row in left.iter().flat_map(|chunk| chunk.rows()) {
429 shutdown_rx.check()?;
430 let chunk = Self::concatenate_and_eval(
431 join_expr.as_ref(),
432 &left_data_types,
433 left_row,
434 &right_chunk,
435 )
436 .await?;
437 if chunk.cardinality() > 0 {
438 matched = &matched | chunk.visibility();
440 }
441 }
442 if ANTI_JOIN {
443 matched = !&matched;
444 }
445 right_chunk.set_visibility(matched);
446 if right_chunk.cardinality() > 0 {
447 for spilled in chunk_builder.append_chunk(right_chunk) {
448 yield spilled
449 }
450 }
451 }
452 }
453
454 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
455 async fn do_full_outer_join(
456 chunk_builder: &mut DataChunkBuilder,
457 left_data_types: Vec<DataType>,
458 join_expr: BoxedExpression,
459 left: Vec<DataChunk>,
460 right: BoxedExecutor,
461 shutdown_rx: ShutdownToken,
462 ) {
463 let mut left_matched =
464 BitmapBuilder::zeroed(left.iter().map(|chunk| chunk.capacity()).sum());
465 let right_data_types = right.schema().data_types();
466 #[for_await]
467 for right_chunk in right.execute() {
468 let right_chunk = right_chunk?;
469 let mut right_matched = BitmapBuilder::zeroed(right_chunk.capacity()).finish();
470 for (left_row_idx, left_row) in left.iter().flat_map(|chunk| chunk.rows()).enumerate() {
471 shutdown_rx.check()?;
472 let chunk = Self::concatenate_and_eval(
473 join_expr.as_ref(),
474 &left_data_types,
475 left_row,
476 &right_chunk,
477 )
478 .await?;
479 if chunk.cardinality() > 0 {
480 left_matched.set(left_row_idx, true);
481 right_matched = &right_matched | chunk.visibility();
482 for spilled in chunk_builder.append_chunk(chunk) {
483 yield spilled
484 }
485 }
486 }
487 for (right_row, _) in right_chunk
489 .rows()
490 .zip_eq_debug(right_matched.iter())
491 .filter(|(_, matched)| !*matched)
492 {
493 shutdown_rx.check()?;
494 let row = repeat_n(Datum::None, left_data_types.len()).chain(right_row);
495 if let Some(chunk) = chunk_builder.append_one_row(row) {
496 yield chunk
497 }
498 }
499 }
500 for (left_row, _) in left
502 .iter()
503 .flat_map(|chunk| chunk.rows())
504 .zip_eq_debug(left_matched.finish().iter())
505 .filter(|(_, matched)| !*matched)
506 {
507 shutdown_rx.check()?;
508 let row = left_row.chain(repeat_n(Datum::None, right_data_types.len()));
509 if let Some(chunk) = chunk_builder.append_one_row(row) {
510 yield chunk
511 }
512 }
513 }
514}
#[cfg(test)]
mod tests {
    use futures_async_stream::for_await;
    use risingwave_common::array::*;
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::memory::MemoryContext;
    use risingwave_common::types::DataType;
    use risingwave_expr::expr::build_from_pretty;

    use crate::executor::BoxedExecutor;
    use crate::executor::join::JoinType;
    use crate::executor::join::nested_loop_join::NestedLoopJoinExecutor;
    use crate::executor::test_utils::{MockExecutor, diff_executor_output};
    use crate::task::ShutdownToken;

    const CHUNK_SIZE: usize = 1024;

    /// Shared test setup: a fixed left input `(int32, float32)`, a fixed right input
    /// `(int32, float64)`, joined on equality of the two `int32` columns (`$0 = $2`).
    struct TestFixture {
        join_type: JoinType,
    }

    impl TestFixture {
        /// Creates a fixture that runs the given join variant.
        fn with_join_type(join_type: JoinType) -> Self {
            Self { join_type }
        }

        /// Left input: two chunks with keys 1, 2, 3 and 3, 4, 6, 6, 8.
        fn create_left_executor(&self) -> BoxedExecutor {
            let schema = Schema {
                fields: vec![
                    Field::unnamed(DataType::Int32),
                    Field::unnamed(DataType::Float32),
                ],
            };
            let mut executor = MockExecutor::new(schema);

            executor.add(DataChunk::from_pretty(
                "i f
                 1 6.1
                 2 8.4
                 3 3.9",
            ));

            executor.add(DataChunk::from_pretty(
                "i f
                 3 6.6
                 4 0.7
                 6 5.5
                 6 5.6
                 8 7.0",
            ));

            Box::new(executor)
        }

        /// Right input: three chunks; only the first one (keys 2, 3, 6, 8) has keys that
        /// also occur on the left.
        fn create_right_executor(&self) -> BoxedExecutor {
            let schema = Schema {
                fields: vec![
                    Field::unnamed(DataType::Int32),
                    Field::unnamed(DataType::Float64),
                ],
            };
            let mut executor = MockExecutor::new(schema);

            executor.add(DataChunk::from_pretty(
                "i F
                 2 6.1
                 3 8.9
                 6 3.4
                 8 3.5",
            ));

            executor.add(DataChunk::from_pretty(
                " i F
                  9 7.5
                 10 .
                 11 8
                 12 .",
            ));

            executor.add(DataChunk::from_pretty(
                "  i F
                  20 5.7
                  30 9.6
                 100 .
                 200 8.18",
            ));

            Box::new(executor)
        }

        /// Builds the executor under test, keeping every column the join variant produces.
        fn create_join_executor(&self, shutdown_rx: ShutdownToken) -> BoxedExecutor {
            let join_type = self.join_type;

            let left_child = self.create_left_executor();
            let right_child = self.create_right_executor();

            // Semi and anti joins produce only one side, i.e. two columns instead of four.
            let output_indices = match self.join_type {
                JoinType::LeftSemi | JoinType::LeftAnti => vec![0, 1],
                JoinType::RightSemi | JoinType::RightAnti => vec![0, 1],
                _ => vec![0, 1, 2, 3],
            };

            Box::new(NestedLoopJoinExecutor::new(
                build_from_pretty("(equal:boolean $0:int4 $2:int4)"),
                join_type,
                output_indices,
                left_child,
                right_child,
                "NestedLoopJoinExecutor".into(),
                CHUNK_SIZE,
                MemoryContext::none(),
                shutdown_rx,
            ))
        }

        /// Runs the join and compares its output with `expected`.
        async fn do_test(&self, expected: DataChunk) {
            let join_executor = self.create_join_executor(ShutdownToken::empty());
            let mut expected_mock_exec = MockExecutor::new(join_executor.schema().clone());
            expected_mock_exec.add(expected);
            diff_executor_output(join_executor, Box::new(expected_mock_exec)).await;
        }

        /// Asserts that the first item of the executor's stream exists and is an error.
        ///
        /// Tracking `saw_item` keeps the check from passing vacuously on an empty stream.
        async fn assert_first_item_is_err(join_executor: BoxedExecutor) {
            let mut saw_item = false;
            #[for_await]
            for chunk in join_executor.execute() {
                saw_item = true;
                assert!(chunk.is_err());
                break;
            }
            assert!(saw_item, "join stream ended without yielding anything");
        }

        /// Checks that both a cancel and an abort, signalled before execution starts, make
        /// the join fail on its first item.
        async fn do_test_shutdown(&self) {
            let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
            let join_executor = self.create_join_executor(shutdown_rx);
            shutdown_tx.cancel();
            Self::assert_first_item_is_err(join_executor).await;

            let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
            let join_executor = self.create_join_executor(shutdown_rx);
            shutdown_tx.abort("test");
            Self::assert_first_item_is_err(join_executor).await;
        }
    }

    #[tokio::test]
    async fn test_inner_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::Inner);

        let expected_chunk = DataChunk::from_pretty(
            "i f   i F
             2 8.4 2 6.1
             3 3.9 3 8.9
             3 6.6 3 8.9
             6 5.5 6 3.4
             6 5.6 6 3.4
             8 7.0 8 3.5",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    #[tokio::test]
    async fn test_left_outer_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);

        let expected_chunk = DataChunk::from_pretty(
            "i f   i F
             2 8.4 2 6.1
             3 3.9 3 8.9
             3 6.6 3 8.9
             6 5.5 6 3.4
             6 5.6 6 3.4
             8 7.0 8 3.5
             1 6.1 . .
             4 0.7 . .",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    #[tokio::test]
    async fn test_left_semi_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);

        let expected_chunk = DataChunk::from_pretty(
            "i f
             2 8.4
             3 3.9
             3 6.6
             6 5.5
             6 5.6
             8 7.0",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    #[tokio::test]
    async fn test_left_anti_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);

        let expected_chunk = DataChunk::from_pretty(
            "i f
             1 6.1
             4 0.7",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    #[tokio::test]
    async fn test_right_outer_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);

        let expected_chunk = DataChunk::from_pretty(
            "i f   i F
             2 8.4 2 6.1
             3 3.9 3 8.9
             3 6.6 3 8.9
             6 5.5 6 3.4
             6 5.6 6 3.4
             8 7.0 8 3.5
             . .   9 7.5
             . .   10 .
             . .   11 8
             . .   12 .
             . .   20 5.7
             . .   30 9.6
             . .   100 .
             . .   200 8.18",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    #[tokio::test]
    async fn test_right_semi_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);

        let expected_chunk = DataChunk::from_pretty(
            "i F
             2 6.1
             3 8.9
             6 3.4
             8 3.5",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    #[tokio::test]
    async fn test_right_anti_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);

        let expected_chunk = DataChunk::from_pretty(
            "  i F
               9 7.5
              10 .
              11 8
              12 .
              20 5.7
              30 9.6
             100 .
             200 8.18",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    #[tokio::test]
    async fn test_full_outer_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);

        let expected_chunk = DataChunk::from_pretty(
            "i f   i F
             2 8.4 2 6.1
             3 3.9 3 8.9
             3 6.6 3 8.9
             6 5.5 6 3.4
             6 5.6 6 3.4
             8 7.0 8 3.5
             . .   9 7.5
             . .   10 .
             . .   11 8
             . .   12 .
             . .   20 5.7
             . .   30 9.6
             . .   100 .
             . .   200 8.18
             1 6.1 . .
             4 0.7 . .",
        );

        test_fixture.do_test(expected_chunk).await;
    }

    /// Every join routine has its own shutdown checks, so every supported join type is
    /// exercised here, `FullOuter` included.
    #[tokio::test]
    async fn test_shutdown_rx() {
        let join_types = [
            JoinType::Inner,
            JoinType::LeftOuter,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::RightOuter,
            JoinType::RightSemi,
            JoinType::RightAnti,
            JoinType::FullOuter,
        ];
        for join_type in join_types {
            let test_fixture = TestFixture::with_join_type(join_type);
            test_fixture.do_test_shutdown().await;
        }
    }
}