1use std::cmp::Ordering;
16use std::iter;
17use std::iter::empty;
18use std::marker::PhantomData;
19use std::sync::Arc;
20
21use bytes::Bytes;
22use futures_async_stream::try_stream;
23use itertools::Itertools;
24use risingwave_common::array::{Array, DataChunk, RowRef};
25use risingwave_common::bitmap::{Bitmap, BitmapBuilder, FilterByBitmap};
26use risingwave_common::catalog::Schema;
27use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher};
28use risingwave_common::memory::{MemoryContext, MonitoredGlobalAlloc};
29use risingwave_common::row::{Row, RowExt, repeat_n};
30use risingwave_common::types::{DataType, Datum, DefaultOrd};
31use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
32use risingwave_common::util::iter_util::ZipEqFast;
33use risingwave_common_estimate_size::EstimateSize;
34use risingwave_expr::expr::{BoxedExpression, Expression, build_from_prost};
35use risingwave_pb::Message;
36use risingwave_pb::batch_plan::plan_node::NodeBody;
37use risingwave_pb::data::DataChunk as PbDataChunk;
38
39use super::{AsOfDesc, AsOfInequalityType, ChunkedData, JoinType, RowId};
40use crate::error::{BatchError, Result};
41use crate::executor::{
42 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
43 WrapStreamExecutor,
44};
45use crate::monitor::BatchSpillMetrics;
46use crate::risingwave_common::hash::NullBitmap;
47use crate::spill::spill_op::SpillBackend::Disk;
48use crate::spill::spill_op::{
49 DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY, SpillBackend, SpillBuildHasher, SpillOp,
50};
51use crate::task::ShutdownToken;
52
/// Batch executor for equi hash joins, optionally with a non-equi condition, AS OF matching,
/// and spilling to a [`SpillBackend`] when the in-memory build side exceeds the memory limit.
///
/// The build side is buffered and indexed in a [`JoinHashMap`]; the probe side is then
/// streamed against it (see `do_execute`).
pub struct HashJoinExecutor<K> {
    /// Join type (inner, left/right/full outer, semi, anti, AS OF inner / left outer).
    join_type: JoinType,
    /// NOTE(review): never read (`#[expect(dead_code)]`); presumably the join schema before the
    /// `output_indices` projection — confirm.
    #[expect(dead_code)]
    original_schema: Schema,
    /// Schema of the chunks this executor yields, i.e. after projecting with `output_indices`.
    schema: Schema,
    /// Column indices kept from every joined chunk (applied with `DataChunk::project`).
    output_indices: Vec<usize>,
    /// Input streamed against the hash table (the probe side).
    probe_side_source: BoxedExecutor,
    /// Input buffered in memory and indexed by `build_key_idxs` (the build side).
    build_side_source: BoxedExecutor,
    /// Join-key column indices in probe-side chunks.
    probe_key_idxs: Vec<usize>,
    /// Join-key column indices in build-side chunks.
    build_key_idxs: Vec<usize>,
    /// Optional non-equi condition over the joined (probe ++ build) row. `do_execute` does not
    /// apply it when `asof_desc` is set.
    cond: Option<Arc<BoxedExpression>>,
    /// One flag per key column: whether a NULL in that column may match a NULL. Build rows with a
    /// NULL in a column whose flag is `false` are never inserted into the hash table.
    null_matched: Vec<bool>,
    /// Executor identity, used in logs and to name spill directories.
    identity: String,
    /// Target number of rows per output chunk.
    chunk_size: usize,
    /// AS OF join description; when `Some`, at most one build row is joined per probe row.
    asof_desc: Option<AsOfDesc>,

    /// Spill destination when memory runs out; `None` turns running out of memory into
    /// `BatchError::OutOfMemory`.
    spill_backend: Option<SpillBackend>,
    /// Metrics updated while writing and reading spill files.
    spill_metrics: Arc<BatchSpillMetrics>,
    /// Known upper bound (in bytes) of the input, if any. When it is at most
    /// `SPILL_AT_LEAST_MEMORY` the memory limit is not enforced. Sub-joins created while
    /// spilling appear to receive their partition's spilled size here.
    memory_upper_bound: Option<u64>,

    /// Checked while building and probing so a shutdown request aborts the join.
    shutdown_rx: ShutdownToken,

    /// Memory accounting context charged for buffered build chunks and hash-table keys.
    mem_ctx: MemoryContext,
    _phantom: PhantomData<K>,
}
100
101impl<K: HashKey> Executor for HashJoinExecutor<K> {
102 fn schema(&self) -> &Schema {
103 &self.schema
104 }
105
106 fn identity(&self) -> &str {
107 &self.identity
108 }
109
110 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
111 self.do_execute()
112 }
113}
114
/// Build-side hash table: maps each join key to the most recently inserted build row with that
/// key.
///
/// Earlier build rows with the same key are reached by following the
/// `next_build_row_with_same_key` chain (see [`RowIdIter`]): `insert` returns the previous
/// head, which becomes the new row's successor. Allocations go through
/// [`MonitoredGlobalAlloc`], presumably so the table's memory is accounted for.
pub type JoinHashMap<K> =
    hashbrown::HashMap<K, RowId, PrecomputedBuildHasher, MonitoredGlobalAlloc>;
149
/// Iterator over a chain of build rows sharing one join key.
///
/// Starts at `current_row_id` and follows `next_row_id[row]` links until a `None` link ends the
/// chain.
struct RowIdIter<'a> {
    /// Row to yield next; `None` once the chain is exhausted.
    current_row_id: Option<RowId>,
    /// Per-row link to the next build row with the same key.
    next_row_id: &'a ChunkedData<Option<RowId>>,
}
154
155impl ChunkedData<Option<RowId>> {
156 fn row_id_iter(&self, begin: Option<RowId>) -> RowIdIter<'_> {
157 RowIdIter {
158 current_row_id: begin,
159 next_row_id: self,
160 }
161 }
162}
163
164impl Iterator for RowIdIter<'_> {
165 type Item = RowId;
166
167 fn next(&mut self) -> Option<Self::Item> {
168 self.current_row_id.inspect(|row_id| {
169 self.current_row_id = self.next_row_id[*row_id];
170 })
171 }
172}
173
/// Inputs shared by all probe routines (`do_*_join`): the probe input, the hash table built over
/// the buffered build side, and output settings.
pub struct EquiJoinParams<K> {
    /// Probe-side input; executed once by the probe routine.
    probe_side: BoxedExecutor,
    /// Column types of probe-side chunks.
    probe_data_types: Vec<DataType>,
    /// Join-key column indices in probe-side chunks.
    probe_key_idxs: Vec<usize>,
    /// Buffered build-side chunks, indexed by `RowId::chunk_id`.
    build_side: Vec<DataChunk, MonitoredGlobalAlloc>,
    /// Column types of build-side chunks.
    build_data_types: Vec<DataType>,
    /// Probe column types followed by build column types.
    full_data_types: Vec<DataType>,
    /// Join key -> most recently inserted build row with that key.
    hash_map: JoinHashMap<K>,
    /// For every build row, the next build row with the same key (or `None` at chain end).
    next_build_row_with_same_key: ChunkedData<Option<RowId>>,
    /// Target number of rows per output chunk.
    chunk_size: usize,
    /// Checked inside probe loops so a shutdown request aborts the join.
    shutdown_rx: ShutdownToken,
    /// `Some` for AS OF joins.
    asof_desc: Option<AsOfDesc>,
}
187
188impl<K> EquiJoinParams<K> {
189 #[allow(clippy::too_many_arguments)]
190 pub(super) fn new(
191 probe_side: BoxedExecutor,
192 probe_data_types: Vec<DataType>,
193 probe_key_idxs: Vec<usize>,
194 build_side: Vec<DataChunk, MonitoredGlobalAlloc>,
195 build_data_types: Vec<DataType>,
196 full_data_types: Vec<DataType>,
197 hash_map: JoinHashMap<K>,
198 next_build_row_with_same_key: ChunkedData<Option<RowId>>,
199 chunk_size: usize,
200 shutdown_rx: ShutdownToken,
201 asof_desc: Option<AsOfDesc>,
202 ) -> Self {
203 Self {
204 probe_side,
205 probe_data_types,
206 probe_key_idxs,
207 build_side,
208 build_data_types,
209 full_data_types,
210 hash_map,
211 next_build_row_with_same_key,
212 chunk_size,
213 shutdown_rx,
214 asof_desc,
215 }
216 }
217
218 pub(crate) fn is_asof_join(&self) -> bool {
219 self.asof_desc.is_some()
220 }
221}
222
/// State carried across flushed chunks by the left-side probe routines that evaluate a
/// non-equi condition (left outer / semi / anti).
#[derive(Default)]
struct LeftNonEquiJoinState {
    /// Number of probe-side columns. Only set by the left outer variant; stays 0 otherwise.
    probe_column_count: usize,
    /// For each probe row with an equal-key match, the buffered-row offset in the chunk builder
    /// where its candidate (probe, build) rows start.
    first_output_row_id: Vec<usize>,
    /// Whether the probe row that filled the last flushed chunk still has candidate build rows
    /// that will land in the next chunk.
    has_more_output_rows: bool,
    /// Whether the current probe row already has a candidate passing the condition; reset per
    /// probe row by the semi-join routine.
    found_matched: bool,
}
237
/// State used by the right-side probe routines that evaluate a non-equi condition.
///
/// NOTE(review): its users are outside this view; the field notes below are inferred from
/// names and types — confirm.
#[derive(Default)]
struct RightNonEquiJoinState {
    /// Presumably the build row behind each row buffered in the current output chunk.
    build_row_ids: Vec<RowId>,
    /// Presumably whether each build row has matched at least once (shaped like the build side).
    build_row_matched: ChunkedData<bool>,
}
246
/// Hash-partitions both join inputs into spill files so each partition can be joined
/// separately.
///
/// Each partition has one probe-side file (`join-probe-side-p{i}`) and one build-side file
/// (`join-build-side-p{i}`). Each file holds length-prefixed protobuf `DataChunk` records.
pub struct JoinSpillManager {
    /// Operator over this join's private spill directory.
    op: SpillOp,
    /// Number of partitions; a row goes to partition `hash % partition_num`.
    partition_num: usize,
    /// One open writer per partition for probe-side rows (filled by `init_writers`).
    probe_side_writers: Vec<opendal::Writer>,
    /// One open writer per partition for build-side rows (filled by `init_writers`).
    build_side_writers: Vec<opendal::Writer>,
    /// Per-partition buffers that batch probe rows into `spill_chunk_size`-row chunks.
    probe_side_chunk_builders: Vec<DataChunkBuilder>,
    /// Per-partition buffers that batch build rows into `spill_chunk_size`-row chunks.
    build_side_chunk_builders: Vec<DataChunkBuilder>,
    /// Hasher used on both sides' key columns, seeded from this manager's random UUID.
    spill_build_hasher: SpillBuildHasher,
    /// Column types of probe-side chunks.
    probe_side_data_types: Vec<DataType>,
    /// Column types of build-side chunks.
    build_side_data_types: Vec<DataType>,
    /// Row capacity of each spilled chunk.
    spill_chunk_size: usize,
    /// Metrics updated with the number of bytes written.
    spill_metrics: Arc<BatchSpillMetrics>,
}
260
261impl JoinSpillManager {
276 pub fn new(
277 spill_backend: SpillBackend,
278 join_identity: &String,
279 partition_num: usize,
280 probe_side_data_types: Vec<DataType>,
281 build_side_data_types: Vec<DataType>,
282 spill_chunk_size: usize,
283 spill_metrics: Arc<BatchSpillMetrics>,
284 ) -> Result<Self> {
285 let suffix_uuid = uuid::Uuid::new_v4();
286 let dir = format!("/{}-{}/", join_identity, suffix_uuid);
287 let op = SpillOp::create(dir, spill_backend)?;
288 let probe_side_writers = Vec::with_capacity(partition_num);
289 let build_side_writers = Vec::with_capacity(partition_num);
290 let probe_side_chunk_builders = Vec::with_capacity(partition_num);
291 let build_side_chunk_builders = Vec::with_capacity(partition_num);
292 let spill_build_hasher = SpillBuildHasher(suffix_uuid.as_u64_pair().1);
293 Ok(Self {
294 op,
295 partition_num,
296 probe_side_writers,
297 build_side_writers,
298 probe_side_chunk_builders,
299 build_side_chunk_builders,
300 spill_build_hasher,
301 probe_side_data_types,
302 build_side_data_types,
303 spill_chunk_size,
304 spill_metrics,
305 })
306 }
307
308 pub async fn init_writers(&mut self) -> Result<()> {
309 for i in 0..self.partition_num {
310 let join_probe_side_partition_file_name = format!("join-probe-side-p{}", i);
311 let w = self
312 .op
313 .writer_with(&join_probe_side_partition_file_name)
314 .await?;
315 self.probe_side_writers.push(w);
316
317 let join_build_side_partition_file_name = format!("join-build-side-p{}", i);
318 let w = self
319 .op
320 .writer_with(&join_build_side_partition_file_name)
321 .await?;
322 self.build_side_writers.push(w);
323 self.probe_side_chunk_builders.push(DataChunkBuilder::new(
324 self.probe_side_data_types.clone(),
325 self.spill_chunk_size,
326 ));
327 self.build_side_chunk_builders.push(DataChunkBuilder::new(
328 self.build_side_data_types.clone(),
329 self.spill_chunk_size,
330 ));
331 }
332 Ok(())
333 }
334
335 pub async fn write_probe_side_chunk(
336 &mut self,
337 chunk: DataChunk,
338 hash_codes: Vec<u64>,
339 ) -> Result<()> {
340 let (columns, vis) = chunk.into_parts_v2();
341 for partition in 0..self.partition_num {
342 let new_vis = vis.clone()
343 & Bitmap::from_iter(
344 hash_codes
345 .iter()
346 .map(|hash_code| (*hash_code as usize % self.partition_num) == partition),
347 );
348 let new_chunk = DataChunk::from_parts(columns.clone(), new_vis);
349 for output_chunk in self.probe_side_chunk_builders[partition].append_chunk(new_chunk) {
350 let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
351 let buf = Message::encode_to_vec(&chunk_pb);
352 let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
353 self.spill_metrics
354 .batch_spill_write_bytes
355 .inc_by((buf.len() + len_bytes.len()) as u64);
356 self.probe_side_writers[partition].write(len_bytes).await?;
357 self.probe_side_writers[partition].write(buf).await?;
358 }
359 }
360 Ok(())
361 }
362
363 pub async fn write_build_side_chunk(
364 &mut self,
365 chunk: DataChunk,
366 hash_codes: Vec<u64>,
367 ) -> Result<()> {
368 let (columns, vis) = chunk.into_parts_v2();
369 for partition in 0..self.partition_num {
370 let new_vis = vis.clone()
371 & Bitmap::from_iter(
372 hash_codes
373 .iter()
374 .map(|hash_code| (*hash_code as usize % self.partition_num) == partition),
375 );
376 let new_chunk = DataChunk::from_parts(columns.clone(), new_vis);
377 for output_chunk in self.build_side_chunk_builders[partition].append_chunk(new_chunk) {
378 let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
379 let buf = Message::encode_to_vec(&chunk_pb);
380 let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
381 self.spill_metrics
382 .batch_spill_write_bytes
383 .inc_by((buf.len() + len_bytes.len()) as u64);
384 self.build_side_writers[partition].write(len_bytes).await?;
385 self.build_side_writers[partition].write(buf).await?;
386 }
387 }
388 Ok(())
389 }
390
391 pub async fn close_writers(&mut self) -> Result<()> {
392 for partition in 0..self.partition_num {
393 if let Some(output_chunk) = self.probe_side_chunk_builders[partition].consume_all() {
394 let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
395 let buf = Message::encode_to_vec(&chunk_pb);
396 let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
397 self.spill_metrics
398 .batch_spill_write_bytes
399 .inc_by((buf.len() + len_bytes.len()) as u64);
400 self.probe_side_writers[partition].write(len_bytes).await?;
401 self.probe_side_writers[partition].write(buf).await?;
402 }
403
404 if let Some(output_chunk) = self.build_side_chunk_builders[partition].consume_all() {
405 let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
406 let buf = Message::encode_to_vec(&chunk_pb);
407 let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
408 self.spill_metrics
409 .batch_spill_write_bytes
410 .inc_by((buf.len() + len_bytes.len()) as u64);
411 self.build_side_writers[partition].write(len_bytes).await?;
412 self.build_side_writers[partition].write(buf).await?;
413 }
414 }
415
416 for mut w in self.probe_side_writers.drain(..) {
417 w.close().await?;
418 }
419 for mut w in self.build_side_writers.drain(..) {
420 w.close().await?;
421 }
422 Ok(())
423 }
424
425 async fn read_probe_side_partition(
426 &mut self,
427 partition: usize,
428 ) -> Result<BoxedDataChunkStream> {
429 let join_probe_side_partition_file_name = format!("join-probe-side-p{}", partition);
430 let r = self
431 .op
432 .reader_with(&join_probe_side_partition_file_name)
433 .await?;
434 Ok(SpillOp::read_stream(r, self.spill_metrics.clone()))
435 }
436
437 async fn read_build_side_partition(
438 &mut self,
439 partition: usize,
440 ) -> Result<BoxedDataChunkStream> {
441 let join_build_side_partition_file_name = format!("join-build-side-p{}", partition);
442 let r = self
443 .op
444 .reader_with(&join_build_side_partition_file_name)
445 .await?;
446 Ok(SpillOp::read_stream(r, self.spill_metrics.clone()))
447 }
448
449 pub async fn estimate_partition_size(&self, partition: usize) -> Result<u64> {
450 let join_probe_side_partition_file_name = format!("join-probe-side-p{}", partition);
451 let probe_size = self
452 .op
453 .stat(&join_probe_side_partition_file_name)
454 .await?
455 .content_length();
456 let join_build_side_partition_file_name = format!("join-build-side-p{}", partition);
457 let build_size = self
458 .op
459 .stat(&join_build_side_partition_file_name)
460 .await?
461 .content_length();
462 Ok(probe_size + build_size)
463 }
464
465 async fn clear_partition(&mut self, partition: usize) -> Result<()> {
466 let join_probe_side_partition_file_name = format!("join-probe-side-p{}", partition);
467 self.op.delete(&join_probe_side_partition_file_name).await?;
468 let join_build_side_partition_file_name = format!("join-build-side-p{}", partition);
469 self.op.delete(&join_build_side_partition_file_name).await?;
470 Ok(())
471 }
472}
473
474impl<K: HashKey> HashJoinExecutor<K> {
475 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
476 async fn do_execute(self: Box<Self>) {
477 let mut need_to_spill = false;
478 let check_memory = match self.memory_upper_bound {
480 Some(upper_bound) => upper_bound > SPILL_AT_LEAST_MEMORY,
481 None => true,
482 };
483
484 let probe_schema = self.probe_side_source.schema().clone();
485 let build_schema = self.build_side_source.schema().clone();
486 let probe_data_types = self.probe_side_source.schema().data_types();
487 let build_data_types = self.build_side_source.schema().data_types();
488 let full_data_types = [probe_data_types.clone(), build_data_types.clone()].concat();
489
490 let mut build_side = Vec::new_in(self.mem_ctx.global_allocator());
491 let mut build_row_count = 0;
492 let mut build_side_stream = self.build_side_source.execute();
493 #[for_await]
494 for build_chunk in &mut build_side_stream {
495 let build_chunk = build_chunk?;
496 if build_chunk.cardinality() > 0 {
497 build_row_count += build_chunk.cardinality();
498 let chunk_estimated_heap_size = build_chunk.estimated_heap_size();
499 build_side.push(build_chunk);
501 if !self.mem_ctx.add(chunk_estimated_heap_size as i64) && check_memory {
502 if self.spill_backend.is_some() {
503 need_to_spill = true;
504 break;
505 } else {
506 Err(BatchError::OutOfMemory(self.mem_ctx.mem_limit()))?;
507 }
508 }
509 }
510 }
511 let mut hash_map = JoinHashMap::with_capacity_and_hasher_in(
512 build_row_count,
513 PrecomputedBuildHasher,
514 self.mem_ctx.global_allocator(),
515 );
516 let mut next_build_row_with_same_key =
517 ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
518
519 let null_matched = K::Bitmap::from_bool_vec(self.null_matched.clone());
520
521 let mut mem_added_by_hash_table = 0;
522 if !need_to_spill {
523 for (build_chunk_id, build_chunk) in build_side.iter().enumerate() {
525 let build_keys = K::build_many(&self.build_key_idxs, build_chunk);
526
527 for (build_row_id, build_key) in build_keys
528 .into_iter()
529 .enumerate()
530 .filter_by_bitmap(build_chunk.visibility())
531 {
532 self.shutdown_rx.check()?;
533 if build_key.null_bitmap().is_subset(&null_matched) {
535 let row_id = RowId::new(build_chunk_id, build_row_id);
536 let build_key_size = build_key.estimated_heap_size() as i64;
537 mem_added_by_hash_table += build_key_size;
538 if !self.mem_ctx.add(build_key_size) && check_memory {
539 if self.spill_backend.is_some() {
540 need_to_spill = true;
541 break;
542 } else {
543 Err(BatchError::OutOfMemory(self.mem_ctx.mem_limit()))?;
544 }
545 }
546 next_build_row_with_same_key[row_id] = hash_map.insert(build_key, row_id);
547 }
548 }
549 }
550 }
551
552 if need_to_spill {
553 info!(
560 "batch hash join executor {} starts to spill out",
561 &self.identity
562 );
563 let mut join_spill_manager = JoinSpillManager::new(
564 self.spill_backend.clone().unwrap(),
565 &self.identity,
566 DEFAULT_SPILL_PARTITION_NUM,
567 probe_data_types.clone(),
568 build_data_types.clone(),
569 self.chunk_size,
570 self.spill_metrics.clone(),
571 )?;
572 join_spill_manager.init_writers().await?;
573
574 self.mem_ctx.add(-mem_added_by_hash_table);
576 drop(hash_map);
577 drop(next_build_row_with_same_key);
578
579 for chunk in build_side {
581 self.mem_ctx.add(-(chunk.estimated_heap_size() as i64));
583 let hash_codes = chunk.get_hash_values(
584 self.build_key_idxs.as_slice(),
585 join_spill_manager.spill_build_hasher,
586 );
587 join_spill_manager
588 .write_build_side_chunk(
589 chunk,
590 hash_codes
591 .into_iter()
592 .map(|hash_code| hash_code.value())
593 .collect(),
594 )
595 .await?;
596 }
597
598 #[for_await]
600 for chunk in build_side_stream {
601 let chunk = chunk?;
602 let hash_codes = chunk.get_hash_values(
603 self.build_key_idxs.as_slice(),
604 join_spill_manager.spill_build_hasher,
605 );
606 join_spill_manager
607 .write_build_side_chunk(
608 chunk,
609 hash_codes
610 .into_iter()
611 .map(|hash_code| hash_code.value())
612 .collect(),
613 )
614 .await?;
615 }
616
617 #[for_await]
619 for chunk in self.probe_side_source.execute() {
620 let chunk = chunk?;
621 let hash_codes = chunk.get_hash_values(
622 self.probe_key_idxs.as_slice(),
623 join_spill_manager.spill_build_hasher,
624 );
625 join_spill_manager
626 .write_probe_side_chunk(
627 chunk,
628 hash_codes
629 .into_iter()
630 .map(|hash_code| hash_code.value())
631 .collect(),
632 )
633 .await?;
634 }
635
636 join_spill_manager.close_writers().await?;
637
638 for i in 0..join_spill_manager.partition_num {
640 let partition_size = join_spill_manager.estimate_partition_size(i).await?;
641 let probe_side_stream = join_spill_manager.read_probe_side_partition(i).await?;
642 let build_side_stream = join_spill_manager.read_build_side_partition(i).await?;
643
644 let sub_hash_join_executor: HashJoinExecutor<K> = HashJoinExecutor::new_inner(
645 self.join_type,
646 self.output_indices.clone(),
647 Box::new(WrapStreamExecutor::new(
648 probe_schema.clone(),
649 probe_side_stream,
650 )),
651 Box::new(WrapStreamExecutor::new(
652 build_schema.clone(),
653 build_side_stream,
654 )),
655 self.probe_key_idxs.clone(),
656 self.build_key_idxs.clone(),
657 self.null_matched.clone(),
658 self.cond.clone(),
659 format!("{}-sub{}", self.identity.clone(), i),
660 self.chunk_size,
661 self.asof_desc.clone(),
662 self.spill_backend.clone(),
663 self.spill_metrics.clone(),
664 Some(partition_size),
665 self.shutdown_rx.clone(),
666 self.mem_ctx.clone(),
667 );
668
669 debug!(
670 "create sub_hash_join {} for hash_join {} to spill",
671 sub_hash_join_executor.identity, self.identity
672 );
673
674 let sub_hash_join_executor = Box::new(sub_hash_join_executor).execute();
675
676 #[for_await]
677 for chunk in sub_hash_join_executor {
678 let chunk = chunk?;
679 yield chunk;
680 }
681
682 join_spill_manager.clear_partition(i).await?;
684 }
685 } else {
686 let params = EquiJoinParams::new(
687 self.probe_side_source,
688 probe_data_types,
689 self.probe_key_idxs,
690 build_side,
691 build_data_types,
692 full_data_types,
693 hash_map,
694 next_build_row_with_same_key,
695 self.chunk_size,
696 self.shutdown_rx.clone(),
697 self.asof_desc,
698 );
699
700 if let Some(cond) = self.cond.as_ref()
701 && !params.is_asof_join()
702 {
703 let stream = match self.join_type {
704 JoinType::Inner => Self::do_inner_join_with_non_equi_condition(params, cond),
705 JoinType::LeftOuter => {
706 Self::do_left_outer_join_with_non_equi_condition(params, cond)
707 }
708 JoinType::LeftSemi => {
709 Self::do_left_semi_join_with_non_equi_condition(params, cond)
710 }
711 JoinType::LeftAnti => {
712 Self::do_left_anti_join_with_non_equi_condition(params, cond)
713 }
714 JoinType::RightOuter => {
715 Self::do_right_outer_join_with_non_equi_condition(params, cond)
716 }
717 JoinType::RightSemi => {
718 Self::do_right_semi_anti_join_with_non_equi_condition::<false>(params, cond)
719 }
720 JoinType::RightAnti => {
721 Self::do_right_semi_anti_join_with_non_equi_condition::<true>(params, cond)
722 }
723 JoinType::FullOuter => {
724 Self::do_full_outer_join_with_non_equi_condition(params, cond)
725 }
726 JoinType::AsOfInner | JoinType::AsOfLeftOuter => {
727 unreachable!("AsOf join should not reach here")
728 }
729 };
730 let mut output_chunk_builder =
732 DataChunkBuilder::new(self.schema.data_types(), self.chunk_size);
733 #[for_await]
734 for chunk in stream {
735 for output_chunk in
736 output_chunk_builder.append_chunk(chunk?.project(&self.output_indices))
737 {
738 yield output_chunk
739 }
740 }
741 if let Some(output_chunk) = output_chunk_builder.consume_all() {
742 yield output_chunk
743 }
744 } else {
745 let stream = match self.join_type {
746 JoinType::Inner | JoinType::AsOfInner => Self::do_inner_join(params),
747 JoinType::LeftOuter | JoinType::AsOfLeftOuter => {
748 Self::do_left_outer_join(params)
749 }
750 JoinType::LeftSemi => Self::do_left_semi_anti_join::<false>(params),
751 JoinType::LeftAnti => Self::do_left_semi_anti_join::<true>(params),
752 JoinType::RightOuter => Self::do_right_outer_join(params),
753 JoinType::RightSemi => Self::do_right_semi_anti_join::<false>(params),
754 JoinType::RightAnti => Self::do_right_semi_anti_join::<true>(params),
755 JoinType::FullOuter => Self::do_full_outer_join(params),
756 };
757 #[for_await]
758 for chunk in stream {
759 yield chunk?.project(&self.output_indices)
760 }
761 }
762 }
763 }
764
765 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
766 pub async fn do_inner_join(
767 EquiJoinParams {
768 probe_side,
769 probe_key_idxs,
770 build_side,
771 full_data_types,
772 hash_map,
773 next_build_row_with_same_key,
774 chunk_size,
775 shutdown_rx,
776 asof_desc,
777 ..
778 }: EquiJoinParams<K>,
779 ) {
780 let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
781 #[for_await]
782 for probe_chunk in probe_side.execute() {
783 let probe_chunk = probe_chunk?;
784 let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
785 for (probe_row_id, probe_key) in probe_keys
786 .iter()
787 .enumerate()
788 .filter_by_bitmap(probe_chunk.visibility())
789 {
790 let build_side_row_iter =
791 next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied());
792 if let Some(asof_desc) = &asof_desc {
793 if let Some(build_row_id) = Self::find_asof_matched_rows(
794 probe_chunk.row_at_unchecked_vis(probe_row_id),
795 &build_side,
796 build_side_row_iter,
797 asof_desc,
798 ) {
799 shutdown_rx.check()?;
800 if let Some(spilled) = Self::append_one_row(
801 &mut chunk_builder,
802 &probe_chunk,
803 probe_row_id,
804 &build_side[build_row_id.chunk_id()],
805 build_row_id.row_id(),
806 ) {
807 yield spilled
808 }
809 }
810 } else {
811 for build_row_id in build_side_row_iter {
812 shutdown_rx.check()?;
813 let build_chunk = &build_side[build_row_id.chunk_id()];
814 if let Some(spilled) = Self::append_one_row(
815 &mut chunk_builder,
816 &probe_chunk,
817 probe_row_id,
818 build_chunk,
819 build_row_id.row_id(),
820 ) {
821 yield spilled
822 }
823 }
824 }
825 }
826 }
827 if let Some(spilled) = chunk_builder.consume_all() {
828 yield spilled
829 }
830 }
831
832 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
833 pub async fn do_inner_join_with_non_equi_condition(
834 params: EquiJoinParams<K>,
835 cond: &BoxedExpression,
836 ) {
837 #[for_await]
838 for chunk in Self::do_inner_join(params) {
839 let mut chunk = chunk?;
840 chunk.set_visibility(cond.eval(&chunk).await?.as_bool().iter().collect());
841 yield chunk
842 }
843 }
844
845 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
846 pub async fn do_left_outer_join(
847 EquiJoinParams {
848 probe_side,
849 probe_key_idxs,
850 build_side,
851 build_data_types,
852 full_data_types,
853 hash_map,
854 next_build_row_with_same_key,
855 chunk_size,
856 shutdown_rx,
857 asof_desc,
858 ..
859 }: EquiJoinParams<K>,
860 ) {
861 let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
862 #[for_await]
863 for probe_chunk in probe_side.execute() {
864 let probe_chunk = probe_chunk?;
865 let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
866 for (probe_row_id, probe_key) in probe_keys
867 .iter()
868 .enumerate()
869 .filter_by_bitmap(probe_chunk.visibility())
870 {
871 if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
872 let build_side_row_iter =
873 next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id));
874 if let Some(asof_desc) = &asof_desc {
875 if let Some(build_row_id) = Self::find_asof_matched_rows(
876 probe_chunk.row_at_unchecked_vis(probe_row_id),
877 &build_side,
878 build_side_row_iter,
879 asof_desc,
880 ) {
881 shutdown_rx.check()?;
882 if let Some(spilled) = Self::append_one_row(
883 &mut chunk_builder,
884 &probe_chunk,
885 probe_row_id,
886 &build_side[build_row_id.chunk_id()],
887 build_row_id.row_id(),
888 ) {
889 yield spilled
890 }
891 } else {
892 shutdown_rx.check()?;
893 let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
894 if let Some(spilled) = Self::append_one_row_with_null_build_side(
895 &mut chunk_builder,
896 probe_row,
897 build_data_types.len(),
898 ) {
899 yield spilled
900 }
901 }
902 } else {
903 for build_row_id in build_side_row_iter {
904 shutdown_rx.check()?;
905 let build_chunk = &build_side[build_row_id.chunk_id()];
906 if let Some(spilled) = Self::append_one_row(
907 &mut chunk_builder,
908 &probe_chunk,
909 probe_row_id,
910 build_chunk,
911 build_row_id.row_id(),
912 ) {
913 yield spilled
914 }
915 }
916 }
917 } else {
918 shutdown_rx.check()?;
919 let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
920 if let Some(spilled) = Self::append_one_row_with_null_build_side(
921 &mut chunk_builder,
922 probe_row,
923 build_data_types.len(),
924 ) {
925 yield spilled
926 }
927 }
928 }
929 }
930 if let Some(spilled) = chunk_builder.consume_all() {
931 yield spilled
932 }
933 }
934
    /// Equi left outer join with an additional non-equi condition `cond`.
    ///
    /// Visible probe rows take one of two paths.
    /// - No build row has an equal key: the row is padded with NULL build columns into
    ///   `remaining_chunk_builder`, and `cond` is never evaluated for it.
    /// - Otherwise every (probe row, same-key build row) pair is buffered in `chunk_builder`.
    ///   Each chunk that builder completes goes through
    ///   `process_left_outer_join_non_equi_condition` together with `non_equi_state`. That helper
    ///   presumably applies `cond` and keeps one NULL-padded row for probe rows whose candidates
    ///   all fail it. The helper is not visible here — confirm.
    ///
    /// Output chunks from the two builders may interleave.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_left_outer_join_with_non_equi_condition(
        EquiJoinParams {
            probe_side,
            probe_data_types,
            probe_key_idxs,
            build_side,
            build_data_types,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
        cond: &BoxedExpression,
    ) {
        // Candidate pairs that still need `cond` evaluated.
        let mut chunk_builder = DataChunkBuilder::new(full_data_types.clone(), chunk_size);
        // Probe rows without an equal-key match, already NULL-padded.
        let mut remaining_chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        let mut non_equi_state = LeftNonEquiJoinState {
            probe_column_count: probe_data_types.len(),
            ..Default::default()
        };

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
                    // Remember where this probe row's candidate rows start in the pending chunk.
                    non_equi_state
                        .first_output_row_id
                        .push(chunk_builder.buffered_count());

                    let mut build_row_id_iter = next_build_row_with_same_key
                        .row_id_iter(Some(*first_matched_build_row_id))
                        .peekable();
                    while let Some(build_row_id) = build_row_id_iter.next() {
                        shutdown_rx.check()?;
                        let build_chunk = &build_side[build_row_id.chunk_id()];
                        if let Some(spilled) = Self::append_one_row(
                            &mut chunk_builder,
                            &probe_chunk,
                            probe_row_id,
                            build_chunk,
                            build_row_id.row_id(),
                        ) {
                            // Tell the helper whether this probe row continues in the next chunk.
                            non_equi_state.has_more_output_rows =
                                build_row_id_iter.peek().is_some();
                            yield Self::process_left_outer_join_non_equi_condition(
                                spilled,
                                cond.as_ref(),
                                &mut non_equi_state,
                            )
                            .await?
                        }
                    }
                } else {
                    shutdown_rx.check()?;
                    let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
                    if let Some(spilled) = Self::append_one_row_with_null_build_side(
                        &mut remaining_chunk_builder,
                        probe_row,
                        build_data_types.len(),
                    ) {
                        yield spilled
                    }
                }
            }
        }
        // The final flush never has rows continuing into a later chunk.
        non_equi_state.has_more_output_rows = false;
        if let Some(spilled) = chunk_builder.consume_all() {
            yield Self::process_left_outer_join_non_equi_condition(
                spilled,
                cond.as_ref(),
                &mut non_equi_state,
            )
            .await?
        }

        if let Some(spilled) = remaining_chunk_builder.consume_all() {
            yield spilled
        }
    }
1023
1024 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1025 pub async fn do_left_semi_anti_join<const ANTI_JOIN: bool>(
1026 EquiJoinParams {
1027 probe_side,
1028 probe_data_types,
1029 probe_key_idxs,
1030 hash_map,
1031 chunk_size,
1032 shutdown_rx,
1033 ..
1034 }: EquiJoinParams<K>,
1035 ) {
1036 let mut chunk_builder = DataChunkBuilder::new(probe_data_types, chunk_size);
1037 #[for_await]
1038 for probe_chunk in probe_side.execute() {
1039 let probe_chunk = probe_chunk?;
1040 let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1041 for (probe_row_id, probe_key) in probe_keys
1042 .iter()
1043 .enumerate()
1044 .filter_by_bitmap(probe_chunk.visibility())
1045 {
1046 shutdown_rx.check()?;
1047 if !ANTI_JOIN {
1048 if hash_map.contains_key(probe_key)
1049 && let Some(spilled) = Self::append_one_probe_row(
1050 &mut chunk_builder,
1051 &probe_chunk,
1052 probe_row_id,
1053 )
1054 {
1055 yield spilled
1056 }
1057 } else if hash_map.get(probe_key).is_none()
1058 && let Some(spilled) =
1059 Self::append_one_probe_row(&mut chunk_builder, &probe_chunk, probe_row_id)
1060 {
1061 yield spilled
1062 }
1063 }
1064 }
1065 if let Some(spilled) = chunk_builder.consume_all() {
1066 yield spilled
1067 }
1068 }
1069
    /// Equi left semi join with an additional non-equi condition `cond`.
    ///
    /// For each visible probe row with an equal-key match, the (probe, build) candidate pairs
    /// are buffered in `chunk_builder`. Probe rows without an equal-key match produce nothing.
    /// Each completed chunk goes through
    /// `process_left_semi_anti_join_non_equi_condition::<false>` with `non_equi_state`. That
    /// helper presumably evaluates `cond`, emits each probe row at most once, and sets
    /// `found_matched`. It is not visible here — confirm.
    ///
    /// NOTE(review): unlike the outer and anti variants, `has_more_output_rows` is never set
    /// here. Confirm the semi helper does not rely on it.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_left_semi_join_with_non_equi_condition<'a>(
        EquiJoinParams {
            probe_side,
            probe_key_idxs,
            build_side,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
        cond: &'a BoxedExpression,
    ) {
        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        let mut non_equi_state = LeftNonEquiJoinState::default();

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                // Each probe row starts out unmatched.
                non_equi_state.found_matched = false;
                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
                    // Remember where this probe row's candidate rows start in the pending chunk.
                    non_equi_state
                        .first_output_row_id
                        .push(chunk_builder.buffered_count());

                    for build_row_id in
                        next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id))
                    {
                        shutdown_rx.check()?;
                        // Stop adding candidates once a flushed chunk proved a match for this row.
                        if non_equi_state.found_matched {
                            break;
                        }
                        let build_chunk = &build_side[build_row_id.chunk_id()];
                        if let Some(spilled) = Self::append_one_row(
                            &mut chunk_builder,
                            &probe_chunk,
                            probe_row_id,
                            build_chunk,
                            build_row_id.row_id(),
                        ) {
                            yield Self::process_left_semi_anti_join_non_equi_condition::<false>(
                                spilled,
                                cond.as_ref(),
                                &mut non_equi_state,
                            )
                            .await?
                        }
                    }
                }
            }
        }

        // Flush the last partially filled candidate chunk.
        if let Some(spilled) = chunk_builder.consume_all() {
            yield Self::process_left_semi_anti_join_non_equi_condition::<false>(
                spilled,
                cond.as_ref(),
                &mut non_equi_state,
            )
            .await?
        }
    }
1149
    /// Equi left anti join with an additional non-equi condition `cond`.
    ///
    /// Visible probe rows take one of two paths.
    /// - No build row has an equal key: the probe row is emitted as-is (probe columns only)
    ///   through `remaining_chunk_builder`.
    /// - Otherwise every (probe row, same-key build row) pair is buffered in `chunk_builder`.
    ///   Each completed chunk goes through
    ///   `process_left_semi_anti_join_non_equi_condition::<true>` with `non_equi_state`. That
    ///   helper presumably keeps only probe rows none of whose candidates satisfy `cond`. It is
    ///   not visible here — confirm.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_left_anti_join_with_non_equi_condition(
        EquiJoinParams {
            probe_side,
            probe_data_types,
            probe_key_idxs,
            build_side,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
        cond: &BoxedExpression,
    ) {
        // Candidate pairs that still need `cond` evaluated.
        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        // Probe rows without any equal-key match; they trivially survive the anti join.
        let mut remaining_chunk_builder = DataChunkBuilder::new(probe_data_types, chunk_size);
        let mut non_equi_state = LeftNonEquiJoinState::default();

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
                    // Remember where this probe row's candidate rows start in the pending chunk.
                    non_equi_state
                        .first_output_row_id
                        .push(chunk_builder.buffered_count());
                    let mut build_row_id_iter = next_build_row_with_same_key
                        .row_id_iter(Some(*first_matched_build_row_id))
                        .peekable();
                    while let Some(build_row_id) = build_row_id_iter.next() {
                        shutdown_rx.check()?;
                        let build_chunk = &build_side[build_row_id.chunk_id()];
                        if let Some(spilled) = Self::append_one_row(
                            &mut chunk_builder,
                            &probe_chunk,
                            probe_row_id,
                            build_chunk,
                            build_row_id.row_id(),
                        ) {
                            // Tell the helper whether this probe row continues in the next chunk.
                            non_equi_state.has_more_output_rows =
                                build_row_id_iter.peek().is_some();
                            yield Self::process_left_semi_anti_join_non_equi_condition::<true>(
                                spilled,
                                cond.as_ref(),
                                &mut non_equi_state,
                            )
                            .await?
                        }
                    }
                } else if let Some(spilled) = Self::append_one_probe_row(
                    &mut remaining_chunk_builder,
                    &probe_chunk,
                    probe_row_id,
                ) {
                    yield spilled
                }
            }
        }
        // The final flush never has rows continuing into a later chunk.
        non_equi_state.has_more_output_rows = false;
        if let Some(spilled) = chunk_builder.consume_all() {
            yield Self::process_left_semi_anti_join_non_equi_condition::<true>(
                spilled,
                cond.as_ref(),
                &mut non_equi_state,
            )
            .await?
        }
        if let Some(spilled) = remaining_chunk_builder.consume_all() {
            yield spilled
        }
    }
1228
1229 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1230 pub async fn do_right_outer_join(
1231 EquiJoinParams {
1232 probe_side,
1233 probe_data_types,
1234 probe_key_idxs,
1235 build_side,
1236 full_data_types,
1237 hash_map,
1238 next_build_row_with_same_key,
1239 chunk_size,
1240 shutdown_rx,
1241 ..
1242 }: EquiJoinParams<K>,
1243 ) {
1244 let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
1245 let mut build_row_matched =
1246 ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
1247
1248 #[for_await]
1249 for probe_chunk in probe_side.execute() {
1250 let probe_chunk = probe_chunk?;
1251 let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1252 for (probe_row_id, probe_key) in probe_keys
1253 .iter()
1254 .enumerate()
1255 .filter_by_bitmap(probe_chunk.visibility())
1256 {
1257 for build_row_id in
1258 next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
1259 {
1260 shutdown_rx.check()?;
1261 build_row_matched[build_row_id] = true;
1262 let build_chunk = &build_side[build_row_id.chunk_id()];
1263 if let Some(spilled) = Self::append_one_row(
1264 &mut chunk_builder,
1265 &probe_chunk,
1266 probe_row_id,
1267 build_chunk,
1268 build_row_id.row_id(),
1269 ) {
1270 yield spilled
1271 }
1272 }
1273 }
1274 }
1275 #[for_await]
1276 for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
1277 &mut chunk_builder,
1278 &build_side,
1279 &build_row_matched,
1280 probe_data_types.len(),
1281 ) {
1282 yield spilled?
1283 }
1284 }
1285
/// Right outer join with an additional non-equi condition `cond`.
///
/// Every (probe, build) pair with equal keys is buffered in `chunk_builder`. Each full
/// chunk goes through `process_right_outer_join_non_equi_condition`, which evaluates
/// `cond`, keeps only the rows that pass, and marks their build rows as matched. Once
/// the probe side is exhausted, every build row that was never marked is emitted with
/// NULL probe columns.
#[try_stream(boxed, ok = DataChunk, error = BatchError)]
pub async fn do_right_outer_join_with_non_equi_condition(
    EquiJoinParams {
        probe_side,
        probe_data_types,
        probe_key_idxs,
        build_side,
        full_data_types,
        hash_map,
        next_build_row_with_same_key,
        chunk_size,
        shutdown_rx,
        ..
    }: EquiJoinParams<K>,
    cond: &BoxedExpression,
) {
    let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
    // One flag per build row (sized by each build chunk's capacity). A flag is set once
    // the row has produced at least one output row that satisfies `cond`.
    let build_row_matched =
        ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
    let mut non_equi_state = RightNonEquiJoinState {
        build_row_matched,
        ..Default::default()
    };

    #[for_await]
    for probe_chunk in probe_side.execute() {
        let probe_chunk = probe_chunk?;
        let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
        for (probe_row_id, probe_key) in probe_keys
            .iter()
            .enumerate()
            .filter_by_bitmap(probe_chunk.visibility())
        {
            for build_row_id in
                next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
            {
                shutdown_rx.check()?;
                // `build_row_ids` is later zipped row-by-row with the condition result, so
                // the id is recorded in the same order the output row is appended.
                non_equi_state.build_row_ids.push(build_row_id);
                let build_chunk = &build_side[build_row_id.chunk_id()];
                if let Some(spilled) = Self::append_one_row(
                    &mut chunk_builder,
                    &probe_chunk,
                    probe_row_id,
                    build_chunk,
                    build_row_id.row_id(),
                ) {
                    yield Self::process_right_outer_join_non_equi_condition(
                        spilled,
                        cond.as_ref(),
                        &mut non_equi_state,
                    )
                    .await?
                }
            }
        }
    }
    // Filter the last, partially filled chunk of candidate rows.
    if let Some(spilled) = chunk_builder.consume_all() {
        yield Self::process_right_outer_join_non_equi_condition(
            spilled,
            cond.as_ref(),
            &mut non_equi_state,
        )
        .await?
    }
    // `chunk_builder` was flushed by `consume_all` above and is reused here for the
    // NULL-padded build rows, which have the same full layout.
    #[for_await]
    for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
        &mut chunk_builder,
        &build_side,
        &non_equi_state.build_row_matched,
        probe_data_types.len(),
    ) {
        yield spilled?
    }
}
1360
1361 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1362 pub async fn do_right_semi_anti_join<const ANTI_JOIN: bool>(
1363 EquiJoinParams {
1364 probe_side,
1365 probe_key_idxs,
1366 build_side,
1367 build_data_types,
1368 hash_map,
1369 next_build_row_with_same_key,
1370 chunk_size,
1371 shutdown_rx,
1372 ..
1373 }: EquiJoinParams<K>,
1374 ) {
1375 let mut chunk_builder = DataChunkBuilder::new(build_data_types, chunk_size);
1376 let mut build_row_matched =
1377 ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
1378
1379 #[for_await]
1380 for probe_chunk in probe_side.execute() {
1381 let probe_chunk = probe_chunk?;
1382 let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1383 for probe_key in probe_keys.iter().filter_by_bitmap(probe_chunk.visibility()) {
1384 for build_row_id in
1385 next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
1386 {
1387 shutdown_rx.check()?;
1388 build_row_matched[build_row_id] = true;
1389 }
1390 }
1391 }
1392 #[for_await]
1393 for spilled in Self::handle_remaining_build_rows_for_right_semi_anti_join::<ANTI_JOIN>(
1394 &mut chunk_builder,
1395 &build_side,
1396 &build_row_matched,
1397 ) {
1398 yield spilled?
1399 }
1400 }
1401
/// Right semi/anti join with an additional non-equi condition `cond`.
///
/// Candidate (probe, build) pairs with equal keys are buffered in `chunk_builder` only
/// to evaluate `cond`; `process_right_semi_anti_join_non_equi_condition` marks the build
/// rows of passing pairs as matched and the evaluated chunks are then discarded. After
/// probing, build rows are emitted through `remaining_chunk_builder` (build columns
/// only): the matched ones for a semi join, the unmatched ones for an anti join.
#[try_stream(boxed, ok = DataChunk, error = BatchError)]
pub async fn do_right_semi_anti_join_with_non_equi_condition<const ANTI_JOIN: bool>(
    EquiJoinParams {
        probe_side,
        probe_key_idxs,
        build_side,
        build_data_types,
        full_data_types,
        hash_map,
        next_build_row_with_same_key,
        chunk_size,
        shutdown_rx,
        ..
    }: EquiJoinParams<K>,
    cond: &BoxedExpression,
) {
    // Full-layout rows used only to evaluate `cond`; never emitted.
    let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
    // Build-only rows that form the actual output.
    let mut remaining_chunk_builder = DataChunkBuilder::new(build_data_types, chunk_size);
    let build_row_matched =
        ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
    let mut non_equi_state = RightNonEquiJoinState {
        build_row_matched,
        ..Default::default()
    };

    #[for_await]
    for probe_chunk in probe_side.execute() {
        let probe_chunk = probe_chunk?;
        let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
        for (probe_row_id, probe_key) in probe_keys
            .iter()
            .enumerate()
            .filter_by_bitmap(probe_chunk.visibility())
        {
            for build_row_id in
                next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
            {
                shutdown_rx.check()?;
                // Keep `build_row_ids` aligned with the rows appended to `chunk_builder`;
                // they are zipped together when `cond` is applied.
                non_equi_state.build_row_ids.push(build_row_id);
                let build_chunk = &build_side[build_row_id.chunk_id()];
                if let Some(spilled) = Self::append_one_row(
                    &mut chunk_builder,
                    &probe_chunk,
                    probe_row_id,
                    build_chunk,
                    build_row_id.row_id(),
                ) {
                    // Only updates `build_row_matched`; nothing is yielded here.
                    Self::process_right_semi_anti_join_non_equi_condition(
                        spilled,
                        cond.as_ref(),
                        &mut non_equi_state,
                    )
                    .await?
                }
            }
        }
    }
    // Evaluate `cond` on the last, partially filled chunk of candidates.
    if let Some(spilled) = chunk_builder.consume_all() {
        Self::process_right_semi_anti_join_non_equi_condition(
            spilled,
            cond.as_ref(),
            &mut non_equi_state,
        )
        .await?
    }
    #[for_await]
    for spilled in Self::handle_remaining_build_rows_for_right_semi_anti_join::<ANTI_JOIN>(
        &mut remaining_chunk_builder,
        &build_side,
        &non_equi_state.build_row_matched,
    ) {
        yield spilled?
    }
}
1476
1477 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1478 pub async fn do_full_outer_join(
1479 EquiJoinParams {
1480 probe_side,
1481 probe_data_types,
1482 probe_key_idxs,
1483 build_side,
1484 build_data_types,
1485 full_data_types,
1486 hash_map,
1487 next_build_row_with_same_key,
1488 chunk_size,
1489 shutdown_rx,
1490 ..
1491 }: EquiJoinParams<K>,
1492 ) {
1493 let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
1494 let mut build_row_matched =
1495 ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
1496
1497 #[for_await]
1498 for probe_chunk in probe_side.execute() {
1499 let probe_chunk = probe_chunk?;
1500 let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1501 for (probe_row_id, probe_key) in probe_keys
1502 .iter()
1503 .enumerate()
1504 .filter_by_bitmap(probe_chunk.visibility())
1505 {
1506 if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
1507 for build_row_id in
1508 next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id))
1509 {
1510 shutdown_rx.check()?;
1511 build_row_matched[build_row_id] = true;
1512 let build_chunk = &build_side[build_row_id.chunk_id()];
1513 if let Some(spilled) = Self::append_one_row(
1514 &mut chunk_builder,
1515 &probe_chunk,
1516 probe_row_id,
1517 build_chunk,
1518 build_row_id.row_id(),
1519 ) {
1520 yield spilled
1521 }
1522 }
1523 } else {
1524 let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
1525 if let Some(spilled) = Self::append_one_row_with_null_build_side(
1526 &mut chunk_builder,
1527 probe_row,
1528 build_data_types.len(),
1529 ) {
1530 yield spilled
1531 }
1532 }
1533 }
1534 }
1535 #[for_await]
1536 for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
1537 &mut chunk_builder,
1538 &build_side,
1539 &build_row_matched,
1540 probe_data_types.len(),
1541 ) {
1542 yield spilled?
1543 }
1544 }
1545
/// Full outer join with an additional non-equi condition `cond`.
///
/// Candidate (probe, build) pairs with equal keys are buffered in `chunk_builder`; each
/// full chunk goes through `process_full_outer_join_non_equi_condition`, which applies
/// `cond` for both sides at once (left-outer dedup per probe row plus marking matched
/// build rows). Probe rows with no key match are emitted directly with NULL build
/// columns via `remaining_chunk_builder`, which is finally reused for the never-matched
/// build rows (NULL probe columns).
#[try_stream(boxed, ok = DataChunk, error = BatchError)]
pub async fn do_full_outer_join_with_non_equi_condition(
    EquiJoinParams {
        probe_side,
        probe_data_types,
        probe_key_idxs,
        build_side,
        build_data_types,
        full_data_types,
        hash_map,
        next_build_row_with_same_key,
        chunk_size,
        shutdown_rx,
        ..
    }: EquiJoinParams<K>,
    cond: &BoxedExpression,
) {
    let mut chunk_builder = DataChunkBuilder::new(full_data_types.clone(), chunk_size);
    let mut remaining_chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
    // Probe-side bookkeeping; `probe_column_count` tells the nullify step where the
    // build columns begin.
    let mut left_non_equi_state = LeftNonEquiJoinState {
        probe_column_count: probe_data_types.len(),
        ..Default::default()
    };
    let build_row_matched =
        ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
    let mut right_non_equi_state = RightNonEquiJoinState {
        build_row_matched,
        ..Default::default()
    };

    #[for_await]
    for probe_chunk in probe_side.execute() {
        let probe_chunk = probe_chunk?;
        let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
        for (probe_row_id, probe_key) in probe_keys
            .iter()
            .enumerate()
            .filter_by_bitmap(probe_chunk.visibility())
        {
            // Every probe row starts with no passing candidate.
            left_non_equi_state.found_matched = false;
            if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
                // Offset of this probe row's first candidate inside the buffered chunk.
                left_non_equi_state
                    .first_output_row_id
                    .push(chunk_builder.buffered_count());
                let mut build_row_id_iter = next_build_row_with_same_key
                    .row_id_iter(Some(*first_matched_build_row_id))
                    .peekable();
                while let Some(build_row_id) = build_row_id_iter.next() {
                    shutdown_rx.check()?;
                    // Kept aligned with the appended rows; zipped with the condition result.
                    right_non_equi_state.build_row_ids.push(build_row_id);
                    let build_chunk = &build_side[build_row_id.chunk_id()];
                    if let Some(spilled) = Self::append_one_row(
                        &mut chunk_builder,
                        &probe_chunk,
                        probe_row_id,
                        build_chunk,
                        build_row_id.row_id(),
                    ) {
                        // Whether this probe row's candidates continue in the next chunk.
                        left_non_equi_state.has_more_output_rows =
                            build_row_id_iter.peek().is_some();
                        yield Self::process_full_outer_join_non_equi_condition(
                            spilled,
                            cond.as_ref(),
                            &mut left_non_equi_state,
                            &mut right_non_equi_state,
                        )
                        .await?
                    }
                }
            } else {
                shutdown_rx.check()?;
                let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
                if let Some(spilled) = Self::append_one_row_with_null_build_side(
                    &mut remaining_chunk_builder,
                    probe_row,
                    build_data_types.len(),
                ) {
                    yield spilled
                }
            }
        }
    }
    // The probe side is exhausted, so the last buffered probe row is complete.
    left_non_equi_state.has_more_output_rows = false;
    if let Some(spilled) = chunk_builder.consume_all() {
        yield Self::process_full_outer_join_non_equi_condition(
            spilled,
            cond.as_ref(),
            &mut left_non_equi_state,
            &mut right_non_equi_state,
        )
        .await?
    }
    // Also flushes any NULL-padded probe rows still buffered in `remaining_chunk_builder`.
    #[for_await]
    for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
        &mut remaining_chunk_builder,
        &build_side,
        &right_non_equi_state.build_row_matched,
        probe_data_types.len(),
    ) {
        yield spilled?
    }
}
1648
1649 async fn process_left_outer_join_non_equi_condition(
1786 chunk: DataChunk,
1787 cond: &dyn Expression,
1788 LeftNonEquiJoinState {
1789 probe_column_count,
1790 first_output_row_id,
1791 has_more_output_rows,
1792 found_matched,
1793 }: &mut LeftNonEquiJoinState,
1794 ) -> Result<DataChunk> {
1795 let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1796 Ok(DataChunkMutator(chunk)
1797 .nullify_build_side_for_non_equi_condition(&filter, *probe_column_count)
1798 .remove_duplicate_rows_for_left_outer_join(
1799 &filter,
1800 first_output_row_id,
1801 *has_more_output_rows,
1802 found_matched,
1803 )
1804 .take())
1805 }
1806
1807 async fn process_left_semi_anti_join_non_equi_condition<const ANTI_JOIN: bool>(
1810 chunk: DataChunk,
1811 cond: &dyn Expression,
1812 LeftNonEquiJoinState {
1813 first_output_row_id,
1814 found_matched,
1815 has_more_output_rows,
1816 ..
1817 }: &mut LeftNonEquiJoinState,
1818 ) -> Result<DataChunk> {
1819 let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1820 Ok(DataChunkMutator(chunk)
1821 .remove_duplicate_rows_for_left_semi_anti_join::<ANTI_JOIN>(
1822 &filter,
1823 first_output_row_id,
1824 *has_more_output_rows,
1825 found_matched,
1826 )
1827 .take())
1828 }
1829
1830 async fn process_right_outer_join_non_equi_condition(
1831 chunk: DataChunk,
1832 cond: &dyn Expression,
1833 RightNonEquiJoinState {
1834 build_row_ids,
1835 build_row_matched,
1836 }: &mut RightNonEquiJoinState,
1837 ) -> Result<DataChunk> {
1838 let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1839 Ok(DataChunkMutator(chunk)
1840 .remove_duplicate_rows_for_right_outer_join(&filter, build_row_ids, build_row_matched)
1841 .take())
1842 }
1843
1844 async fn process_right_semi_anti_join_non_equi_condition(
1845 chunk: DataChunk,
1846 cond: &dyn Expression,
1847 RightNonEquiJoinState {
1848 build_row_ids,
1849 build_row_matched,
1850 }: &mut RightNonEquiJoinState,
1851 ) -> Result<()> {
1852 let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1853 DataChunkMutator(chunk).remove_duplicate_rows_for_right_semi_anti_join(
1854 &filter,
1855 build_row_ids,
1856 build_row_matched,
1857 );
1858 Ok(())
1859 }
1860
1861 async fn process_full_outer_join_non_equi_condition(
1862 chunk: DataChunk,
1863 cond: &dyn Expression,
1864 left_non_equi_state: &mut LeftNonEquiJoinState,
1865 right_non_equi_state: &mut RightNonEquiJoinState,
1866 ) -> Result<DataChunk> {
1867 let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1868 Ok(DataChunkMutator(chunk)
1869 .nullify_build_side_for_non_equi_condition(
1870 &filter,
1871 left_non_equi_state.probe_column_count,
1872 )
1873 .remove_duplicate_rows_for_full_outer_join(
1874 &filter,
1875 left_non_equi_state,
1876 right_non_equi_state,
1877 )
1878 .take())
1879 }
1880
1881 #[try_stream(ok = DataChunk, error = BatchError)]
1882 async fn handle_remaining_build_rows_for_right_outer_join<'a>(
1883 chunk_builder: &'a mut DataChunkBuilder,
1884 build_side: &'a [DataChunk],
1885 build_row_matched: &'a ChunkedData<bool>,
1886 probe_column_count: usize,
1887 ) {
1888 for build_row_id in build_row_matched
1889 .all_row_ids()
1890 .filter(|build_row_id| !build_row_matched[*build_row_id])
1891 {
1892 let build_row =
1893 build_side[build_row_id.chunk_id()].row_at_unchecked_vis(build_row_id.row_id());
1894 if let Some(spilled) = Self::append_one_row_with_null_probe_side(
1895 chunk_builder,
1896 build_row,
1897 probe_column_count,
1898 ) {
1899 yield spilled
1900 }
1901 }
1902 if let Some(spilled) = chunk_builder.consume_all() {
1903 yield spilled
1904 }
1905 }
1906
1907 #[try_stream(ok = DataChunk, error = BatchError)]
1908 async fn handle_remaining_build_rows_for_right_semi_anti_join<'a, const ANTI_JOIN: bool>(
1909 chunk_builder: &'a mut DataChunkBuilder,
1910 build_side: &'a [DataChunk],
1911 build_row_matched: &'a ChunkedData<bool>,
1912 ) {
1913 for build_row_id in build_row_matched.all_row_ids().filter(|build_row_id| {
1914 if !ANTI_JOIN {
1915 build_row_matched[*build_row_id]
1916 } else {
1917 !build_row_matched[*build_row_id]
1918 }
1919 }) {
1920 if let Some(spilled) = Self::append_one_build_row(
1921 chunk_builder,
1922 &build_side[build_row_id.chunk_id()],
1923 build_row_id.row_id(),
1924 ) {
1925 yield spilled
1926 }
1927 }
1928 if let Some(spilled) = chunk_builder.consume_all() {
1929 yield spilled
1930 }
1931 }
1932
1933 fn append_one_row(
1934 chunk_builder: &mut DataChunkBuilder,
1935 probe_chunk: &DataChunk,
1936 probe_row_id: usize,
1937 build_chunk: &DataChunk,
1938 build_row_id: usize,
1939 ) -> Option<DataChunk> {
1940 chunk_builder.append_one_row_from_array_elements(
1941 probe_chunk.columns().iter().map(|c| c.as_ref()),
1942 probe_row_id,
1943 build_chunk.columns().iter().map(|c| c.as_ref()),
1944 build_row_id,
1945 )
1946 }
1947
1948 fn append_one_probe_row(
1949 chunk_builder: &mut DataChunkBuilder,
1950 probe_chunk: &DataChunk,
1951 probe_row_id: usize,
1952 ) -> Option<DataChunk> {
1953 chunk_builder.append_one_row_from_array_elements(
1954 probe_chunk.columns().iter().map(|c| c.as_ref()),
1955 probe_row_id,
1956 empty(),
1957 0,
1958 )
1959 }
1960
1961 fn append_one_build_row(
1962 chunk_builder: &mut DataChunkBuilder,
1963 build_chunk: &DataChunk,
1964 build_row_id: usize,
1965 ) -> Option<DataChunk> {
1966 chunk_builder.append_one_row_from_array_elements(
1967 empty(),
1968 0,
1969 build_chunk.columns().iter().map(|c| c.as_ref()),
1970 build_row_id,
1971 )
1972 }
1973
1974 fn append_one_row_with_null_build_side(
1975 chunk_builder: &mut DataChunkBuilder,
1976 probe_row_ref: RowRef<'_>,
1977 build_column_count: usize,
1978 ) -> Option<DataChunk> {
1979 chunk_builder.append_one_row(probe_row_ref.chain(repeat_n(Datum::None, build_column_count)))
1980 }
1981
1982 fn append_one_row_with_null_probe_side(
1983 chunk_builder: &mut DataChunkBuilder,
1984 build_row_ref: RowRef<'_>,
1985 probe_column_count: usize,
1986 ) -> Option<DataChunk> {
1987 chunk_builder.append_one_row(repeat_n(Datum::None, probe_column_count).chain(build_row_ref))
1988 }
1989
1990 fn find_asof_matched_rows(
1991 probe_row_ref: RowRef<'_>,
1992 build_side: &[DataChunk],
1993 build_side_row_iter: RowIdIter<'_>,
1994 asof_join_condition: &AsOfDesc,
1995 ) -> Option<RowId> {
1996 let probe_inequality_value = probe_row_ref.datum_at(asof_join_condition.left_idx);
1997 if let Some(probe_inequality_scalar) = probe_inequality_value {
1998 let mut result_row_id: Option<RowId> = None;
1999 let mut build_row_ref;
2000
2001 for build_row_id in build_side_row_iter {
2002 build_row_ref =
2003 build_side[build_row_id.chunk_id()].row_at_unchecked_vis(build_row_id.row_id());
2004 let build_inequality_value = build_row_ref.datum_at(asof_join_condition.right_idx);
2005 if let Some(build_inequality_scalar) = build_inequality_value {
2006 let mut pick_result = |compare: fn(Ordering) -> bool| {
2007 if let Some(result_row_id_inner) = result_row_id {
2008 let result_row_ref = build_side[result_row_id_inner.chunk_id()]
2009 .row_at_unchecked_vis(result_row_id_inner.row_id());
2010 let result_inequality_scalar = result_row_ref
2011 .datum_at(asof_join_condition.right_idx)
2012 .unwrap();
2013 if compare(
2014 probe_inequality_scalar.default_cmp(&build_inequality_scalar),
2015 ) && compare(
2016 probe_inequality_scalar.default_cmp(&result_inequality_scalar),
2017 ) {
2018 result_row_id = Some(build_row_id);
2019 }
2020 } else if compare(
2021 probe_inequality_scalar.default_cmp(&build_inequality_scalar),
2022 ) {
2023 result_row_id = Some(build_row_id);
2024 }
2025 };
2026 match asof_join_condition.inequality_type {
2027 AsOfInequalityType::Lt => {
2028 pick_result(Ordering::is_lt);
2029 }
2030 AsOfInequalityType::Le => {
2031 pick_result(Ordering::is_le);
2032 }
2033 AsOfInequalityType::Gt => {
2034 pick_result(Ordering::is_gt);
2035 }
2036 AsOfInequalityType::Ge => {
2037 pick_result(Ordering::is_ge);
2038 }
2039 }
2040 }
2041 }
2042 result_row_id
2043 } else {
2044 None
2045 }
2046 }
2047}
2048
/// Owning wrapper around a [`DataChunk`] that provides the post-processing steps used by
/// the non-equi-condition join paths. Each step consumes and returns the wrapper so
/// steps can be chained; [`DataChunkMutator::take`] unwraps the final chunk.
#[repr(transparent)]
struct DataChunkMutator(DataChunk);

impl DataChunkMutator {
    /// Masks the null bitmap of every build-side column (index `probe_column_count` and
    /// beyond) with `filter`; probe columns are left untouched.
    ///
    /// Assuming a set null-bitmap bit means "non-NULL" (TODO confirm), this turns the
    /// build side of every row that failed the condition into NULLs.
    fn nullify_build_side_for_non_equi_condition(
        self,
        filter: &Bitmap,
        probe_column_count: usize,
    ) -> Self {
        let (mut columns, vis) = self.0.into_parts();

        for build_column in columns.split_off(probe_column_count) {
            // NOTE(review): `unwrap` panics if this column's `Arc` is shared elsewhere;
            // this relies on the chunk exclusively owning its columns — confirm.
            let mut array = Arc::try_unwrap(build_column).unwrap();
            array.set_bitmap(array.null_bitmap() & filter);
            columns.push(array.into());
        }

        Self(DataChunk::new(columns, vis))
    }

    /// Rewrites visibility for a left-outer-join chunk after `cond` was evaluated.
    ///
    /// Grouping of rows:
    /// - `first_output_row_ids` holds, for every probe row that started in this chunk,
    ///   the offset of its first candidate row.
    /// - Rows before the first offset belong to a probe row continued from the previous
    ///   chunk.
    ///
    /// Within each group, every row passing `filter` is kept. If none passed, only the
    /// group's first row is kept; its build side is expected to be nullified already.
    ///
    /// The last group may continue into the next chunk, so its fallback row is emitted
    /// only when `has_more_output_rows` is false. `found_non_null` carries the group's
    /// progress across calls.
    ///
    /// `first_output_row_ids` is cleared, and the new visibility is AND-ed with the
    /// chunk's existing one.
    fn remove_duplicate_rows_for_left_outer_join(
        mut self,
        filter: &Bitmap,
        first_output_row_ids: &mut Vec<usize>,
        has_more_output_rows: bool,
        found_non_null: &mut bool,
    ) -> Self {
        let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());

        // Complete groups: [0, first), [first, second), ... (empty windows skipped).
        for (&start_row_id, &end_row_id) in iter::once(&0)
            .chain(first_output_row_ids.iter())
            .tuple_windows()
            .filter(|(start_row_id, end_row_id)| start_row_id < end_row_id)
        {
            for row_id in start_row_id..end_row_id {
                if filter.is_set(row_id) {
                    *found_non_null = true;
                    new_visibility.set(row_id, true);
                }
            }
            if !*found_non_null {
                // No candidate passed: keep one row as the NULL-padded output.
                new_visibility.set(start_row_id, true);
            }
            *found_non_null = false;
        }

        // Trailing group, which may continue in the next chunk.
        let start_row_id = first_output_row_ids.last().copied().unwrap_or_default();
        for row_id in start_row_id..filter.len() {
            if filter.is_set(row_id) {
                *found_non_null = true;
                new_visibility.set(row_id, true);
            }
        }
        if !has_more_output_rows {
            if !*found_non_null {
                new_visibility.set(start_row_id, true);
            }
            *found_non_null = false;
        }

        first_output_row_ids.clear();

        self.0
            .set_visibility(new_visibility.finish() & self.0.visibility());
        self
    }

    /// Rewrites visibility for a left semi/anti join chunk after `cond` was evaluated.
    ///
    /// Rows are grouped per probe row as in `remove_duplicate_rows_for_left_outer_join`.
    ///
    /// Semi join (`ANTI_JOIN == false`): the first row of a group that passes `filter`
    /// is kept, unless `found_matched` shows the probe row was already emitted.
    ///
    /// Anti join: the group's first row is kept only if no row of the group passed. The
    /// trailing group is resolved only when `has_more_output_rows` is false.
    ///
    /// In the semi case, `found_matched` is not reset here for the trailing group.
    fn remove_duplicate_rows_for_left_semi_anti_join<const ANTI_JOIN: bool>(
        mut self,
        filter: &Bitmap,
        first_output_row_ids: &mut Vec<usize>,
        has_more_output_rows: bool,
        found_matched: &mut bool,
    ) -> Self {
        let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());

        // Complete groups: [0, first), [first, second), ... (empty windows skipped).
        for (&start_row_id, &end_row_id) in iter::once(&0)
            .chain(first_output_row_ids.iter())
            .tuple_windows()
            .filter(|(start_row_id, end_row_id)| start_row_id < end_row_id)
        {
            for row_id in start_row_id..end_row_id {
                if filter.is_set(row_id) {
                    if !ANTI_JOIN && !*found_matched {
                        new_visibility.set(row_id, true);
                    }
                    *found_matched = true;
                    break;
                }
            }
            if ANTI_JOIN && !*found_matched {
                new_visibility.set(start_row_id, true);
            }
            *found_matched = false;
        }

        // Trailing group, which may continue in the next chunk.
        let start_row_id = first_output_row_ids.last().copied().unwrap_or_default();
        for row_id in start_row_id..filter.len() {
            if filter.is_set(row_id) {
                if !ANTI_JOIN && !*found_matched {
                    new_visibility.set(row_id, true);
                }
                *found_matched = true;
                break;
            }
        }
        if !has_more_output_rows && ANTI_JOIN {
            if !*found_matched {
                new_visibility.set(start_row_id, true);
            }
            *found_matched = false;
        }

        first_output_row_ids.clear();

        self.0
            .set_visibility(new_visibility.finish() & self.0.visibility());
        self
    }

    /// Keeps only the rows passing `filter` and marks their build rows as matched.
    ///
    /// `build_row_ids[i]` is the build row of output row `i`. The two sequences are
    /// zipped with `zip_eq_fast`, so they are expected to have equal length. The
    /// ids are cleared afterwards.
    fn remove_duplicate_rows_for_right_outer_join(
        mut self,
        filter: &Bitmap,
        build_row_ids: &mut Vec<RowId>,
        build_row_matched: &mut ChunkedData<bool>,
    ) -> Self {
        let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());
        for (output_row_id, (output_row_non_null, &build_row_id)) in
            filter.iter().zip_eq_fast(build_row_ids.iter()).enumerate()
        {
            if output_row_non_null {
                build_row_matched[build_row_id] = true;
                new_visibility.set(output_row_id, true);
            }
        }

        build_row_ids.clear();

        self.0
            .set_visibility(new_visibility.finish() & self.0.visibility());
        self
    }

    /// Marks the build rows of rows passing `filter` as matched and clears
    /// `build_row_ids`; the chunk itself is consumed and dropped.
    fn remove_duplicate_rows_for_right_semi_anti_join(
        self,
        filter: &Bitmap,
        build_row_ids: &mut Vec<RowId>,
        build_row_matched: &mut ChunkedData<bool>,
    ) {
        for (output_row_non_null, &build_row_id) in filter.iter().zip_eq_fast(build_row_ids.iter())
        {
            if output_row_non_null {
                build_row_matched[build_row_id] = true;
            }
        }

        build_row_ids.clear();
    }

    /// Rewrites visibility for a full-outer-join chunk after `cond` was evaluated.
    ///
    /// It combines two rules:
    /// - The left-outer rule: keep passing rows per probe-row group, else one fallback
    ///   row. The trailing group's fallback requires `!has_more_output_rows`.
    /// - The right-outer rule: keep passing rows and mark their build rows as matched.
    ///
    /// Unlike the left-outer variant, `found_matched` is not reset here for the trailing
    /// group. The full-outer probe loop resets it before each probe row.
    fn remove_duplicate_rows_for_full_outer_join(
        mut self,
        filter: &Bitmap,
        LeftNonEquiJoinState {
            first_output_row_id,
            has_more_output_rows,
            found_matched,
            ..
        }: &mut LeftNonEquiJoinState,
        RightNonEquiJoinState {
            build_row_ids,
            build_row_matched,
        }: &mut RightNonEquiJoinState,
    ) -> Self {
        let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());

        // Left-outer part: complete probe-row groups.
        for (&start_row_id, &end_row_id) in iter::once(&0)
            .chain(first_output_row_id.iter())
            .tuple_windows()
            .filter(|(start_row_id, end_row_id)| start_row_id < end_row_id)
        {
            for row_id in start_row_id..end_row_id {
                if filter.is_set(row_id) {
                    *found_matched = true;
                    new_visibility.set(row_id, true);
                }
            }
            if !*found_matched {
                new_visibility.set(start_row_id, true);
            }
            *found_matched = false;
        }

        // Left-outer part: trailing group, which may continue in the next chunk.
        let start_row_id = first_output_row_id.last().copied().unwrap_or_default();
        for row_id in start_row_id..filter.len() {
            if filter.is_set(row_id) {
                *found_matched = true;
                new_visibility.set(row_id, true);
            }
        }
        if !*has_more_output_rows && !*found_matched {
            new_visibility.set(start_row_id, true);
        }

        first_output_row_id.clear();

        // Right-outer part: record which build rows found a passing partner.
        for (output_row_id, (output_row_non_null, &build_row_id)) in
            filter.iter().zip_eq_fast(build_row_ids.iter()).enumerate()
        {
            if output_row_non_null {
                build_row_matched[build_row_id] = true;
                new_visibility.set(output_row_id, true);
            }
        }

        build_row_ids.clear();

        self.0
            .set_visibility(new_visibility.finish() & self.0.visibility());
        self
    }

    /// Unwraps the (possibly modified) chunk.
    fn take(self) -> DataChunk {
        self.0
    }
}
2279
2280impl BoxedExecutorBuilder for HashJoinExecutor<()> {
2281 async fn new_boxed_executor(
2282 context: &ExecutorBuilder<'_>,
2283 inputs: Vec<BoxedExecutor>,
2284 ) -> Result<BoxedExecutor> {
2285 let [left_child, right_child]: [_; 2] = inputs.try_into().unwrap();
2286
2287 let hash_join_node = try_match_expand!(
2288 context.plan_node().get_node_body().unwrap(),
2289 NodeBody::HashJoin
2290 )?;
2291
2292 let join_type = JoinType::from_prost(hash_join_node.get_join_type()?);
2293
2294 let cond = match hash_join_node.get_condition() {
2295 Ok(cond_prost) => Some(build_from_prost(cond_prost)?),
2296 Err(_) => None,
2297 };
2298
2299 let left_key_idxs = hash_join_node
2300 .get_left_key()
2301 .iter()
2302 .map(|&idx| idx as usize)
2303 .collect_vec();
2304 let right_key_idxs = hash_join_node
2305 .get_right_key()
2306 .iter()
2307 .map(|&idx| idx as usize)
2308 .collect_vec();
2309
2310 ensure!(left_key_idxs.len() == right_key_idxs.len());
2311
2312 let right_data_types = right_child.schema().data_types();
2313 let right_key_types = right_key_idxs
2314 .iter()
2315 .map(|&idx| right_data_types[idx].clone())
2316 .collect_vec();
2317
2318 let output_indices: Vec<usize> = hash_join_node
2319 .get_output_indices()
2320 .iter()
2321 .map(|&x| x as usize)
2322 .collect();
2323
2324 let identity = context.plan_node().get_identity().clone();
2325
2326 let asof_desc = hash_join_node
2327 .asof_desc
2328 .map(|desc| AsOfDesc::from_protobuf(&desc))
2329 .transpose()?;
2330
2331 Ok(HashJoinExecutorArgs {
2332 join_type,
2333 output_indices,
2334 probe_side_source: left_child,
2335 build_side_source: right_child,
2336 probe_key_idxs: left_key_idxs,
2337 build_key_idxs: right_key_idxs,
2338 null_matched: hash_join_node.get_null_safe().clone(),
2339 cond,
2340 identity: identity.clone(),
2341 right_key_types,
2342 chunk_size: context.context().get_config().developer.chunk_size,
2343 asof_desc,
2344 spill_backend: if context.context().get_config().enable_spill {
2345 Some(Disk)
2346 } else {
2347 None
2348 },
2349 spill_metrics: context.context().spill_metrics(),
2350 shutdown_rx: context.shutdown_rx().clone(),
2351 mem_ctx: context.context().create_executor_mem_context(&identity),
2352 }
2353 .dispatch())
2354 }
2355}
2356
/// Construction arguments for [`HashJoinExecutor`], collected before the hash key type is
/// known. `HashKeyDispatcher` picks the concrete key type from `right_key_types` and then
/// calls `HashJoinExecutor::<K>::new`.
struct HashJoinExecutorArgs {
    join_type: JoinType,
    // Indices into the pre-projection join schema that form the output schema.
    output_indices: Vec<usize>,
    // Left input.
    probe_side_source: BoxedExecutor,
    // Right input.
    build_side_source: BoxedExecutor,
    probe_key_idxs: Vec<usize>,
    build_key_idxs: Vec<usize>,
    // Per key column, taken from the plan's `null_safe` flags.
    null_matched: Vec<bool>,
    // Optional non-equi join condition.
    cond: Option<BoxedExpression>,
    identity: String,
    // Types of the build-side key columns; used only for hash key dispatch.
    right_key_types: Vec<DataType>,
    chunk_size: usize,
    asof_desc: Option<AsOfDesc>,
    // `Some` only when spilling is enabled in the batch config.
    spill_backend: Option<SpillBackend>,
    spill_metrics: Arc<BatchSpillMetrics>,
    shutdown_rx: ShutdownToken,
    mem_ctx: MemoryContext,
}
2375
2376impl HashKeyDispatcher for HashJoinExecutorArgs {
2377 type Output = BoxedExecutor;
2378
2379 fn dispatch_impl<K: HashKey>(self) -> Self::Output {
2380 Box::new(HashJoinExecutor::<K>::new(
2381 self.join_type,
2382 self.output_indices,
2383 self.probe_side_source,
2384 self.build_side_source,
2385 self.probe_key_idxs,
2386 self.build_key_idxs,
2387 self.null_matched,
2388 self.cond.map(Arc::new),
2389 self.identity,
2390 self.chunk_size,
2391 self.asof_desc,
2392 self.spill_backend,
2393 self.spill_metrics,
2394 self.shutdown_rx,
2395 self.mem_ctx,
2396 ))
2397 }
2398
2399 fn data_types(&self) -> &[DataType] {
2400 &self.right_key_types
2401 }
2402}
2403
2404impl<K> HashJoinExecutor<K> {
2405 #[allow(clippy::too_many_arguments)]
2406 pub fn new(
2407 join_type: JoinType,
2408 output_indices: Vec<usize>,
2409 probe_side_source: BoxedExecutor,
2410 build_side_source: BoxedExecutor,
2411 probe_key_idxs: Vec<usize>,
2412 build_key_idxs: Vec<usize>,
2413 null_matched: Vec<bool>,
2414 cond: Option<Arc<BoxedExpression>>,
2415 identity: String,
2416 chunk_size: usize,
2417 asof_desc: Option<AsOfDesc>,
2418 spill_backend: Option<SpillBackend>,
2419 spill_metrics: Arc<BatchSpillMetrics>,
2420 shutdown_rx: ShutdownToken,
2421 mem_ctx: MemoryContext,
2422 ) -> Self {
2423 Self::new_inner(
2424 join_type,
2425 output_indices,
2426 probe_side_source,
2427 build_side_source,
2428 probe_key_idxs,
2429 build_key_idxs,
2430 null_matched,
2431 cond,
2432 identity,
2433 chunk_size,
2434 asof_desc,
2435 spill_backend,
2436 spill_metrics,
2437 None,
2438 shutdown_rx,
2439 mem_ctx,
2440 )
2441 }
2442
2443 #[allow(clippy::too_many_arguments)]
2444 fn new_inner(
2445 join_type: JoinType,
2446 output_indices: Vec<usize>,
2447 probe_side_source: BoxedExecutor,
2448 build_side_source: BoxedExecutor,
2449 probe_key_idxs: Vec<usize>,
2450 build_key_idxs: Vec<usize>,
2451 null_matched: Vec<bool>,
2452 cond: Option<Arc<BoxedExpression>>,
2453 identity: String,
2454 chunk_size: usize,
2455 asof_desc: Option<AsOfDesc>,
2456 spill_backend: Option<SpillBackend>,
2457 spill_metrics: Arc<BatchSpillMetrics>,
2458 memory_upper_bound: Option<u64>,
2459 shutdown_rx: ShutdownToken,
2460 mem_ctx: MemoryContext,
2461 ) -> Self {
2462 assert_eq!(probe_key_idxs.len(), build_key_idxs.len());
2463 assert_eq!(probe_key_idxs.len(), null_matched.len());
2464 let original_schema = match join_type {
2465 JoinType::LeftSemi | JoinType::LeftAnti => probe_side_source.schema().clone(),
2466 JoinType::RightSemi | JoinType::RightAnti => build_side_source.schema().clone(),
2467 _ => Schema::from_iter(
2468 probe_side_source
2469 .schema()
2470 .fields()
2471 .iter()
2472 .chain(build_side_source.schema().fields().iter())
2473 .cloned(),
2474 ),
2475 };
2476 let schema = Schema::from_iter(
2477 output_indices
2478 .iter()
2479 .map(|&idx| original_schema[idx].clone()),
2480 );
2481 Self {
2482 join_type,
2483 original_schema,
2484 schema,
2485 output_indices,
2486 probe_side_source,
2487 build_side_source,
2488 probe_key_idxs,
2489 build_key_idxs,
2490 null_matched,
2491 cond,
2492 identity,
2493 chunk_size,
2494 asof_desc,
2495 shutdown_rx,
2496 spill_backend,
2497 spill_metrics,
2498 memory_upper_bound,
2499 mem_ctx,
2500 _phantom: PhantomData,
2501 }
2502 }
2503}
2504
2505#[cfg(test)]
2506mod tests {
2507 use futures::StreamExt;
2508 use futures_async_stream::for_await;
2509 use itertools::Itertools;
2510 use risingwave_common::array::{ArrayBuilderImpl, DataChunk};
2511 use risingwave_common::catalog::{Field, Schema};
2512 use risingwave_common::hash::Key32;
2513 use risingwave_common::memory::MemoryContext;
2514 use risingwave_common::metrics::LabelGuardedIntGauge;
2515 use risingwave_common::test_prelude::DataChunkTestExt;
2516 use risingwave_common::types::DataType;
2517 use risingwave_common::util::iter_util::ZipEqDebug;
2518 use risingwave_common::util::memcmp_encoding::encode_chunk;
2519 use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
2520 use risingwave_expr::expr::{BoxedExpression, build_from_pretty};
2521
2522 use super::{
2523 ChunkedData, HashJoinExecutor, JoinType, LeftNonEquiJoinState, RightNonEquiJoinState, RowId,
2524 };
2525 use crate::error::Result;
2526 use crate::executor::BoxedExecutor;
2527 use crate::executor::test_utils::MockExecutor;
2528 use crate::monitor::BatchSpillMetrics;
2529 use crate::spill::spill_op::SpillBackend;
2530 use crate::task::ShutdownToken;
2531
2532 const CHUNK_SIZE: usize = 1024;
2533
2534 struct DataChunkMerger {
2535 array_builders: Vec<ArrayBuilderImpl>,
2536 array_len: usize,
2537 }
2538
2539 impl DataChunkMerger {
2540 fn new(data_types: Vec<DataType>) -> Result<Self> {
2541 let array_builders = data_types
2542 .iter()
2543 .map(|data_type| data_type.create_array_builder(CHUNK_SIZE))
2544 .collect();
2545
2546 Ok(Self {
2547 array_builders,
2548 array_len: 0,
2549 })
2550 }
2551
2552 fn append(&mut self, data_chunk: &DataChunk) -> Result<()> {
2553 ensure!(self.array_builders.len() == data_chunk.dimension());
2554 for idx in 0..self.array_builders.len() {
2555 self.array_builders[idx].append_array(data_chunk.column_at(idx));
2556 }
2557 self.array_len += data_chunk.capacity();
2558
2559 Ok(())
2560 }
2561
2562 fn finish(self) -> Result<DataChunk> {
2563 let columns = self
2564 .array_builders
2565 .into_iter()
2566 .map(|b| b.finish().into())
2567 .collect();
2568
2569 Ok(DataChunk::new(columns, self.array_len))
2570 }
2571 }
2572
2573 fn compare_data_chunk_with_rowsort(left: &DataChunk, right: &DataChunk) -> bool {
2575 assert!(left.is_compacted());
2576 assert!(right.is_compacted());
2577
2578 if left.cardinality() != right.cardinality() {
2579 return false;
2580 }
2581
2582 let column_orders = (0..left.columns().len())
2584 .map(|i| ColumnOrder::new(i, OrderType::ascending()))
2585 .collect_vec();
2586 let left_encoded_chunk = encode_chunk(left, &column_orders).unwrap();
2587 let mut sorted_left = left_encoded_chunk
2588 .into_iter()
2589 .enumerate()
2590 .map(|(row_id, row)| (left.row_at_unchecked_vis(row_id), row))
2591 .collect_vec();
2592 sorted_left.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
2593
2594 let right_encoded_chunk = encode_chunk(right, &column_orders).unwrap();
2595 let mut sorted_right = right_encoded_chunk
2596 .into_iter()
2597 .enumerate()
2598 .map(|(row_id, row)| (right.row_at_unchecked_vis(row_id), row))
2599 .collect_vec();
2600 sorted_right.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
2601
2602 sorted_left
2603 .into_iter()
2604 .map(|(row, _)| row)
2605 .zip_eq_debug(sorted_right.into_iter().map(|(row, _)| row))
2606 .all(|(row1, row2)| row1 == row2)
2607 }
2608
2609 struct TestFixture {
2610 left_types: Vec<DataType>,
2611 right_types: Vec<DataType>,
2612 join_type: JoinType,
2613 }
2614
2615 impl TestFixture {
2632 fn with_join_type(join_type: JoinType) -> Self {
2633 Self {
2634 left_types: vec![DataType::Int32, DataType::Float32],
2635 right_types: vec![DataType::Int32, DataType::Float64],
2636 join_type,
2637 }
2638 }
2639
2640 fn create_left_executor(&self) -> BoxedExecutor {
2641 let schema = Schema {
2642 fields: vec![
2643 Field::unnamed(DataType::Int32),
2644 Field::unnamed(DataType::Float32),
2645 ],
2646 };
2647 let mut executor = MockExecutor::new(schema);
2648
2649 executor.add(DataChunk::from_pretty(
2650 "i f
2651 1 6.1
2652 2 .
2653 . 8.4
2654 3 3.9
2655 . . ",
2656 ));
2657
2658 executor.add(DataChunk::from_pretty(
2659 "i f
2660 4 6.6
2661 3 .
2662 . 0.7
2663 5 .
2664 . 5.5",
2665 ));
2666
2667 Box::new(executor)
2668 }
2669
2670 fn create_right_executor(&self) -> BoxedExecutor {
2671 let schema = Schema {
2672 fields: vec![
2673 Field::unnamed(DataType::Int32),
2674 Field::unnamed(DataType::Float64),
2675 ],
2676 };
2677 let mut executor = MockExecutor::new(schema);
2678
2679 executor.add(DataChunk::from_pretty(
2680 "i F
2681 8 6.1
2682 2 .
2683 . 8.9
2684 3 .
2685 . 3.5
2686 6 . ",
2687 ));
2688
2689 executor.add(DataChunk::from_pretty(
2690 "i F
2691 4 7.5
2692 6 .
2693 . 8
2694 7 .
2695 . 9.1
2696 9 . ",
2697 ));
2698
2699 executor.add(DataChunk::from_pretty(
2700 " i F
2701 3 3.7
2702 9 .
2703 . 9.6
2704 100 .
2705 . 8.18
2706 200 . ",
2707 ));
2708
2709 Box::new(executor)
2710 }
2711
2712 fn output_data_types(&self) -> Vec<DataType> {
2713 let join_type = self.join_type;
2714 if join_type.keep_all() {
2715 [self.left_types.clone(), self.right_types.clone()].concat()
2716 } else if join_type.keep_left() {
2717 self.left_types.clone()
2718 } else if join_type.keep_right() {
2719 self.right_types.clone()
2720 } else {
2721 unreachable!()
2722 }
2723 }
2724
2725 fn create_cond() -> BoxedExpression {
2726 build_from_pretty("(less_than:boolean $1:float4 $3:float8)")
2727 }
2728
2729 fn create_join_executor_with_chunk_size_and_executors(
2730 &self,
2731 has_non_equi_cond: bool,
2732 null_safe: bool,
2733 chunk_size: usize,
2734 left_child: BoxedExecutor,
2735 right_child: BoxedExecutor,
2736 shutdown_rx: ShutdownToken,
2737 parent_mem_ctx: Option<MemoryContext>,
2738 test_spill: bool,
2739 ) -> BoxedExecutor {
2740 let join_type = self.join_type;
2741
2742 let output_indices = (0..match join_type {
2743 JoinType::LeftSemi | JoinType::LeftAnti => left_child.schema().fields().len(),
2744 JoinType::RightSemi | JoinType::RightAnti => right_child.schema().fields().len(),
2745 _ => left_child.schema().fields().len() + right_child.schema().fields().len(),
2746 })
2747 .collect();
2748
2749 let cond = if has_non_equi_cond {
2750 Some(Self::create_cond().into())
2751 } else {
2752 None
2753 };
2754
2755 let mem_ctx = if test_spill {
2756 MemoryContext::new_with_mem_limit(
2757 parent_mem_ctx,
2758 LabelGuardedIntGauge::test_int_gauge::<4>(),
2759 0,
2760 )
2761 } else {
2762 MemoryContext::new(parent_mem_ctx, LabelGuardedIntGauge::test_int_gauge::<4>())
2763 };
2764 Box::new(HashJoinExecutor::<Key32>::new(
2765 join_type,
2766 output_indices,
2767 left_child,
2768 right_child,
2769 vec![0],
2770 vec![0],
2771 vec![null_safe],
2772 cond,
2773 "HashJoinExecutor".to_owned(),
2774 chunk_size,
2775 None,
2776 if test_spill {
2777 Some(SpillBackend::Memory)
2778 } else {
2779 None
2780 },
2781 BatchSpillMetrics::for_test(),
2782 shutdown_rx,
2783 mem_ctx,
2784 ))
2785 }
2786
2787 async fn do_test(&self, expected: DataChunk, has_non_equi_cond: bool, null_safe: bool) {
2788 let left_executor = self.create_left_executor();
2789 let right_executor = self.create_right_executor();
2790 self.do_test_with_chunk_size_and_executors(
2791 expected.clone(),
2792 has_non_equi_cond,
2793 null_safe,
2794 self::CHUNK_SIZE,
2795 left_executor,
2796 right_executor,
2797 false,
2798 )
2799 .await;
2800
2801 let left_executor = self.create_left_executor();
2803 let right_executor = self.create_right_executor();
2804 self.do_test_with_chunk_size_and_executors(
2805 expected,
2806 has_non_equi_cond,
2807 null_safe,
2808 self::CHUNK_SIZE,
2809 left_executor,
2810 right_executor,
2811 true,
2812 )
2813 .await;
2814 }
2815
2816 async fn do_test_with_chunk_size_and_executors(
2817 &self,
2818 expected: DataChunk,
2819 has_non_equi_cond: bool,
2820 null_safe: bool,
2821 chunk_size: usize,
2822 left_executor: BoxedExecutor,
2823 right_executor: BoxedExecutor,
2824 test_spill: bool,
2825 ) {
2826 let parent_mem_context =
2827 MemoryContext::root(LabelGuardedIntGauge::test_int_gauge::<4>(), u64::MAX);
2828
2829 {
2830 let join_executor = self.create_join_executor_with_chunk_size_and_executors(
2831 has_non_equi_cond,
2832 null_safe,
2833 chunk_size,
2834 left_executor,
2835 right_executor,
2836 ShutdownToken::empty(),
2837 Some(parent_mem_context.clone()),
2838 test_spill,
2839 );
2840
2841 let mut data_chunk_merger = DataChunkMerger::new(self.output_data_types()).unwrap();
2842
2843 let fields = &join_executor.schema().fields;
2844
2845 if self.join_type.keep_all() {
2846 assert_eq!(fields[1].data_type, DataType::Float32);
2847 assert_eq!(fields[3].data_type, DataType::Float64);
2848 } else if self.join_type.keep_left() {
2849 assert_eq!(fields[1].data_type, DataType::Float32);
2850 } else if self.join_type.keep_right() {
2851 assert_eq!(fields[1].data_type, DataType::Float64)
2852 } else {
2853 unreachable!()
2854 }
2855
2856 let mut stream = join_executor.execute();
2857
2858 while let Some(data_chunk) = stream.next().await {
2859 let data_chunk = data_chunk.unwrap();
2860 let data_chunk = data_chunk.compact();
2861 data_chunk_merger.append(&data_chunk).unwrap();
2862 }
2863
2864 let result_chunk = data_chunk_merger.finish().unwrap();
2865 println!("expected: {:?}", expected);
2866 println!("result: {:?}", result_chunk);
2867
2868 assert!(compare_data_chunk_with_rowsort(&expected, &result_chunk));
2871 }
2872
2873 assert_eq!(0, parent_mem_context.get_bytes_used());
2874 }
2875
2876 async fn do_test_shutdown(&self, has_non_equi_cond: bool) {
2877 let left_executor = self.create_left_executor();
2879 let right_executor = self.create_right_executor();
2880 let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
2881 let join_executor = self.create_join_executor_with_chunk_size_and_executors(
2882 has_non_equi_cond,
2883 false,
2884 self::CHUNK_SIZE,
2885 left_executor,
2886 right_executor,
2887 shutdown_rx,
2888 None,
2889 false,
2890 );
2891 shutdown_tx.cancel();
2892 #[for_await]
2893 for chunk in join_executor.execute() {
2894 assert!(chunk.is_err());
2895 break;
2896 }
2897
2898 let left_executor = self.create_left_executor();
2900 let right_executor = self.create_right_executor();
2901 let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
2902 let join_executor = self.create_join_executor_with_chunk_size_and_executors(
2903 has_non_equi_cond,
2904 false,
2905 self::CHUNK_SIZE,
2906 left_executor,
2907 right_executor,
2908 shutdown_rx,
2909 None,
2910 false,
2911 );
2912 shutdown_tx.abort("test");
2913 #[for_await]
2914 for chunk in join_executor.execute() {
2915 assert!(chunk.is_err());
2916 break;
2917 }
2918 }
2919 }
2920
2921 #[tokio::test]
2926 async fn test_inner_join() {
2927 let test_fixture = TestFixture::with_join_type(JoinType::Inner);
2928
2929 let expected_chunk = DataChunk::from_pretty(
2930 "i f i F
2931 2 . 2 .
2932 3 3.9 3 3.7
2933 3 3.9 3 .
2934 4 6.6 4 7.5
2935 3 . 3 3.7
2936 3 . 3 .",
2937 );
2938
2939 test_fixture.do_test(expected_chunk, false, false).await;
2940 }
2941
2942 #[tokio::test]
2947 async fn test_null_safe_inner_join() {
2948 let test_fixture = TestFixture::with_join_type(JoinType::Inner);
2949
2950 let expected_chunk = DataChunk::from_pretty(
2951 "i f i F
2952 2 . 2 .
2953 . 8.4 . 8.18
2954 . 8.4 . 9.6
2955 . 8.4 . 9.1
2956 . 8.4 . 8
2957 . 8.4 . 3.5
2958 . 8.4 . 8.9
2959 3 3.9 3 3.7
2960 3 3.9 3 .
2961 . . . 8.18
2962 . . . 9.6
2963 . . . 9.1
2964 . . . 8
2965 . . . 3.5
2966 . . . 8.9
2967 4 6.6 4 7.5
2968 3 . 3 3.7
2969 3 . 3 .
2970 . 0.7 . 8.18
2971 . 0.7 . 9.6
2972 . 0.7 . 9.1
2973 . 0.7 . 8
2974 . 0.7 . 3.5
2975 . 0.7 . 8.9
2976 . 5.5 . 8.18
2977 . 5.5 . 9.6
2978 . 5.5 . 9.1
2979 . 5.5 . 8
2980 . 5.5 . 3.5
2981 . 5.5 . 8.9",
2982 );
2983
2984 test_fixture.do_test(expected_chunk, false, true).await;
2985 }
2986
2987 #[tokio::test]
2992 async fn test_inner_join_with_non_equi_condition() {
2993 let test_fixture = TestFixture::with_join_type(JoinType::Inner);
2994
2995 let expected_chunk = DataChunk::from_pretty(
2996 "i f i F
2997 4 6.6 4 7.5",
2998 );
2999
3000 test_fixture.do_test(expected_chunk, true, false).await;
3001 }
3002
3003 #[tokio::test]
3008 async fn test_left_outer_join() {
3009 let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);
3010
3011 let expected_chunk = DataChunk::from_pretty(
3012 "i f i F
3013 1 6.1 . .
3014 2 . 2 .
3015 . 8.4 . .
3016 3 3.9 3 3.7
3017 3 3.9 3 .
3018 . . . .
3019 4 6.6 4 7.5
3020 3 . 3 3.7
3021 3 . 3 .
3022 . 0.7 . .
3023 5 . . .
3024 . 5.5 . .",
3025 );
3026
3027 test_fixture.do_test(expected_chunk, false, false).await;
3028 }
3029
3030 #[tokio::test]
3035 async fn test_left_outer_join_with_non_equi_condition() {
3036 let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);
3037
3038 let expected_chunk = DataChunk::from_pretty(
3039 "i f i F
3040 2 . . .
3041 3 3.9 . .
3042 4 6.6 4 7.5
3043 3 . . .
3044 1 6.1 . .
3045 . 8.4 . .
3046 . . . .
3047 . 0.7 . .
3048 5 . . .
3049 . 5.5 . .",
3050 );
3051
3052 test_fixture.do_test(expected_chunk, true, false).await;
3053 }
3054
3055 #[tokio::test]
3060 async fn test_right_outer_join() {
3061 let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);
3062
3063 let expected_chunk = DataChunk::from_pretty(
3064 "i f i F
3065 2 . 2 .
3066 3 3.9 3 3.7
3067 3 3.9 3 .
3068 4 6.6 4 7.5
3069 3 . 3 3.7
3070 3 . 3 .
3071 . . 8 6.1
3072 . . . 8.9
3073 . . . 3.5
3074 . . 6 .
3075 . . 6 .
3076 . . . 8
3077 . . 7 .
3078 . . . 9.1
3079 . . 9 .
3080 . . 9 .
3081 . . . 9.6
3082 . . 100 .
3083 . . . 8.18
3084 . . 200 .",
3085 );
3086
3087 test_fixture.do_test(expected_chunk, false, false).await;
3088 }
3089
3090 #[tokio::test]
3095 async fn test_right_outer_join_with_non_equi_condition() {
3096 let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);
3097
3098 let expected_chunk = DataChunk::from_pretty(
3099 "i f i F
3100 4 6.6 4 7.5
3101 . . 8 6.1
3102 . . 2 .
3103 . . . 8.9
3104 . . 3 .
3105 . . . 3.5
3106 . . 6 .
3107 . . 6 .
3108 . . . 8
3109 . . 7 .
3110 . . . 9.1
3111 . . 9 .
3112 . . 3 3.7
3113 . . 9 .
3114 . . . 9.6
3115 . . 100 .
3116 . . . 8.18
3117 . . 200 .",
3118 );
3119
3120 test_fixture.do_test(expected_chunk, true, false).await;
3121 }
3122
3123 #[tokio::test]
3127 async fn test_full_outer_join() {
3128 let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);
3129
3130 let expected_chunk = DataChunk::from_pretty(
3131 "i f i F
3132 1 6.1 . .
3133 2 . 2 .
3134 . 8.4 . .
3135 3 3.9 3 3.7
3136 3 3.9 3 .
3137 . . . .
3138 4 6.6 4 7.5
3139 3 . 3 3.7
3140 3 . 3 .
3141 . 0.7 . .
3142 5 . . .
3143 . 5.5 . .
3144 . . 8 6.1
3145 . . . 8.9
3146 . . . 3.5
3147 . . 6 .
3148 . . 6 .
3149 . . . 8
3150 . . 7 .
3151 . . . 9.1
3152 . . 9 .
3153 . . 9 .
3154 . . . 9.6
3155 . . 100 .
3156 . . . 8.18
3157 . . 200 .",
3158 );
3159
3160 test_fixture.do_test(expected_chunk, false, false).await;
3161 }
3162
3163 #[tokio::test]
3167 async fn test_full_outer_join_with_non_equi_condition() {
3168 let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);
3169
3170 let expected_chunk = DataChunk::from_pretty(
3171 "i f i F
3172 2 . . .
3173 3 3.9 . .
3174 4 6.6 4 7.5
3175 3 . . .
3176 1 6.1 . .
3177 . 8.4 . .
3178 . . . .
3179 . 0.7 . .
3180 5 . . .
3181 . 5.5 . .
3182 . . 8 6.1
3183 . . 2 .
3184 . . . 8.9
3185 . . 3 .
3186 . . . 3.5
3187 . . 6 .
3188 . . 6 .
3189 . . . 8
3190 . . 7 .
3191 . . . 9.1
3192 . . 9 .
3193 . . 3 3.7
3194 . . 9 .
3195 . . . 9.6
3196 . . 100 .
3197 . . . 8.18
3198 . . 200 .",
3199 );
3200
3201 test_fixture.do_test(expected_chunk, true, false).await;
3202 }
3203
3204 #[tokio::test]
3205 async fn test_left_anti_join() {
3206 let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);
3207
3208 let expected_chunk = DataChunk::from_pretty(
3209 "i f
3210 1 6.1
3211 . 8.4
3212 . .
3213 . 0.7
3214 5 .
3215 . 5.5",
3216 );
3217
3218 test_fixture.do_test(expected_chunk, false, false).await;
3219 }
3220
3221 #[tokio::test]
3222 async fn test_left_anti_join_with_non_equi_condition() {
3223 let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);
3224
3225 let expected_chunk = DataChunk::from_pretty(
3226 "i f
3227 2 .
3228 3 3.9
3229 3 .
3230 1 6.1
3231 . 8.4
3232 . .
3233 . 0.7
3234 5 .
3235 . 5.5",
3236 );
3237
3238 test_fixture.do_test(expected_chunk, true, false).await;
3239 }
3240
3241 #[tokio::test]
3242 async fn test_left_semi_join() {
3243 let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
3244
3245 let expected_chunk = DataChunk::from_pretty(
3246 "i f
3247 2 .
3248 3 3.9
3249 4 6.6
3250 3 .",
3251 );
3252
3253 test_fixture.do_test(expected_chunk, false, false).await;
3254 }
3255
3256 #[tokio::test]
3257 async fn test_left_semi_join_with_non_equi_condition() {
3258 let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
3259
3260 let expected_chunk = DataChunk::from_pretty(
3261 "i f
3262 4 6.6",
3263 );
3264
3265 test_fixture.do_test(expected_chunk, true, false).await;
3266 }
3267
3268 #[tokio::test]
3273 async fn test_left_semi_join_with_non_equi_condition_duplicates() {
3274 let schema = Schema {
3275 fields: vec![
3276 Field::unnamed(DataType::Int32),
3277 Field::unnamed(DataType::Float32),
3278 ],
3279 };
3280
3281 let mut left_executor = MockExecutor::new(schema);
3283 left_executor.add(DataChunk::from_pretty(
3284 "i f
3285 1 1.0
3286 1 1.0
3287 1 1.0
3288 1 1.0
3289 2 1.0",
3290 ));
3291
3292 let schema = Schema {
3294 fields: vec![
3295 Field::unnamed(DataType::Int32),
3296 Field::unnamed(DataType::Float64),
3297 ],
3298 };
3299 let mut right_executor = MockExecutor::new(schema);
3300 right_executor.add(DataChunk::from_pretty(
3301 "i F
3302 1 2.0
3303 1 2.0
3304 1 2.0
3305 1 2.0
3306 2 2.0",
3307 ));
3308
3309 let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
3310 let expected_chunk = DataChunk::from_pretty(
3311 "i f
3312 1 1.0
3313 1 1.0
3314 1 1.0
3315 1 1.0
3316 2 1.0",
3317 );
3318
3319 test_fixture
3320 .do_test_with_chunk_size_and_executors(
3321 expected_chunk,
3322 true,
3323 false,
3324 3,
3325 Box::new(left_executor),
3326 Box::new(right_executor),
3327 false,
3328 )
3329 .await;
3330 }
3331
3332 #[tokio::test]
3333 async fn test_right_anti_join() {
3334 let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);
3335
3336 let expected_chunk = DataChunk::from_pretty(
3337 "i F
3338 8 6.1
3339 . 8.9
3340 . 3.5
3341 6 .
3342 6 .
3343 . 8.0
3344 7 .
3345 . 9.1
3346 9 .
3347 9 .
3348 . 9.6
3349 100 .
3350 . 8.18
3351 200 .",
3352 );
3353
3354 test_fixture.do_test(expected_chunk, false, false).await;
3355 }
3356
3357 #[tokio::test]
3358 async fn test_right_anti_join_with_non_equi_condition() {
3359 let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);
3360
3361 let expected_chunk = DataChunk::from_pretty(
3362 "i F
3363 8 6.1
3364 2 .
3365 . 8.9
3366 3 .
3367 . 3.5
3368 6 .
3369 6 .
3370 . 8
3371 7 .
3372 . 9.1
3373 9 .
3374 3 3.7
3375 9 .
3376 . 9.6
3377 100 .
3378 . 8.18
3379 200 .",
3380 );
3381
3382 test_fixture.do_test(expected_chunk, true, false).await;
3383 }
3384
3385 #[tokio::test]
3386 async fn test_right_semi_join() {
3387 let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);
3388
3389 let expected_chunk = DataChunk::from_pretty(
3390 "i F
3391 2 .
3392 3 .
3393 4 7.5
3394 3 3.7",
3395 );
3396
3397 test_fixture.do_test(expected_chunk, false, false).await;
3398 }
3399
3400 #[tokio::test]
3401 async fn test_right_semi_join_with_non_equi_condition() {
3402 let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);
3403
3404 let expected_chunk = DataChunk::from_pretty(
3405 "i F
3406 4 7.5",
3407 );
3408
3409 test_fixture.do_test(expected_chunk, true, false).await;
3410 }
3411
3412 #[tokio::test]
3413 async fn test_process_left_outer_join_non_equi_condition() {
3414 let chunk = DataChunk::from_pretty(
3415 "i f i F
3416 1 3.5 1 5.5
3417 1 3.5 1 2.5
3418 2 4.0 . .
3419 3 5.0 3 4.0
3420 3 5.0 3 3.0
3421 3 5.0 3 4.0
3422 3 5.0 3 3.0
3423 4 1.0 4 0
3424 4 1.0 4 9.0",
3425 );
3426 let expect = DataChunk::from_pretty(
3427 "i f i F
3428 1 3.5 1 5.5
3429 2 4.0 . .
3430 3 5.0 . .
3431 3 5.0 . .
3432 4 1.0 4 9.0",
3433 );
3434 let cond = TestFixture::create_cond();
3435 let mut state = LeftNonEquiJoinState {
3436 probe_column_count: 2,
3437 first_output_row_id: vec![0, 2, 3, 5, 7],
3438 has_more_output_rows: true,
3439 found_matched: false,
3440 };
3441 assert!(compare_data_chunk_with_rowsort(
3442 &HashJoinExecutor::<Key32>::process_left_outer_join_non_equi_condition(
3443 chunk,
3444 cond.as_ref(),
3445 &mut state
3446 )
3447 .await
3448 .unwrap()
3449 .compact(),
3450 &expect
3451 ));
3452 assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3453 assert!(state.found_matched);
3454
3455 let chunk = DataChunk::from_pretty(
3456 "i f i F
3457 4 1.0 4 0.6
3458 4 1.0 4 2.0
3459 5 4.0 5 .
3460 6 7.0 6 .
3461 6 7.0 6 5.0",
3462 );
3463 let expect = DataChunk::from_pretty(
3464 "i f i F
3465 4 1.0 4 2.0
3466 5 4.0 . .
3467 6 7.0 . .",
3468 );
3469 state.first_output_row_id = vec![2, 3];
3470 state.has_more_output_rows = false;
3471 assert!(compare_data_chunk_with_rowsort(
3472 &HashJoinExecutor::<Key32>::process_left_outer_join_non_equi_condition(
3473 chunk,
3474 cond.as_ref(),
3475 &mut state
3476 )
3477 .await
3478 .unwrap()
3479 .compact(),
3480 &expect
3481 ));
3482 assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3483 assert!(!state.found_matched);
3484
3485 let chunk = DataChunk::from_pretty(
3486 "i f i F
3487 4 1.0 4 0.6
3488 4 1.0 4 1.0
3489 5 4.0 5 .
3490 6 7.0 6 .
3491 6 7.0 6 8.0",
3492 );
3493 let expect = DataChunk::from_pretty(
3494 "i f i F
3495 4 1.0 . .
3496 5 4.0 . .
3497 6 7.0 6 8.0",
3498 );
3499 state.first_output_row_id = vec![2, 3];
3500 state.has_more_output_rows = false;
3501 assert!(compare_data_chunk_with_rowsort(
3502 &HashJoinExecutor::<Key32>::process_left_outer_join_non_equi_condition(
3503 chunk,
3504 cond.as_ref(),
3505 &mut state
3506 )
3507 .await
3508 .unwrap()
3509 .compact(),
3510 &expect
3511 ));
3512 assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3513 assert!(!state.found_matched);
3514 }
3515
3516 #[tokio::test]
3517 async fn test_process_left_semi_join_non_equi_condition() {
3518 let chunk = DataChunk::from_pretty(
3519 "i f i F
3520 1 3.5 1 5.5
3521 1 3.5 1 2.5
3522 2 4.0 . .
3523 3 5.0 3 4.0
3524 3 5.0 3 3.0
3525 3 5.0 3 4.0
3526 3 5.0 3 3.0
3527 4 1.0 4 0
3528 4 1.0 4 0.5",
3529 );
3530 let expect = DataChunk::from_pretty(
3531 "i f i F
3532 1 3.5 1 5.5",
3533 );
3534 let cond = TestFixture::create_cond();
3535 let mut state = LeftNonEquiJoinState {
3536 probe_column_count: 2,
3537 first_output_row_id: vec![0, 2, 3, 5, 7],
3538 found_matched: false,
3539 ..Default::default()
3540 };
3541 assert!(compare_data_chunk_with_rowsort(
3542 &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<false>(
3543 chunk,
3544 cond.as_ref(),
3545 &mut state
3546 )
3547 .await
3548 .unwrap()
3549 .compact(),
3550 &expect
3551 ));
3552 assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3553 assert!(!state.found_matched);
3554
3555 let chunk = DataChunk::from_pretty(
3556 "i f i F
3557 4 1.0 4 0.6
3558 4 1.0 4 2.0
3559 5 4.0 5 .
3560 6 7.0 6 .
3561 6 7.0 6 5.0",
3562 );
3563 let expect = DataChunk::from_pretty(
3564 "i f i F
3565 4 1.0 4 2.0",
3566 );
3567 state.first_output_row_id = vec![2, 3];
3568 assert!(compare_data_chunk_with_rowsort(
3569 &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<false>(
3570 chunk,
3571 cond.as_ref(),
3572 &mut state
3573 )
3574 .await
3575 .unwrap()
3576 .compact(),
3577 &expect
3578 ));
3579 assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3580 assert!(!state.found_matched);
3581
3582 let chunk = DataChunk::from_pretty(
3583 "i f i F
3584 4 1.0 4 0.6
3585 4 1.0 4 1.0
3586 5 4.0 5 .
3587 6 7.0 6 .
3588 6 7.0 6 8.0",
3589 );
3590 let expect = DataChunk::from_pretty(
3591 "i f i F
3592 6 7.0 6 8.0",
3593 );
3594 state.first_output_row_id = vec![2, 3];
3595 assert!(compare_data_chunk_with_rowsort(
3596 &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<false>(
3597 chunk,
3598 cond.as_ref(),
3599 &mut state
3600 )
3601 .await
3602 .unwrap()
3603 .compact(),
3604 &expect
3605 ));
3606 assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3607 }
3608
3609 #[tokio::test]
3610 async fn test_process_left_anti_join_non_equi_condition() {
3611 let chunk = DataChunk::from_pretty(
3612 "i f i F
3613 1 3.5 1 5.5
3614 1 3.5 1 2.5
3615 2 4.0 . .
3616 3 5.0 3 4.0
3617 3 5.0 3 3.0
3618 3 5.0 3 4.0
3619 3 5.0 3 3.0
3620 4 1.0 4 0
3621 4 1.0 4 0.5",
3622 );
3623 let expect = DataChunk::from_pretty(
3624 "i f i F
3625 2 4.0 . .
3626 3 5.0 3 4.0
3627 3 5.0 3 4.0",
3628 );
3629 let cond = TestFixture::create_cond();
3630 let mut state = LeftNonEquiJoinState {
3631 probe_column_count: 2,
3632 first_output_row_id: vec![0, 2, 3, 5, 7],
3633 has_more_output_rows: true,
3634 found_matched: false,
3635 };
3636 assert!(compare_data_chunk_with_rowsort(
3637 &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<true>(
3638 chunk,
3639 cond.as_ref(),
3640 &mut state
3641 )
3642 .await
3643 .unwrap()
3644 .compact(),
3645 &expect
3646 ));
3647 assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3648 assert!(!state.found_matched);
3649
3650 let chunk = DataChunk::from_pretty(
3651 "i f i F
3652 4 1.0 4 0.6
3653 4 1.0 4 2.0
3654 5 4.0 5 .
3655 6 7.0 6 .
3656 6 7.0 6 5.0",
3657 );
3658 let expect = DataChunk::from_pretty(
3659 "i f i F
3660 5 4.0 5 .
3661 6 7.0 6 .",
3662 );
3663 state.first_output_row_id = vec![2, 3];
3664 state.has_more_output_rows = false;
3665 assert!(compare_data_chunk_with_rowsort(
3666 &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<true>(
3667 chunk,
3668 cond.as_ref(),
3669 &mut state
3670 )
3671 .await
3672 .unwrap()
3673 .compact(),
3674 &expect
3675 ));
3676 assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3677 assert!(!state.found_matched);
3678
3679 let chunk = DataChunk::from_pretty(
3680 "i f i F
3681 4 1.0 4 0.6
3682 4 1.0 4 1.0
3683 5 4.0 5 .
3684 6 7.0 6 .
3685 6 7.0 6 8.0",
3686 );
3687 let expect = DataChunk::from_pretty(
3688 "i f i F
3689 4 1.0 4 0.6
3690 5 4.0 5 .",
3691 );
3692 state.first_output_row_id = vec![2, 3];
3693 state.has_more_output_rows = false;
3694 assert!(compare_data_chunk_with_rowsort(
3695 &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<true>(
3696 chunk,
3697 cond.as_ref(),
3698 &mut state
3699 )
3700 .await
3701 .unwrap()
3702 .compact(),
3703 &expect
3704 ));
3705 assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3706 }
3707
3708 #[tokio::test]
3709 async fn test_process_right_outer_join_non_equi_condition() {
3710 let chunk = DataChunk::from_pretty(
3711 "i f i F
3712 1 3.5 1 5.5
3713 1 3.5 1 2.5
3714 3 5.0 3 4.0
3715 3 5.0 3 3.0
3716 3 5.0 3 4.0
3717 3 5.0 3 3.0
3718 4 1.0 4 0
3719 4 1.0 4 0.5",
3720 );
3721 let expect = DataChunk::from_pretty(
3722 "i f i F
3723 1 3.5 1 5.5",
3724 );
3725 let cond = TestFixture::create_cond();
3726 let build_row_matched = ChunkedData::with_chunk_sizes([14].into_iter()).unwrap();
3744 let mut state = RightNonEquiJoinState {
3745 build_row_ids: vec![
3746 RowId::new(0, 0),
3747 RowId::new(0, 1),
3748 RowId::new(0, 3),
3749 RowId::new(0, 4),
3750 RowId::new(0, 3),
3751 RowId::new(0, 4),
3752 RowId::new(0, 5),
3753 RowId::new(0, 7),
3754 ],
3755 build_row_matched,
3756 };
3757 assert!(compare_data_chunk_with_rowsort(
3758 &HashJoinExecutor::<Key32>::process_right_outer_join_non_equi_condition(
3759 chunk,
3760 cond.as_ref(),
3761 &mut state
3762 )
3763 .await
3764 .unwrap()
3765 .compact(),
3766 &expect
3767 ));
3768 assert_eq!(state.build_row_ids, Vec::new());
3769 assert_eq!(
3770 state.build_row_matched,
3771 ChunkedData::try_from(vec![{
3772 let mut v = vec![false; 14];
3773 v[0] = true;
3774 v
3775 }])
3776 .unwrap()
3777 );
3778
3779 let chunk = DataChunk::from_pretty(
3780 "i f i F
3781 4 1.0 4 0.6
3782 4 1.0 4 2.0
3783 5 4.0 5 .
3784 6 7.0 6 .
3785 6 7.0 6 5.0",
3786 );
3787 let expect = DataChunk::from_pretty(
3788 "i f i F
3789 4 1.0 4 2.0",
3790 );
3791 state.build_row_ids = vec![
3792 RowId::new(0, 8),
3793 RowId::new(0, 9),
3794 RowId::new(0, 10),
3795 RowId::new(0, 12),
3796 RowId::new(0, 13),
3797 ];
3798 assert!(compare_data_chunk_with_rowsort(
3799 &HashJoinExecutor::<Key32>::process_right_outer_join_non_equi_condition(
3800 chunk,
3801 cond.as_ref(),
3802 &mut state
3803 )
3804 .await
3805 .unwrap()
3806 .compact(),
3807 &expect
3808 ));
3809 assert_eq!(state.build_row_ids, Vec::new());
3810 assert_eq!(
3811 state.build_row_matched,
3812 ChunkedData::try_from(vec![{
3813 let mut v = vec![false; 14];
3814 v[0] = true;
3815 v[9] = true;
3816 v
3817 }])
3818 .unwrap()
3819 );
3820 }
3821
3822 #[tokio::test]
3823 async fn test_process_right_semi_anti_join_non_equi_condition() {
3824 let chunk = DataChunk::from_pretty(
3825 "i f i F
3826 1 3.5 1 5.5
3827 1 3.5 1 2.5
3828 3 5.0 3 4.0
3829 3 5.0 3 3.0
3830 3 5.0 3 4.0
3831 3 5.0 3 3.0
3832 4 1.0 4 0
3833 4 1.0 4 0.5",
3834 );
3835 let cond = TestFixture::create_cond();
3836 let build_row_matched = ChunkedData::with_chunk_sizes([14].into_iter()).unwrap();
3837 let mut state = RightNonEquiJoinState {
3838 build_row_ids: vec![
3839 RowId::new(0, 0),
3840 RowId::new(0, 1),
3841 RowId::new(0, 3),
3842 RowId::new(0, 4),
3843 RowId::new(0, 3),
3844 RowId::new(0, 4),
3845 RowId::new(0, 5),
3846 RowId::new(0, 7),
3847 ],
3848 build_row_matched,
3849 };
3850
3851 assert!(
3852 HashJoinExecutor::<Key32>::process_right_semi_anti_join_non_equi_condition(
3853 chunk,
3854 cond.as_ref(),
3855 &mut state
3856 )
3857 .await
3858 .is_ok()
3859 );
3860 assert_eq!(state.build_row_ids, Vec::new());
3861 assert_eq!(
3862 state.build_row_matched,
3863 ChunkedData::try_from(vec![{
3864 let mut v = vec![false; 14];
3865 v[0] = true;
3866 v
3867 }])
3868 .unwrap()
3869 );
3870
3871 let chunk = DataChunk::from_pretty(
3872 "i f i F
3873 4 1.0 4 0.6
3874 4 1.0 4 2.0
3875 5 4.0 5 .
3876 6 7.0 6 .
3877 6 7.0 6 5.0",
3878 );
3879 state.build_row_ids = vec![
3880 RowId::new(0, 8),
3881 RowId::new(0, 9),
3882 RowId::new(0, 10),
3883 RowId::new(0, 12),
3884 RowId::new(0, 13),
3885 ];
3886 assert!(
3887 HashJoinExecutor::<Key32>::process_right_semi_anti_join_non_equi_condition(
3888 chunk,
3889 cond.as_ref(),
3890 &mut state
3891 )
3892 .await
3893 .is_ok()
3894 );
3895 assert_eq!(state.build_row_ids, Vec::new());
3896 assert_eq!(
3897 state.build_row_matched,
3898 ChunkedData::try_from(vec![{
3899 let mut v = vec![false; 14];
3900 v[0] = true;
3901 v[9] = true;
3902 v
3903 }])
3904 .unwrap()
3905 );
3906 }
3907
3908 #[tokio::test]
3909 async fn test_process_full_outer_join_non_equi_condition() {
3910 let chunk = DataChunk::from_pretty(
3911 "i f i F
3912 1 3.5 1 5.5
3913 1 3.5 1 2.5
3914 3 5.0 3 4.0
3915 3 5.0 3 3.0
3916 3 5.0 3 4.0
3917 3 5.0 3 3.0
3918 4 1.0 4 0
3919 4 1.0 4 0.5",
3920 );
3921 let expect = DataChunk::from_pretty(
3922 "i f i F
3923 1 3.5 1 5.5
3924 3 5.0 . .
3925 3 5.0 . .",
3926 );
3927 let cond = TestFixture::create_cond();
3928 let mut left_state = LeftNonEquiJoinState {
3929 probe_column_count: 2,
3930 first_output_row_id: vec![0, 2, 4, 6],
3931 has_more_output_rows: true,
3932 found_matched: false,
3933 };
3934 let mut right_state = RightNonEquiJoinState {
3935 build_row_ids: vec![
3936 RowId::new(0, 0),
3937 RowId::new(0, 1),
3938 RowId::new(0, 3),
3939 RowId::new(0, 4),
3940 RowId::new(0, 3),
3941 RowId::new(0, 4),
3942 RowId::new(0, 5),
3943 RowId::new(0, 7),
3944 ],
3945 build_row_matched: ChunkedData::with_chunk_sizes([14].into_iter()).unwrap(),
3946 };
3947 assert!(compare_data_chunk_with_rowsort(
3948 &HashJoinExecutor::<Key32>::process_full_outer_join_non_equi_condition(
3949 chunk,
3950 cond.as_ref(),
3951 &mut left_state,
3952 &mut right_state,
3953 )
3954 .await
3955 .unwrap()
3956 .compact(),
3957 &expect
3958 ));
3959 assert_eq!(left_state.first_output_row_id, Vec::<usize>::new());
3960 assert!(!left_state.found_matched);
3961 assert_eq!(right_state.build_row_ids, Vec::new());
3962 assert_eq!(
3963 right_state.build_row_matched,
3964 ChunkedData::try_from(vec![{
3965 let mut v = vec![false; 14];
3966 v[0] = true;
3967 v
3968 }])
3969 .unwrap()
3970 );
3971
3972 let chunk = DataChunk::from_pretty(
3973 "i f i F
3974 4 1.0 4 0.6
3975 4 1.0 4 2.0
3976 5 4.0 5 .
3977 6 7.0 6 .
3978 6 7.0 6 8.0",
3979 );
3980 let expect = DataChunk::from_pretty(
3981 "i f i F
3982 4 1.0 4 2.0
3983 5 4.0 . .
3984 6 7.0 6 8.0",
3985 );
3986 left_state.first_output_row_id = vec![2, 3];
3987 left_state.has_more_output_rows = false;
3988 right_state.build_row_ids = vec![
3989 RowId::new(0, 8),
3990 RowId::new(0, 9),
3991 RowId::new(0, 10),
3992 RowId::new(0, 12),
3993 RowId::new(0, 13),
3994 ];
3995 assert!(compare_data_chunk_with_rowsort(
3996 &HashJoinExecutor::<Key32>::process_full_outer_join_non_equi_condition(
3997 chunk,
3998 cond.as_ref(),
3999 &mut left_state,
4000 &mut right_state,
4001 )
4002 .await
4003 .unwrap()
4004 .compact(),
4005 &expect
4006 ));
4007 assert_eq!(left_state.first_output_row_id, Vec::<usize>::new());
4008 assert!(left_state.found_matched);
4009 assert_eq!(right_state.build_row_ids, Vec::new());
4010 assert_eq!(
4011 right_state.build_row_matched,
4012 ChunkedData::try_from(vec![{
4013 let mut v = vec![false; 14];
4014 v[0] = true;
4015 v[9] = true;
4016 v[13] = true;
4017 v
4018 }])
4019 .unwrap()
4020 );
4021 }
4022
4023 #[tokio::test]
4024 async fn test_shutdown() {
4025 let test_fixture = TestFixture::with_join_type(JoinType::Inner);
4026 test_fixture.do_test_shutdown(false).await;
4027 test_fixture.do_test_shutdown(true).await;
4028
4029 let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);
4030 test_fixture.do_test_shutdown(false).await;
4031 test_fixture.do_test_shutdown(true).await;
4032
4033 let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);
4034 test_fixture.do_test_shutdown(false).await;
4035 test_fixture.do_test_shutdown(true).await;
4036
4037 let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);
4038 test_fixture.do_test_shutdown(false).await;
4039 test_fixture.do_test_shutdown(true).await;
4040
4041 let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
4042 test_fixture.do_test_shutdown(false).await;
4043 test_fixture.do_test_shutdown(true).await;
4044
4045 let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);
4046 test_fixture.do_test_shutdown(false).await;
4047 test_fixture.do_test_shutdown(true).await;
4048
4049 let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);
4050 test_fixture.do_test_shutdown(false).await;
4051 test_fixture.do_test_shutdown(true).await;
4052
4053 let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);
4054 test_fixture.do_test_shutdown(false).await;
4055 test_fixture.do_test_shutdown(true).await;
4056 }
4057}