1use std::cmp::Ordering;
16use std::iter;
17use std::iter::empty;
18use std::marker::PhantomData;
19use std::sync::Arc;
20
21use bytes::Bytes;
22use futures_async_stream::try_stream;
23use itertools::Itertools;
24use risingwave_common::array::{Array, DataChunk, RowRef};
25use risingwave_common::bitmap::{Bitmap, BitmapBuilder, FilterByBitmap};
26use risingwave_common::catalog::Schema;
27use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher};
28use risingwave_common::memory::{MemoryContext, MonitoredGlobalAlloc};
29use risingwave_common::row::{Row, RowExt, repeat_n};
30use risingwave_common::types::{DataType, Datum, DefaultOrd};
31use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
32use risingwave_common::util::iter_util::ZipEqFast;
33use risingwave_common_estimate_size::EstimateSize;
34use risingwave_expr::expr::{BoxedExpression, Expression, build_from_prost};
35use risingwave_pb::Message;
36use risingwave_pb::batch_plan::plan_node::NodeBody;
37use risingwave_pb::data::DataChunk as PbDataChunk;
38
39use super::{AsOfDesc, AsOfInequalityType, ChunkedData, JoinType, RowId};
40use crate::error::{BatchError, Result};
41use crate::executor::{
42 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
43 WrapStreamExecutor,
44};
45use crate::monitor::BatchSpillMetrics;
46use crate::risingwave_common::hash::NullBitmap;
47use crate::spill::spill_op::SpillBackend::Disk;
48use crate::spill::spill_op::{
49 DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY, SpillBackend, SpillBuildHasher, SpillOp,
50};
51use crate::task::ShutdownToken;
52
/// Batch hash join executor, generic over the hash key representation `K`.
///
/// The build side is fully consumed first and indexed in an in-memory hash
/// table; the probe side is then streamed against it. When memory accounting
/// reports the limit as exceeded and a spill backend is configured, both sides
/// are partitioned to spill storage and each partition is joined by a nested
/// `HashJoinExecutor`.
pub struct HashJoinExecutor<K> {
    /// Join variant; selects which `do_*_join` routine `do_execute` runs.
    join_type: JoinType,
    /// Schema before output projection.
    /// NOTE(review): not read anywhere in the visible code (hence `dead_code`).
    #[expect(dead_code)]
    original_schema: Schema,
    /// Schema reported through `Executor::schema`; its data types are also used
    /// for the output chunk builder on the non-equi-condition path.
    schema: Schema,
    /// Column indices used to project every joined chunk before it is yielded.
    output_indices: Vec<usize>,
    /// Executor producing the probe-side input.
    probe_side_source: BoxedExecutor,
    /// Executor producing the build-side input.
    build_side_source: BoxedExecutor,
    /// Probe-side column indices from which hash keys are built.
    probe_key_idxs: Vec<usize>,
    /// Build-side column indices from which hash keys are built.
    build_key_idxs: Vec<usize>,
    /// Optional non-equi condition. When present and the join is not an AsOf
    /// join, the `*_with_non_equi_condition` routines are used.
    cond: Option<Arc<BoxedExpression>>,
    /// Converted into `K::Bitmap`; a build key is inserted into the hash table
    /// only if its null bitmap is a subset of this bitmap.
    null_matched: Vec<bool>,
    /// Identity string reported through `Executor::identity`; also used to
    /// name the spill directory and the nested sub-joins.
    identity: String,
    /// Chunk size for output chunk builders and for spilled chunks.
    chunk_size: usize,
    /// AsOf join description; `Some` makes the join an AsOf join.
    asof_desc: Option<AsOfDesc>,

    /// Spill backend. When `None`, exceeding the memory limit yields
    /// `BatchError::OutOfMemory` instead of spilling.
    spill_backend: Option<SpillBackend>,
    /// Metrics updated while writing and reading spilled data.
    spill_metrics: Arc<BatchSpillMetrics>,
    /// Memory-limit violations are acted upon only when this is `None` or
    /// greater than `SPILL_AT_LEAST_MEMORY`. Nested sub-joins receive the
    /// on-disk size of their partition here.
    memory_upper_bound: Option<u64>,

    /// Checked periodically so the join stops once shutdown is requested.
    shutdown_rx: ShutdownToken,

    /// Memory context used for accounting and as allocator source.
    mem_ctx: MemoryContext,
    _phantom: PhantomData<K>,
}
100
101impl<K: HashKey> Executor for HashJoinExecutor<K> {
102 fn schema(&self) -> &Schema {
103 &self.schema
104 }
105
106 fn identity(&self) -> &str {
107 &self.identity
108 }
109
110 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
111 self.do_execute()
112 }
113}
114
/// Hash table used by the join: maps a build key to a single `RowId`.
///
/// `do_execute` stores the value previously mapped to a key (as returned by
/// `insert`) into a side `ChunkedData<Option<RowId>>`, so rows sharing a key
/// are reachable by following that chain starting from the map entry.
pub type JoinHashMap<K> =
    hashbrown::HashMap<K, RowId, PrecomputedBuildHasher, MonitoredGlobalAlloc>;
149
/// Iterator that walks a chain of row ids stored in a
/// `ChunkedData<Option<RowId>>`.
struct RowIdIter<'a> {
    /// Row id that the next call to `next` will return; `None` ends iteration.
    current_row_id: Option<RowId>,
    /// For each row id, the row id that follows it in the chain (if any).
    next_row_id: &'a ChunkedData<Option<RowId>>,
}
154
155impl ChunkedData<Option<RowId>> {
156 fn row_id_iter(&self, begin: Option<RowId>) -> RowIdIter<'_> {
157 RowIdIter {
158 current_row_id: begin,
159 next_row_id: self,
160 }
161 }
162}
163
164impl Iterator for RowIdIter<'_> {
165 type Item = RowId;
166
167 fn next(&mut self) -> Option<Self::Item> {
168 self.current_row_id.inspect(|row_id| {
169 self.current_row_id = self.next_row_id[*row_id];
170 })
171 }
172}
173
/// Everything a `do_*_join` routine needs once the build side has been loaded
/// and indexed: the probe input, the materialized build side and its hash
/// table.
pub struct EquiJoinParams<K> {
    /// Executor producing the probe-side input.
    probe_side: BoxedExecutor,
    /// Data types of the probe-side columns.
    probe_data_types: Vec<DataType>,
    /// Probe-side column indices from which hash keys are built.
    probe_key_idxs: Vec<usize>,
    /// All build-side chunks, kept in memory.
    build_side: Vec<DataChunk, MonitoredGlobalAlloc>,
    /// Data types of the build-side columns.
    build_data_types: Vec<DataType>,
    /// Probe-side data types followed by build-side data types.
    full_data_types: Vec<DataType>,
    /// Maps a key to the head of its chain of build rows.
    hash_map: JoinHashMap<K>,
    /// For each build row, the next build row with the same key (if any).
    next_build_row_with_same_key: ChunkedData<Option<RowId>>,
    /// Chunk size for the output chunk builders.
    chunk_size: usize,
    /// Checked while joining so work stops once shutdown is requested.
    shutdown_rx: ShutdownToken,
    /// AsOf join description; `Some` makes the join an AsOf join.
    asof_desc: Option<AsOfDesc>,
}
187
188impl<K> EquiJoinParams<K> {
189 #[allow(clippy::too_many_arguments)]
190 pub(super) fn new(
191 probe_side: BoxedExecutor,
192 probe_data_types: Vec<DataType>,
193 probe_key_idxs: Vec<usize>,
194 build_side: Vec<DataChunk, MonitoredGlobalAlloc>,
195 build_data_types: Vec<DataType>,
196 full_data_types: Vec<DataType>,
197 hash_map: JoinHashMap<K>,
198 next_build_row_with_same_key: ChunkedData<Option<RowId>>,
199 chunk_size: usize,
200 shutdown_rx: ShutdownToken,
201 asof_desc: Option<AsOfDesc>,
202 ) -> Self {
203 Self {
204 probe_side,
205 probe_data_types,
206 probe_key_idxs,
207 build_side,
208 build_data_types,
209 full_data_types,
210 hash_map,
211 next_build_row_with_same_key,
212 chunk_size,
213 shutdown_rx,
214 asof_desc,
215 }
216 }
217
218 pub(crate) fn is_asof_join(&self) -> bool {
219 self.asof_desc.is_some()
220 }
221}
222
/// State shared between the left-side non-equi join loops and the
/// `process_left_*_non_equi_condition` helpers they call.
#[derive(Default)]
struct LeftNonEquiJoinState {
    /// Number of probe-side columns; set by the left outer join routine.
    probe_column_count: usize,
    /// For every probe row that has a hash match, the output builder's
    /// buffered row count at the moment the probe row's first output row is
    /// appended.
    /// NOTE(review): presumably consumed and cleared by the processing helpers —
    /// they are not visible here, confirm.
    first_output_row_id: Vec<usize>,
    /// Whether the probe row that produced the last row of the chunk being
    /// processed still has build rows left to emit. Reset to `false` before
    /// the final flush.
    has_more_output_rows: bool,
    /// Reset to `false` for each probe row by the left semi join routine,
    /// which stops scanning build rows once it is `true`.
    /// NOTE(review): presumably set by the processing helper — confirm.
    found_matched: bool,
}
237
/// State for the right-side non-equi join routines.
/// NOTE(review): no use of this type is visible in this part of the file; the
/// field descriptions below are inferred from names and types — confirm.
#[derive(Default)]
struct RightNonEquiJoinState {
    /// Presumably the build row id behind each row of the chunk being
    /// processed.
    build_row_ids: Vec<RowId>,
    /// Presumably one flag per build row recording whether it has matched.
    build_row_matched: ChunkedData<bool>,
}
246
/// Manages the spill files of one hash join: one probe-side file and one
/// build-side file per partition, all stored under a directory unique to this
/// manager.
pub struct JoinSpillManager {
    /// Operator for the spill directory of this join.
    op: SpillOp,
    /// Number of partitions rows are distributed over.
    partition_num: usize,
    /// One writer per partition for probe-side data; filled by `init_writers`.
    probe_side_writers: Vec<opendal::Writer>,
    /// One writer per partition for build-side data; filled by `init_writers`.
    build_side_writers: Vec<opendal::Writer>,
    /// One chunk builder per partition that batches probe-side rows before
    /// they are written.
    probe_side_chunk_builders: Vec<DataChunkBuilder>,
    /// One chunk builder per partition that batches build-side rows before
    /// they are written.
    build_side_chunk_builders: Vec<DataChunkBuilder>,
    /// Hasher used to compute the hash codes that route rows to partitions;
    /// seeded from the random UUID that also names the spill directory.
    spill_build_hasher: SpillBuildHasher,
    /// Column types of probe-side chunks.
    probe_side_data_types: Vec<DataType>,
    /// Column types of build-side chunks.
    build_side_data_types: Vec<DataType>,
    /// Capacity of the per-partition chunk builders.
    spill_chunk_size: usize,
    /// Metrics updated with the number of bytes written (and passed to the
    /// read streams).
    spill_metrics: Arc<BatchSpillMetrics>,
}
260
261impl JoinSpillManager {
276 pub fn new(
277 spill_backend: SpillBackend,
278 join_identity: &String,
279 partition_num: usize,
280 probe_side_data_types: Vec<DataType>,
281 build_side_data_types: Vec<DataType>,
282 spill_chunk_size: usize,
283 spill_metrics: Arc<BatchSpillMetrics>,
284 ) -> Result<Self> {
285 let suffix_uuid = uuid::Uuid::new_v4();
286 let dir = format!("/{}-{}/", join_identity, suffix_uuid);
287 let op = SpillOp::create(dir, spill_backend)?;
288 let probe_side_writers = Vec::with_capacity(partition_num);
289 let build_side_writers = Vec::with_capacity(partition_num);
290 let probe_side_chunk_builders = Vec::with_capacity(partition_num);
291 let build_side_chunk_builders = Vec::with_capacity(partition_num);
292 let spill_build_hasher = SpillBuildHasher(suffix_uuid.as_u64_pair().1);
293 Ok(Self {
294 op,
295 partition_num,
296 probe_side_writers,
297 build_side_writers,
298 probe_side_chunk_builders,
299 build_side_chunk_builders,
300 spill_build_hasher,
301 probe_side_data_types,
302 build_side_data_types,
303 spill_chunk_size,
304 spill_metrics,
305 })
306 }
307
308 pub async fn init_writers(&mut self) -> Result<()> {
309 for i in 0..self.partition_num {
310 let join_probe_side_partition_file_name = format!("join-probe-side-p{}", i);
311 let w = self
312 .op
313 .writer_with(&join_probe_side_partition_file_name)
314 .await?;
315 self.probe_side_writers.push(w);
316
317 let join_build_side_partition_file_name = format!("join-build-side-p{}", i);
318 let w = self
319 .op
320 .writer_with(&join_build_side_partition_file_name)
321 .await?;
322 self.build_side_writers.push(w);
323 self.probe_side_chunk_builders.push(DataChunkBuilder::new(
324 self.probe_side_data_types.clone(),
325 self.spill_chunk_size,
326 ));
327 self.build_side_chunk_builders.push(DataChunkBuilder::new(
328 self.build_side_data_types.clone(),
329 self.spill_chunk_size,
330 ));
331 }
332 Ok(())
333 }
334
335 pub async fn write_probe_side_chunk(
336 &mut self,
337 chunk: DataChunk,
338 hash_codes: Vec<u64>,
339 ) -> Result<()> {
340 let (columns, vis) = chunk.into_parts_v2();
341 for partition in 0..self.partition_num {
342 let new_vis = vis.clone()
343 & Bitmap::from_iter(
344 hash_codes
345 .iter()
346 .map(|hash_code| (*hash_code as usize % self.partition_num) == partition),
347 );
348 let new_chunk = DataChunk::from_parts(columns.clone(), new_vis);
349 for output_chunk in self.probe_side_chunk_builders[partition].append_chunk(new_chunk) {
350 let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
351 let buf = Message::encode_to_vec(&chunk_pb);
352 let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
353 self.spill_metrics
354 .batch_spill_write_bytes
355 .inc_by((buf.len() + len_bytes.len()) as u64);
356 self.probe_side_writers[partition].write(len_bytes).await?;
357 self.probe_side_writers[partition].write(buf).await?;
358 }
359 }
360 Ok(())
361 }
362
363 pub async fn write_build_side_chunk(
364 &mut self,
365 chunk: DataChunk,
366 hash_codes: Vec<u64>,
367 ) -> Result<()> {
368 let (columns, vis) = chunk.into_parts_v2();
369 for partition in 0..self.partition_num {
370 let new_vis = vis.clone()
371 & Bitmap::from_iter(
372 hash_codes
373 .iter()
374 .map(|hash_code| (*hash_code as usize % self.partition_num) == partition),
375 );
376 let new_chunk = DataChunk::from_parts(columns.clone(), new_vis);
377 for output_chunk in self.build_side_chunk_builders[partition].append_chunk(new_chunk) {
378 let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
379 let buf = Message::encode_to_vec(&chunk_pb);
380 let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
381 self.spill_metrics
382 .batch_spill_write_bytes
383 .inc_by((buf.len() + len_bytes.len()) as u64);
384 self.build_side_writers[partition].write(len_bytes).await?;
385 self.build_side_writers[partition].write(buf).await?;
386 }
387 }
388 Ok(())
389 }
390
391 pub async fn close_writers(&mut self) -> Result<()> {
392 for partition in 0..self.partition_num {
393 if let Some(output_chunk) = self.probe_side_chunk_builders[partition].consume_all() {
394 let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
395 let buf = Message::encode_to_vec(&chunk_pb);
396 let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
397 self.spill_metrics
398 .batch_spill_write_bytes
399 .inc_by((buf.len() + len_bytes.len()) as u64);
400 self.probe_side_writers[partition].write(len_bytes).await?;
401 self.probe_side_writers[partition].write(buf).await?;
402 }
403
404 if let Some(output_chunk) = self.build_side_chunk_builders[partition].consume_all() {
405 let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
406 let buf = Message::encode_to_vec(&chunk_pb);
407 let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
408 self.spill_metrics
409 .batch_spill_write_bytes
410 .inc_by((buf.len() + len_bytes.len()) as u64);
411 self.build_side_writers[partition].write(len_bytes).await?;
412 self.build_side_writers[partition].write(buf).await?;
413 }
414 }
415
416 for mut w in self.probe_side_writers.drain(..) {
417 w.close().await?;
418 }
419 for mut w in self.build_side_writers.drain(..) {
420 w.close().await?;
421 }
422 Ok(())
423 }
424
425 async fn read_probe_side_partition(
426 &mut self,
427 partition: usize,
428 ) -> Result<BoxedDataChunkStream> {
429 let join_probe_side_partition_file_name = format!("join-probe-side-p{}", partition);
430 let r = self
431 .op
432 .reader_with(&join_probe_side_partition_file_name)
433 .await?;
434 Ok(SpillOp::read_stream(r, self.spill_metrics.clone()))
435 }
436
437 async fn read_build_side_partition(
438 &mut self,
439 partition: usize,
440 ) -> Result<BoxedDataChunkStream> {
441 let join_build_side_partition_file_name = format!("join-build-side-p{}", partition);
442 let r = self
443 .op
444 .reader_with(&join_build_side_partition_file_name)
445 .await?;
446 Ok(SpillOp::read_stream(r, self.spill_metrics.clone()))
447 }
448
449 pub async fn estimate_partition_size(&self, partition: usize) -> Result<u64> {
450 let join_probe_side_partition_file_name = format!("join-probe-side-p{}", partition);
451 let probe_size = self
452 .op
453 .stat(&join_probe_side_partition_file_name)
454 .await?
455 .content_length();
456 let join_build_side_partition_file_name = format!("join-build-side-p{}", partition);
457 let build_size = self
458 .op
459 .stat(&join_build_side_partition_file_name)
460 .await?
461 .content_length();
462 Ok(probe_size + build_size)
463 }
464
465 async fn clear_partition(&mut self, partition: usize) -> Result<()> {
466 let join_probe_side_partition_file_name = format!("join-probe-side-p{}", partition);
467 self.op.delete(&join_probe_side_partition_file_name).await?;
468 let join_build_side_partition_file_name = format!("join-build-side-p{}", partition);
469 self.op.delete(&join_build_side_partition_file_name).await?;
470 Ok(())
471 }
472}
473
474impl<K: HashKey> HashJoinExecutor<K> {
475 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
476 async fn do_execute(self: Box<Self>) {
477 let mut need_to_spill = false;
478 let check_memory = match self.memory_upper_bound {
480 Some(upper_bound) => upper_bound > SPILL_AT_LEAST_MEMORY,
481 None => true,
482 };
483
484 let probe_schema = self.probe_side_source.schema().clone();
485 let build_schema = self.build_side_source.schema().clone();
486 let probe_data_types = self.probe_side_source.schema().data_types();
487 let build_data_types = self.build_side_source.schema().data_types();
488 let full_data_types = [probe_data_types.clone(), build_data_types.clone()].concat();
489
490 let mut build_side = Vec::new_in(self.mem_ctx.global_allocator());
491 let mut build_row_count = 0;
492 let mut build_side_stream = self.build_side_source.execute();
493 #[for_await]
494 for build_chunk in &mut build_side_stream {
495 let build_chunk = build_chunk?;
496 if build_chunk.cardinality() > 0 {
497 build_row_count += build_chunk.cardinality();
498 let chunk_estimated_heap_size = build_chunk.estimated_heap_size();
499 build_side.push(build_chunk);
501 if !self.mem_ctx.add(chunk_estimated_heap_size as i64) && check_memory {
502 if self.spill_backend.is_some() {
503 need_to_spill = true;
504 break;
505 } else {
506 Err(BatchError::OutOfMemory(self.mem_ctx.mem_limit()))?;
507 }
508 }
509 }
510 }
511 let mut hash_map = JoinHashMap::with_capacity_and_hasher_in(
512 build_row_count,
513 PrecomputedBuildHasher,
514 self.mem_ctx.global_allocator(),
515 );
516 let mut next_build_row_with_same_key =
517 ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
518
519 let null_matched = K::Bitmap::from_bool_vec(self.null_matched.clone());
520
521 let mut mem_added_by_hash_table = 0;
522 if !need_to_spill {
523 for (build_chunk_id, build_chunk) in build_side.iter().enumerate() {
525 let build_keys = K::build_many(&self.build_key_idxs, build_chunk);
526
527 for (build_row_id, build_key) in build_keys
528 .into_iter()
529 .enumerate()
530 .filter_by_bitmap(build_chunk.visibility())
531 {
532 self.shutdown_rx.check()?;
533 if build_key.null_bitmap().is_subset(&null_matched) {
535 let row_id = RowId::new(build_chunk_id, build_row_id);
536 let build_key_size = build_key.estimated_heap_size() as i64;
537 mem_added_by_hash_table += build_key_size;
538 if !self.mem_ctx.add(build_key_size) && check_memory {
539 if self.spill_backend.is_some() {
540 need_to_spill = true;
541 break;
542 } else {
543 Err(BatchError::OutOfMemory(self.mem_ctx.mem_limit()))?;
544 }
545 }
546 next_build_row_with_same_key[row_id] = hash_map.insert(build_key, row_id);
547 }
548 }
549 }
550 }
551
552 if need_to_spill {
553 info!(
560 "batch hash join executor {} starts to spill out",
561 &self.identity
562 );
563 let mut join_spill_manager = JoinSpillManager::new(
564 self.spill_backend.clone().unwrap(),
565 &self.identity,
566 DEFAULT_SPILL_PARTITION_NUM,
567 probe_data_types.clone(),
568 build_data_types.clone(),
569 self.chunk_size,
570 self.spill_metrics.clone(),
571 )?;
572 join_spill_manager.init_writers().await?;
573
574 self.mem_ctx.add(-mem_added_by_hash_table);
576 drop(hash_map);
577 drop(next_build_row_with_same_key);
578
579 for chunk in build_side {
581 self.mem_ctx.add(-(chunk.estimated_heap_size() as i64));
583 let hash_codes = chunk.get_hash_values(
584 self.build_key_idxs.as_slice(),
585 join_spill_manager.spill_build_hasher,
586 );
587 join_spill_manager
588 .write_build_side_chunk(
589 chunk,
590 hash_codes
591 .into_iter()
592 .map(|hash_code| hash_code.value())
593 .collect(),
594 )
595 .await?;
596 }
597
598 #[for_await]
600 for chunk in build_side_stream {
601 let chunk = chunk?;
602 let hash_codes = chunk.get_hash_values(
603 self.build_key_idxs.as_slice(),
604 join_spill_manager.spill_build_hasher,
605 );
606 join_spill_manager
607 .write_build_side_chunk(
608 chunk,
609 hash_codes
610 .into_iter()
611 .map(|hash_code| hash_code.value())
612 .collect(),
613 )
614 .await?;
615 }
616
617 #[for_await]
619 for chunk in self.probe_side_source.execute() {
620 let chunk = chunk?;
621 let hash_codes = chunk.get_hash_values(
622 self.probe_key_idxs.as_slice(),
623 join_spill_manager.spill_build_hasher,
624 );
625 join_spill_manager
626 .write_probe_side_chunk(
627 chunk,
628 hash_codes
629 .into_iter()
630 .map(|hash_code| hash_code.value())
631 .collect(),
632 )
633 .await?;
634 }
635
636 join_spill_manager.close_writers().await?;
637
638 for i in 0..join_spill_manager.partition_num {
640 let partition_size = join_spill_manager.estimate_partition_size(i).await?;
641 let probe_side_stream = join_spill_manager.read_probe_side_partition(i).await?;
642 let build_side_stream = join_spill_manager.read_build_side_partition(i).await?;
643
644 let sub_hash_join_executor: HashJoinExecutor<K> = HashJoinExecutor::new_inner(
645 self.join_type,
646 self.output_indices.clone(),
647 Box::new(WrapStreamExecutor::new(
648 probe_schema.clone(),
649 probe_side_stream,
650 )),
651 Box::new(WrapStreamExecutor::new(
652 build_schema.clone(),
653 build_side_stream,
654 )),
655 self.probe_key_idxs.clone(),
656 self.build_key_idxs.clone(),
657 self.null_matched.clone(),
658 self.cond.clone(),
659 format!("{}-sub{}", self.identity.clone(), i),
660 self.chunk_size,
661 self.asof_desc.clone(),
662 self.spill_backend.clone(),
663 self.spill_metrics.clone(),
664 Some(partition_size),
665 self.shutdown_rx.clone(),
666 self.mem_ctx.clone(),
667 );
668
669 debug!(
670 "create sub_hash_join {} for hash_join {} to spill",
671 sub_hash_join_executor.identity, self.identity
672 );
673
674 let sub_hash_join_executor = Box::new(sub_hash_join_executor).execute();
675
676 #[for_await]
677 for chunk in sub_hash_join_executor {
678 let chunk = chunk?;
679 yield chunk;
680 }
681
682 join_spill_manager.clear_partition(i).await?;
684 }
685 } else {
686 let params = EquiJoinParams::new(
687 self.probe_side_source,
688 probe_data_types,
689 self.probe_key_idxs,
690 build_side,
691 build_data_types,
692 full_data_types,
693 hash_map,
694 next_build_row_with_same_key,
695 self.chunk_size,
696 self.shutdown_rx.clone(),
697 self.asof_desc,
698 );
699
700 if let Some(cond) = self.cond.as_ref()
701 && !params.is_asof_join()
702 {
703 let stream = match self.join_type {
704 JoinType::Inner => Self::do_inner_join_with_non_equi_condition(params, cond),
705 JoinType::LeftOuter => {
706 Self::do_left_outer_join_with_non_equi_condition(params, cond)
707 }
708 JoinType::LeftSemi => {
709 Self::do_left_semi_join_with_non_equi_condition(params, cond)
710 }
711 JoinType::LeftAnti => {
712 Self::do_left_anti_join_with_non_equi_condition(params, cond)
713 }
714 JoinType::RightOuter => {
715 Self::do_right_outer_join_with_non_equi_condition(params, cond)
716 }
717 JoinType::RightSemi => {
718 Self::do_right_semi_anti_join_with_non_equi_condition::<false>(params, cond)
719 }
720 JoinType::RightAnti => {
721 Self::do_right_semi_anti_join_with_non_equi_condition::<true>(params, cond)
722 }
723 JoinType::FullOuter => {
724 Self::do_full_outer_join_with_non_equi_condition(params, cond)
725 }
726 JoinType::AsOfInner | JoinType::AsOfLeftOuter => {
727 unreachable!("AsOf join should not reach here")
728 }
729 };
730 let mut output_chunk_builder =
732 DataChunkBuilder::new(self.schema.data_types(), self.chunk_size);
733 #[for_await]
734 for chunk in stream {
735 for output_chunk in
736 output_chunk_builder.append_chunk(chunk?.project(&self.output_indices))
737 {
738 yield output_chunk
739 }
740 }
741 if let Some(output_chunk) = output_chunk_builder.consume_all() {
742 yield output_chunk
743 }
744 } else {
745 let stream = match self.join_type {
746 JoinType::Inner | JoinType::AsOfInner => Self::do_inner_join(params),
747 JoinType::LeftOuter | JoinType::AsOfLeftOuter => {
748 Self::do_left_outer_join(params)
749 }
750 JoinType::LeftSemi => Self::do_left_semi_anti_join::<false>(params),
751 JoinType::LeftAnti => Self::do_left_semi_anti_join::<true>(params),
752 JoinType::RightOuter => Self::do_right_outer_join(params),
753 JoinType::RightSemi => Self::do_right_semi_anti_join::<false>(params),
754 JoinType::RightAnti => Self::do_right_semi_anti_join::<true>(params),
755 JoinType::FullOuter => Self::do_full_outer_join(params),
756 };
757 #[for_await]
758 for chunk in stream {
759 yield chunk?.project(&self.output_indices)
760 }
761 }
762 }
763 }
764
765 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
766 pub async fn do_inner_join(
767 EquiJoinParams {
768 probe_side,
769 probe_key_idxs,
770 build_side,
771 full_data_types,
772 hash_map,
773 next_build_row_with_same_key,
774 chunk_size,
775 shutdown_rx,
776 asof_desc,
777 ..
778 }: EquiJoinParams<K>,
779 ) {
780 let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
781 #[for_await]
782 for probe_chunk in probe_side.execute() {
783 let probe_chunk = probe_chunk?;
784 let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
785 for (probe_row_id, probe_key) in probe_keys
786 .iter()
787 .enumerate()
788 .filter_by_bitmap(probe_chunk.visibility())
789 {
790 let build_side_row_iter =
791 next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied());
792 if let Some(asof_desc) = &asof_desc {
793 if let Some(build_row_id) = Self::find_asof_matched_rows(
794 probe_chunk.row_at_unchecked_vis(probe_row_id),
795 &build_side,
796 build_side_row_iter,
797 asof_desc,
798 ) {
799 shutdown_rx.check()?;
800 if let Some(spilled) = Self::append_one_row(
801 &mut chunk_builder,
802 &probe_chunk,
803 probe_row_id,
804 &build_side[build_row_id.chunk_id()],
805 build_row_id.row_id(),
806 ) {
807 yield spilled
808 }
809 }
810 } else {
811 for build_row_id in build_side_row_iter {
812 shutdown_rx.check()?;
813 let build_chunk = &build_side[build_row_id.chunk_id()];
814 if let Some(spilled) = Self::append_one_row(
815 &mut chunk_builder,
816 &probe_chunk,
817 probe_row_id,
818 build_chunk,
819 build_row_id.row_id(),
820 ) {
821 yield spilled
822 }
823 }
824 }
825 }
826 }
827 if let Some(spilled) = chunk_builder.consume_all() {
828 yield spilled
829 }
830 }
831
832 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
833 pub async fn do_inner_join_with_non_equi_condition(
834 params: EquiJoinParams<K>,
835 cond: &BoxedExpression,
836 ) {
837 #[for_await]
838 for chunk in Self::do_inner_join(params) {
839 let mut chunk = chunk?;
840 chunk.set_visibility(cond.eval(&chunk).await?.as_bool().iter().collect());
841 yield chunk
842 }
843 }
844
845 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
846 pub async fn do_left_outer_join(
847 EquiJoinParams {
848 probe_side,
849 probe_key_idxs,
850 build_side,
851 build_data_types,
852 full_data_types,
853 hash_map,
854 next_build_row_with_same_key,
855 chunk_size,
856 shutdown_rx,
857 asof_desc,
858 ..
859 }: EquiJoinParams<K>,
860 ) {
861 let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
862 #[for_await]
863 for probe_chunk in probe_side.execute() {
864 let probe_chunk = probe_chunk?;
865 let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
866 for (probe_row_id, probe_key) in probe_keys
867 .iter()
868 .enumerate()
869 .filter_by_bitmap(probe_chunk.visibility())
870 {
871 if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
872 let build_side_row_iter =
873 next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id));
874 if let Some(asof_desc) = &asof_desc {
875 if let Some(build_row_id) = Self::find_asof_matched_rows(
876 probe_chunk.row_at_unchecked_vis(probe_row_id),
877 &build_side,
878 build_side_row_iter,
879 asof_desc,
880 ) {
881 shutdown_rx.check()?;
882 if let Some(spilled) = Self::append_one_row(
883 &mut chunk_builder,
884 &probe_chunk,
885 probe_row_id,
886 &build_side[build_row_id.chunk_id()],
887 build_row_id.row_id(),
888 ) {
889 yield spilled
890 }
891 } else {
892 shutdown_rx.check()?;
893 let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
894 if let Some(spilled) = Self::append_one_row_with_null_build_side(
895 &mut chunk_builder,
896 probe_row,
897 build_data_types.len(),
898 ) {
899 yield spilled
900 }
901 }
902 } else {
903 for build_row_id in build_side_row_iter {
904 shutdown_rx.check()?;
905 let build_chunk = &build_side[build_row_id.chunk_id()];
906 if let Some(spilled) = Self::append_one_row(
907 &mut chunk_builder,
908 &probe_chunk,
909 probe_row_id,
910 build_chunk,
911 build_row_id.row_id(),
912 ) {
913 yield spilled
914 }
915 }
916 }
917 } else {
918 shutdown_rx.check()?;
919 let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
920 if let Some(spilled) = Self::append_one_row_with_null_build_side(
921 &mut chunk_builder,
922 probe_row,
923 build_data_types.len(),
924 ) {
925 yield spilled
926 }
927 }
928 }
929 }
930 if let Some(spilled) = chunk_builder.consume_all() {
931 yield spilled
932 }
933 }
934
935 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
936 pub async fn do_left_outer_join_with_non_equi_condition(
937 EquiJoinParams {
938 probe_side,
939 probe_data_types,
940 probe_key_idxs,
941 build_side,
942 build_data_types,
943 full_data_types,
944 hash_map,
945 next_build_row_with_same_key,
946 chunk_size,
947 shutdown_rx,
948 ..
949 }: EquiJoinParams<K>,
950 cond: &BoxedExpression,
951 ) {
952 let mut chunk_builder = DataChunkBuilder::new(full_data_types.clone(), chunk_size);
953 let mut remaining_chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
954 let mut non_equi_state = LeftNonEquiJoinState {
955 probe_column_count: probe_data_types.len(),
956 ..Default::default()
957 };
958
959 #[for_await]
960 for probe_chunk in probe_side.execute() {
961 let probe_chunk = probe_chunk?;
962 let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
963 for (probe_row_id, probe_key) in probe_keys
964 .iter()
965 .enumerate()
966 .filter_by_bitmap(probe_chunk.visibility())
967 {
968 if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
969 non_equi_state
970 .first_output_row_id
971 .push(chunk_builder.buffered_count());
972
973 let mut build_row_id_iter = next_build_row_with_same_key
974 .row_id_iter(Some(*first_matched_build_row_id))
975 .peekable();
976 while let Some(build_row_id) = build_row_id_iter.next() {
977 shutdown_rx.check()?;
978 let build_chunk = &build_side[build_row_id.chunk_id()];
979 if let Some(spilled) = Self::append_one_row(
980 &mut chunk_builder,
981 &probe_chunk,
982 probe_row_id,
983 build_chunk,
984 build_row_id.row_id(),
985 ) {
986 non_equi_state.has_more_output_rows =
987 build_row_id_iter.peek().is_some();
988 yield Self::process_left_outer_join_non_equi_condition(
989 spilled,
990 cond.as_ref(),
991 &mut non_equi_state,
992 )
993 .await?
994 }
995 }
996 } else {
997 shutdown_rx.check()?;
998 let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
999 if let Some(spilled) = Self::append_one_row_with_null_build_side(
1000 &mut remaining_chunk_builder,
1001 probe_row,
1002 build_data_types.len(),
1003 ) {
1004 yield spilled
1005 }
1006 }
1007 }
1008 }
1009 non_equi_state.has_more_output_rows = false;
1010 if let Some(spilled) = chunk_builder.consume_all() {
1011 yield Self::process_left_outer_join_non_equi_condition(
1012 spilled,
1013 cond.as_ref(),
1014 &mut non_equi_state,
1015 )
1016 .await?
1017 }
1018
1019 if let Some(spilled) = remaining_chunk_builder.consume_all() {
1020 yield spilled
1021 }
1022 }
1023
1024 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1025 pub async fn do_left_semi_anti_join<const ANTI_JOIN: bool>(
1026 EquiJoinParams {
1027 probe_side,
1028 probe_data_types,
1029 probe_key_idxs,
1030 hash_map,
1031 chunk_size,
1032 shutdown_rx,
1033 ..
1034 }: EquiJoinParams<K>,
1035 ) {
1036 let mut chunk_builder = DataChunkBuilder::new(probe_data_types, chunk_size);
1037 #[for_await]
1038 for probe_chunk in probe_side.execute() {
1039 let probe_chunk = probe_chunk?;
1040 let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1041 for (probe_row_id, probe_key) in probe_keys
1042 .iter()
1043 .enumerate()
1044 .filter_by_bitmap(probe_chunk.visibility())
1045 {
1046 shutdown_rx.check()?;
1047 if !ANTI_JOIN {
1048 if hash_map.contains_key(probe_key) {
1049 if let Some(spilled) = Self::append_one_probe_row(
1050 &mut chunk_builder,
1051 &probe_chunk,
1052 probe_row_id,
1053 ) {
1054 yield spilled
1055 }
1056 }
1057 } else if hash_map.get(probe_key).is_none() {
1058 if let Some(spilled) =
1059 Self::append_one_probe_row(&mut chunk_builder, &probe_chunk, probe_row_id)
1060 {
1061 yield spilled
1062 }
1063 }
1064 }
1065 }
1066 if let Some(spilled) = chunk_builder.consume_all() {
1067 yield spilled
1068 }
1069 }
1070
1071 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1081 pub async fn do_left_semi_join_with_non_equi_condition<'a>(
1082 EquiJoinParams {
1083 probe_side,
1084 probe_key_idxs,
1085 build_side,
1086 full_data_types,
1087 hash_map,
1088 next_build_row_with_same_key,
1089 chunk_size,
1090 shutdown_rx,
1091 ..
1092 }: EquiJoinParams<K>,
1093 cond: &'a BoxedExpression,
1094 ) {
1095 let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
1096 let mut non_equi_state = LeftNonEquiJoinState::default();
1097
1098 #[for_await]
1099 for probe_chunk in probe_side.execute() {
1100 let probe_chunk = probe_chunk?;
1101 let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1102 for (probe_row_id, probe_key) in probe_keys
1103 .iter()
1104 .enumerate()
1105 .filter_by_bitmap(probe_chunk.visibility())
1106 {
1107 non_equi_state.found_matched = false;
1108 if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
1109 non_equi_state
1110 .first_output_row_id
1111 .push(chunk_builder.buffered_count());
1112
1113 for build_row_id in
1114 next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id))
1115 {
1116 shutdown_rx.check()?;
1117 if non_equi_state.found_matched {
1118 break;
1119 }
1120 let build_chunk = &build_side[build_row_id.chunk_id()];
1121 if let Some(spilled) = Self::append_one_row(
1122 &mut chunk_builder,
1123 &probe_chunk,
1124 probe_row_id,
1125 build_chunk,
1126 build_row_id.row_id(),
1127 ) {
1128 yield Self::process_left_semi_anti_join_non_equi_condition::<false>(
1129 spilled,
1130 cond.as_ref(),
1131 &mut non_equi_state,
1132 )
1133 .await?
1134 }
1135 }
1136 }
1137 }
1138 }
1139
1140 if let Some(spilled) = chunk_builder.consume_all() {
1142 yield Self::process_left_semi_anti_join_non_equi_condition::<false>(
1143 spilled,
1144 cond.as_ref(),
1145 &mut non_equi_state,
1146 )
1147 .await?
1148 }
1149 }
1150
1151 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1152 pub async fn do_left_anti_join_with_non_equi_condition(
1153 EquiJoinParams {
1154 probe_side,
1155 probe_data_types,
1156 probe_key_idxs,
1157 build_side,
1158 full_data_types,
1159 hash_map,
1160 next_build_row_with_same_key,
1161 chunk_size,
1162 shutdown_rx,
1163 ..
1164 }: EquiJoinParams<K>,
1165 cond: &BoxedExpression,
1166 ) {
1167 let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
1168 let mut remaining_chunk_builder = DataChunkBuilder::new(probe_data_types, chunk_size);
1169 let mut non_equi_state = LeftNonEquiJoinState::default();
1170
1171 #[for_await]
1172 for probe_chunk in probe_side.execute() {
1173 let probe_chunk = probe_chunk?;
1174 let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1175 for (probe_row_id, probe_key) in probe_keys
1176 .iter()
1177 .enumerate()
1178 .filter_by_bitmap(probe_chunk.visibility())
1179 {
1180 if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
1181 non_equi_state
1182 .first_output_row_id
1183 .push(chunk_builder.buffered_count());
1184 let mut build_row_id_iter = next_build_row_with_same_key
1185 .row_id_iter(Some(*first_matched_build_row_id))
1186 .peekable();
1187 while let Some(build_row_id) = build_row_id_iter.next() {
1188 shutdown_rx.check()?;
1189 let build_chunk = &build_side[build_row_id.chunk_id()];
1190 if let Some(spilled) = Self::append_one_row(
1191 &mut chunk_builder,
1192 &probe_chunk,
1193 probe_row_id,
1194 build_chunk,
1195 build_row_id.row_id(),
1196 ) {
1197 non_equi_state.has_more_output_rows =
1198 build_row_id_iter.peek().is_some();
1199 yield Self::process_left_semi_anti_join_non_equi_condition::<true>(
1200 spilled,
1201 cond.as_ref(),
1202 &mut non_equi_state,
1203 )
1204 .await?
1205 }
1206 }
1207 } else if let Some(spilled) = Self::append_one_probe_row(
1208 &mut remaining_chunk_builder,
1209 &probe_chunk,
1210 probe_row_id,
1211 ) {
1212 yield spilled
1213 }
1214 }
1215 }
1216 non_equi_state.has_more_output_rows = false;
1217 if let Some(spilled) = chunk_builder.consume_all() {
1218 yield Self::process_left_semi_anti_join_non_equi_condition::<true>(
1219 spilled,
1220 cond.as_ref(),
1221 &mut non_equi_state,
1222 )
1223 .await?
1224 }
1225 if let Some(spilled) = remaining_chunk_builder.consume_all() {
1226 yield spilled
1227 }
1228 }
1229
1230 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1231 pub async fn do_right_outer_join(
1232 EquiJoinParams {
1233 probe_side,
1234 probe_data_types,
1235 probe_key_idxs,
1236 build_side,
1237 full_data_types,
1238 hash_map,
1239 next_build_row_with_same_key,
1240 chunk_size,
1241 shutdown_rx,
1242 ..
1243 }: EquiJoinParams<K>,
1244 ) {
1245 let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
1246 let mut build_row_matched =
1247 ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
1248
1249 #[for_await]
1250 for probe_chunk in probe_side.execute() {
1251 let probe_chunk = probe_chunk?;
1252 let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1253 for (probe_row_id, probe_key) in probe_keys
1254 .iter()
1255 .enumerate()
1256 .filter_by_bitmap(probe_chunk.visibility())
1257 {
1258 for build_row_id in
1259 next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
1260 {
1261 shutdown_rx.check()?;
1262 build_row_matched[build_row_id] = true;
1263 let build_chunk = &build_side[build_row_id.chunk_id()];
1264 if let Some(spilled) = Self::append_one_row(
1265 &mut chunk_builder,
1266 &probe_chunk,
1267 probe_row_id,
1268 build_chunk,
1269 build_row_id.row_id(),
1270 ) {
1271 yield spilled
1272 }
1273 }
1274 }
1275 }
1276 #[for_await]
1277 for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
1278 &mut chunk_builder,
1279 &build_side,
1280 &build_row_matched,
1281 probe_data_types.len(),
1282 ) {
1283 yield spilled?
1284 }
1285 }
1286
1287 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1288 pub async fn do_right_outer_join_with_non_equi_condition(
1289 EquiJoinParams {
1290 probe_side,
1291 probe_data_types,
1292 probe_key_idxs,
1293 build_side,
1294 full_data_types,
1295 hash_map,
1296 next_build_row_with_same_key,
1297 chunk_size,
1298 shutdown_rx,
1299 ..
1300 }: EquiJoinParams<K>,
1301 cond: &BoxedExpression,
1302 ) {
1303 let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
1304 let build_row_matched =
1305 ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
1306 let mut non_equi_state = RightNonEquiJoinState {
1307 build_row_matched,
1308 ..Default::default()
1309 };
1310
1311 #[for_await]
1312 for probe_chunk in probe_side.execute() {
1313 let probe_chunk = probe_chunk?;
1314 let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1315 for (probe_row_id, probe_key) in probe_keys
1316 .iter()
1317 .enumerate()
1318 .filter_by_bitmap(probe_chunk.visibility())
1319 {
1320 for build_row_id in
1321 next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
1322 {
1323 shutdown_rx.check()?;
1324 non_equi_state.build_row_ids.push(build_row_id);
1325 let build_chunk = &build_side[build_row_id.chunk_id()];
1326 if let Some(spilled) = Self::append_one_row(
1327 &mut chunk_builder,
1328 &probe_chunk,
1329 probe_row_id,
1330 build_chunk,
1331 build_row_id.row_id(),
1332 ) {
1333 yield Self::process_right_outer_join_non_equi_condition(
1334 spilled,
1335 cond.as_ref(),
1336 &mut non_equi_state,
1337 )
1338 .await?
1339 }
1340 }
1341 }
1342 }
1343 if let Some(spilled) = chunk_builder.consume_all() {
1344 yield Self::process_right_outer_join_non_equi_condition(
1345 spilled,
1346 cond.as_ref(),
1347 &mut non_equi_state,
1348 )
1349 .await?
1350 }
1351 #[for_await]
1352 for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
1353 &mut chunk_builder,
1354 &build_side,
1355 &non_equi_state.build_row_matched,
1356 probe_data_types.len(),
1357 ) {
1358 yield spilled?
1359 }
1360 }
1361
1362 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1363 pub async fn do_right_semi_anti_join<const ANTI_JOIN: bool>(
1364 EquiJoinParams {
1365 probe_side,
1366 probe_key_idxs,
1367 build_side,
1368 build_data_types,
1369 hash_map,
1370 next_build_row_with_same_key,
1371 chunk_size,
1372 shutdown_rx,
1373 ..
1374 }: EquiJoinParams<K>,
1375 ) {
1376 let mut chunk_builder = DataChunkBuilder::new(build_data_types, chunk_size);
1377 let mut build_row_matched =
1378 ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
1379
1380 #[for_await]
1381 for probe_chunk in probe_side.execute() {
1382 let probe_chunk = probe_chunk?;
1383 let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1384 for probe_key in probe_keys.iter().filter_by_bitmap(probe_chunk.visibility()) {
1385 for build_row_id in
1386 next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
1387 {
1388 shutdown_rx.check()?;
1389 build_row_matched[build_row_id] = true;
1390 }
1391 }
1392 }
1393 #[for_await]
1394 for spilled in Self::handle_remaining_build_rows_for_right_semi_anti_join::<ANTI_JOIN>(
1395 &mut chunk_builder,
1396 &build_side,
1397 &build_row_matched,
1398 ) {
1399 yield spilled?
1400 }
1401 }
1402
1403 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1404 pub async fn do_right_semi_anti_join_with_non_equi_condition<const ANTI_JOIN: bool>(
1405 EquiJoinParams {
1406 probe_side,
1407 probe_key_idxs,
1408 build_side,
1409 build_data_types,
1410 full_data_types,
1411 hash_map,
1412 next_build_row_with_same_key,
1413 chunk_size,
1414 shutdown_rx,
1415 ..
1416 }: EquiJoinParams<K>,
1417 cond: &BoxedExpression,
1418 ) {
1419 let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
1420 let mut remaining_chunk_builder = DataChunkBuilder::new(build_data_types, chunk_size);
1421 let build_row_matched =
1422 ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
1423 let mut non_equi_state = RightNonEquiJoinState {
1424 build_row_matched,
1425 ..Default::default()
1426 };
1427
1428 #[for_await]
1429 for probe_chunk in probe_side.execute() {
1430 let probe_chunk = probe_chunk?;
1431 let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1432 for (probe_row_id, probe_key) in probe_keys
1433 .iter()
1434 .enumerate()
1435 .filter_by_bitmap(probe_chunk.visibility())
1436 {
1437 for build_row_id in
1438 next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
1439 {
1440 shutdown_rx.check()?;
1441 non_equi_state.build_row_ids.push(build_row_id);
1442 let build_chunk = &build_side[build_row_id.chunk_id()];
1443 if let Some(spilled) = Self::append_one_row(
1444 &mut chunk_builder,
1445 &probe_chunk,
1446 probe_row_id,
1447 build_chunk,
1448 build_row_id.row_id(),
1449 ) {
1450 Self::process_right_semi_anti_join_non_equi_condition(
1451 spilled,
1452 cond.as_ref(),
1453 &mut non_equi_state,
1454 )
1455 .await?
1456 }
1457 }
1458 }
1459 }
1460 if let Some(spilled) = chunk_builder.consume_all() {
1461 Self::process_right_semi_anti_join_non_equi_condition(
1462 spilled,
1463 cond.as_ref(),
1464 &mut non_equi_state,
1465 )
1466 .await?
1467 }
1468 #[for_await]
1469 for spilled in Self::handle_remaining_build_rows_for_right_semi_anti_join::<ANTI_JOIN>(
1470 &mut remaining_chunk_builder,
1471 &build_side,
1472 &non_equi_state.build_row_matched,
1473 ) {
1474 yield spilled?
1475 }
1476 }
1477
1478 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1479 pub async fn do_full_outer_join(
1480 EquiJoinParams {
1481 probe_side,
1482 probe_data_types,
1483 probe_key_idxs,
1484 build_side,
1485 build_data_types,
1486 full_data_types,
1487 hash_map,
1488 next_build_row_with_same_key,
1489 chunk_size,
1490 shutdown_rx,
1491 ..
1492 }: EquiJoinParams<K>,
1493 ) {
1494 let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
1495 let mut build_row_matched =
1496 ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
1497
1498 #[for_await]
1499 for probe_chunk in probe_side.execute() {
1500 let probe_chunk = probe_chunk?;
1501 let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1502 for (probe_row_id, probe_key) in probe_keys
1503 .iter()
1504 .enumerate()
1505 .filter_by_bitmap(probe_chunk.visibility())
1506 {
1507 if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
1508 for build_row_id in
1509 next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id))
1510 {
1511 shutdown_rx.check()?;
1512 build_row_matched[build_row_id] = true;
1513 let build_chunk = &build_side[build_row_id.chunk_id()];
1514 if let Some(spilled) = Self::append_one_row(
1515 &mut chunk_builder,
1516 &probe_chunk,
1517 probe_row_id,
1518 build_chunk,
1519 build_row_id.row_id(),
1520 ) {
1521 yield spilled
1522 }
1523 }
1524 } else {
1525 let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
1526 if let Some(spilled) = Self::append_one_row_with_null_build_side(
1527 &mut chunk_builder,
1528 probe_row,
1529 build_data_types.len(),
1530 ) {
1531 yield spilled
1532 }
1533 }
1534 }
1535 }
1536 #[for_await]
1537 for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
1538 &mut chunk_builder,
1539 &build_side,
1540 &build_row_matched,
1541 probe_data_types.len(),
1542 ) {
1543 yield spilled?
1544 }
1545 }
1546
1547 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
1548 pub async fn do_full_outer_join_with_non_equi_condition(
1549 EquiJoinParams {
1550 probe_side,
1551 probe_data_types,
1552 probe_key_idxs,
1553 build_side,
1554 build_data_types,
1555 full_data_types,
1556 hash_map,
1557 next_build_row_with_same_key,
1558 chunk_size,
1559 shutdown_rx,
1560 ..
1561 }: EquiJoinParams<K>,
1562 cond: &BoxedExpression,
1563 ) {
1564 let mut chunk_builder = DataChunkBuilder::new(full_data_types.clone(), chunk_size);
1565 let mut remaining_chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
1566 let mut left_non_equi_state = LeftNonEquiJoinState {
1567 probe_column_count: probe_data_types.len(),
1568 ..Default::default()
1569 };
1570 let build_row_matched =
1571 ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
1572 let mut right_non_equi_state = RightNonEquiJoinState {
1573 build_row_matched,
1574 ..Default::default()
1575 };
1576
1577 #[for_await]
1578 for probe_chunk in probe_side.execute() {
1579 let probe_chunk = probe_chunk?;
1580 let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
1581 for (probe_row_id, probe_key) in probe_keys
1582 .iter()
1583 .enumerate()
1584 .filter_by_bitmap(probe_chunk.visibility())
1585 {
1586 left_non_equi_state.found_matched = false;
1587 if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
1588 left_non_equi_state
1589 .first_output_row_id
1590 .push(chunk_builder.buffered_count());
1591 let mut build_row_id_iter = next_build_row_with_same_key
1592 .row_id_iter(Some(*first_matched_build_row_id))
1593 .peekable();
1594 while let Some(build_row_id) = build_row_id_iter.next() {
1595 shutdown_rx.check()?;
1596 right_non_equi_state.build_row_ids.push(build_row_id);
1597 let build_chunk = &build_side[build_row_id.chunk_id()];
1598 if let Some(spilled) = Self::append_one_row(
1599 &mut chunk_builder,
1600 &probe_chunk,
1601 probe_row_id,
1602 build_chunk,
1603 build_row_id.row_id(),
1604 ) {
1605 left_non_equi_state.has_more_output_rows =
1606 build_row_id_iter.peek().is_some();
1607 yield Self::process_full_outer_join_non_equi_condition(
1608 spilled,
1609 cond.as_ref(),
1610 &mut left_non_equi_state,
1611 &mut right_non_equi_state,
1612 )
1613 .await?
1614 }
1615 }
1616 } else {
1617 shutdown_rx.check()?;
1618 let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
1619 if let Some(spilled) = Self::append_one_row_with_null_build_side(
1620 &mut remaining_chunk_builder,
1621 probe_row,
1622 build_data_types.len(),
1623 ) {
1624 yield spilled
1625 }
1626 }
1627 }
1628 }
1629 left_non_equi_state.has_more_output_rows = false;
1630 if let Some(spilled) = chunk_builder.consume_all() {
1631 yield Self::process_full_outer_join_non_equi_condition(
1632 spilled,
1633 cond.as_ref(),
1634 &mut left_non_equi_state,
1635 &mut right_non_equi_state,
1636 )
1637 .await?
1638 }
1639 #[for_await]
1640 for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
1641 &mut remaining_chunk_builder,
1642 &build_side,
1643 &right_non_equi_state.build_row_matched,
1644 probe_data_types.len(),
1645 ) {
1646 yield spilled?
1647 }
1648 }
1649
1650 async fn process_left_outer_join_non_equi_condition(
1787 chunk: DataChunk,
1788 cond: &dyn Expression,
1789 LeftNonEquiJoinState {
1790 probe_column_count,
1791 first_output_row_id,
1792 has_more_output_rows,
1793 found_matched,
1794 }: &mut LeftNonEquiJoinState,
1795 ) -> Result<DataChunk> {
1796 let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1797 Ok(DataChunkMutator(chunk)
1798 .nullify_build_side_for_non_equi_condition(&filter, *probe_column_count)
1799 .remove_duplicate_rows_for_left_outer_join(
1800 &filter,
1801 first_output_row_id,
1802 *has_more_output_rows,
1803 found_matched,
1804 )
1805 .take())
1806 }
1807
1808 async fn process_left_semi_anti_join_non_equi_condition<const ANTI_JOIN: bool>(
1811 chunk: DataChunk,
1812 cond: &dyn Expression,
1813 LeftNonEquiJoinState {
1814 first_output_row_id,
1815 found_matched,
1816 has_more_output_rows,
1817 ..
1818 }: &mut LeftNonEquiJoinState,
1819 ) -> Result<DataChunk> {
1820 let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1821 Ok(DataChunkMutator(chunk)
1822 .remove_duplicate_rows_for_left_semi_anti_join::<ANTI_JOIN>(
1823 &filter,
1824 first_output_row_id,
1825 *has_more_output_rows,
1826 found_matched,
1827 )
1828 .take())
1829 }
1830
1831 async fn process_right_outer_join_non_equi_condition(
1832 chunk: DataChunk,
1833 cond: &dyn Expression,
1834 RightNonEquiJoinState {
1835 build_row_ids,
1836 build_row_matched,
1837 }: &mut RightNonEquiJoinState,
1838 ) -> Result<DataChunk> {
1839 let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1840 Ok(DataChunkMutator(chunk)
1841 .remove_duplicate_rows_for_right_outer_join(&filter, build_row_ids, build_row_matched)
1842 .take())
1843 }
1844
1845 async fn process_right_semi_anti_join_non_equi_condition(
1846 chunk: DataChunk,
1847 cond: &dyn Expression,
1848 RightNonEquiJoinState {
1849 build_row_ids,
1850 build_row_matched,
1851 }: &mut RightNonEquiJoinState,
1852 ) -> Result<()> {
1853 let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1854 DataChunkMutator(chunk).remove_duplicate_rows_for_right_semi_anti_join(
1855 &filter,
1856 build_row_ids,
1857 build_row_matched,
1858 );
1859 Ok(())
1860 }
1861
1862 async fn process_full_outer_join_non_equi_condition(
1863 chunk: DataChunk,
1864 cond: &dyn Expression,
1865 left_non_equi_state: &mut LeftNonEquiJoinState,
1866 right_non_equi_state: &mut RightNonEquiJoinState,
1867 ) -> Result<DataChunk> {
1868 let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
1869 Ok(DataChunkMutator(chunk)
1870 .nullify_build_side_for_non_equi_condition(
1871 &filter,
1872 left_non_equi_state.probe_column_count,
1873 )
1874 .remove_duplicate_rows_for_full_outer_join(
1875 &filter,
1876 left_non_equi_state,
1877 right_non_equi_state,
1878 )
1879 .take())
1880 }
1881
1882 #[try_stream(ok = DataChunk, error = BatchError)]
1883 async fn handle_remaining_build_rows_for_right_outer_join<'a>(
1884 chunk_builder: &'a mut DataChunkBuilder,
1885 build_side: &'a [DataChunk],
1886 build_row_matched: &'a ChunkedData<bool>,
1887 probe_column_count: usize,
1888 ) {
1889 for build_row_id in build_row_matched
1890 .all_row_ids()
1891 .filter(|build_row_id| !build_row_matched[*build_row_id])
1892 {
1893 let build_row =
1894 build_side[build_row_id.chunk_id()].row_at_unchecked_vis(build_row_id.row_id());
1895 if let Some(spilled) = Self::append_one_row_with_null_probe_side(
1896 chunk_builder,
1897 build_row,
1898 probe_column_count,
1899 ) {
1900 yield spilled
1901 }
1902 }
1903 if let Some(spilled) = chunk_builder.consume_all() {
1904 yield spilled
1905 }
1906 }
1907
1908 #[try_stream(ok = DataChunk, error = BatchError)]
1909 async fn handle_remaining_build_rows_for_right_semi_anti_join<'a, const ANTI_JOIN: bool>(
1910 chunk_builder: &'a mut DataChunkBuilder,
1911 build_side: &'a [DataChunk],
1912 build_row_matched: &'a ChunkedData<bool>,
1913 ) {
1914 for build_row_id in build_row_matched.all_row_ids().filter(|build_row_id| {
1915 if !ANTI_JOIN {
1916 build_row_matched[*build_row_id]
1917 } else {
1918 !build_row_matched[*build_row_id]
1919 }
1920 }) {
1921 if let Some(spilled) = Self::append_one_build_row(
1922 chunk_builder,
1923 &build_side[build_row_id.chunk_id()],
1924 build_row_id.row_id(),
1925 ) {
1926 yield spilled
1927 }
1928 }
1929 if let Some(spilled) = chunk_builder.consume_all() {
1930 yield spilled
1931 }
1932 }
1933
1934 fn append_one_row(
1935 chunk_builder: &mut DataChunkBuilder,
1936 probe_chunk: &DataChunk,
1937 probe_row_id: usize,
1938 build_chunk: &DataChunk,
1939 build_row_id: usize,
1940 ) -> Option<DataChunk> {
1941 chunk_builder.append_one_row_from_array_elements(
1942 probe_chunk.columns().iter().map(|c| c.as_ref()),
1943 probe_row_id,
1944 build_chunk.columns().iter().map(|c| c.as_ref()),
1945 build_row_id,
1946 )
1947 }
1948
1949 fn append_one_probe_row(
1950 chunk_builder: &mut DataChunkBuilder,
1951 probe_chunk: &DataChunk,
1952 probe_row_id: usize,
1953 ) -> Option<DataChunk> {
1954 chunk_builder.append_one_row_from_array_elements(
1955 probe_chunk.columns().iter().map(|c| c.as_ref()),
1956 probe_row_id,
1957 empty(),
1958 0,
1959 )
1960 }
1961
1962 fn append_one_build_row(
1963 chunk_builder: &mut DataChunkBuilder,
1964 build_chunk: &DataChunk,
1965 build_row_id: usize,
1966 ) -> Option<DataChunk> {
1967 chunk_builder.append_one_row_from_array_elements(
1968 empty(),
1969 0,
1970 build_chunk.columns().iter().map(|c| c.as_ref()),
1971 build_row_id,
1972 )
1973 }
1974
1975 fn append_one_row_with_null_build_side(
1976 chunk_builder: &mut DataChunkBuilder,
1977 probe_row_ref: RowRef<'_>,
1978 build_column_count: usize,
1979 ) -> Option<DataChunk> {
1980 chunk_builder.append_one_row(probe_row_ref.chain(repeat_n(Datum::None, build_column_count)))
1981 }
1982
1983 fn append_one_row_with_null_probe_side(
1984 chunk_builder: &mut DataChunkBuilder,
1985 build_row_ref: RowRef<'_>,
1986 probe_column_count: usize,
1987 ) -> Option<DataChunk> {
1988 chunk_builder.append_one_row(repeat_n(Datum::None, probe_column_count).chain(build_row_ref))
1989 }
1990
1991 fn find_asof_matched_rows(
1992 probe_row_ref: RowRef<'_>,
1993 build_side: &[DataChunk],
1994 build_side_row_iter: RowIdIter<'_>,
1995 asof_join_condition: &AsOfDesc,
1996 ) -> Option<RowId> {
1997 let probe_inequality_value = probe_row_ref.datum_at(asof_join_condition.left_idx);
1998 if let Some(probe_inequality_scalar) = probe_inequality_value {
1999 let mut result_row_id: Option<RowId> = None;
2000 let mut build_row_ref;
2001
2002 for build_row_id in build_side_row_iter {
2003 build_row_ref =
2004 build_side[build_row_id.chunk_id()].row_at_unchecked_vis(build_row_id.row_id());
2005 let build_inequality_value = build_row_ref.datum_at(asof_join_condition.right_idx);
2006 if let Some(build_inequality_scalar) = build_inequality_value {
2007 let mut pick_result = |compare: fn(Ordering) -> bool| {
2008 if let Some(result_row_id_inner) = result_row_id {
2009 let result_row_ref = build_side[result_row_id_inner.chunk_id()]
2010 .row_at_unchecked_vis(result_row_id_inner.row_id());
2011 let result_inequality_scalar = result_row_ref
2012 .datum_at(asof_join_condition.right_idx)
2013 .unwrap();
2014 if compare(
2015 probe_inequality_scalar.default_cmp(&build_inequality_scalar),
2016 ) && compare(
2017 probe_inequality_scalar.default_cmp(&result_inequality_scalar),
2018 ) {
2019 result_row_id = Some(build_row_id);
2020 }
2021 } else if compare(
2022 probe_inequality_scalar.default_cmp(&build_inequality_scalar),
2023 ) {
2024 result_row_id = Some(build_row_id);
2025 }
2026 };
2027 match asof_join_condition.inequality_type {
2028 AsOfInequalityType::Lt => {
2029 pick_result(Ordering::is_lt);
2030 }
2031 AsOfInequalityType::Le => {
2032 pick_result(Ordering::is_le);
2033 }
2034 AsOfInequalityType::Gt => {
2035 pick_result(Ordering::is_gt);
2036 }
2037 AsOfInequalityType::Ge => {
2038 pick_result(Ordering::is_ge);
2039 }
2040 }
2041 }
2042 }
2043 result_row_id
2044 } else {
2045 None
2046 }
2047 }
2048}
2049
2050#[repr(transparent)]
2052struct DataChunkMutator(DataChunk);
2053
2054impl DataChunkMutator {
2055 fn nullify_build_side_for_non_equi_condition(
2056 self,
2057 filter: &Bitmap,
2058 probe_column_count: usize,
2059 ) -> Self {
2060 let (mut columns, vis) = self.0.into_parts();
2061
2062 for build_column in columns.split_off(probe_column_count) {
2063 let mut array = Arc::try_unwrap(build_column).unwrap();
2065 array.set_bitmap(array.null_bitmap() & filter);
2066 columns.push(array.into());
2067 }
2068
2069 Self(DataChunk::new(columns, vis))
2070 }
2071
2072 fn remove_duplicate_rows_for_left_outer_join(
2073 mut self,
2074 filter: &Bitmap,
2075 first_output_row_ids: &mut Vec<usize>,
2076 has_more_output_rows: bool,
2077 found_non_null: &mut bool,
2078 ) -> Self {
2079 let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());
2080
2081 for (&start_row_id, &end_row_id) in iter::once(&0)
2082 .chain(first_output_row_ids.iter())
2083 .tuple_windows()
2084 .filter(|(start_row_id, end_row_id)| start_row_id < end_row_id)
2085 {
2086 for row_id in start_row_id..end_row_id {
2087 if filter.is_set(row_id) {
2088 *found_non_null = true;
2089 new_visibility.set(row_id, true);
2090 }
2091 }
2092 if !*found_non_null {
2093 new_visibility.set(start_row_id, true);
2094 }
2095 *found_non_null = false;
2096 }
2097
2098 let start_row_id = first_output_row_ids.last().copied().unwrap_or_default();
2099 for row_id in start_row_id..filter.len() {
2100 if filter.is_set(row_id) {
2101 *found_non_null = true;
2102 new_visibility.set(row_id, true);
2103 }
2104 }
2105 if !has_more_output_rows {
2106 if !*found_non_null {
2107 new_visibility.set(start_row_id, true);
2108 }
2109 *found_non_null = false;
2110 }
2111
2112 first_output_row_ids.clear();
2113
2114 self.0
2115 .set_visibility(new_visibility.finish() & self.0.visibility());
2116 self
2117 }
2118
2119 fn remove_duplicate_rows_for_left_semi_anti_join<const ANTI_JOIN: bool>(
2123 mut self,
2124 filter: &Bitmap,
2125 first_output_row_ids: &mut Vec<usize>,
2126 has_more_output_rows: bool,
2127 found_matched: &mut bool,
2128 ) -> Self {
2129 let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());
2130
2131 for (&start_row_id, &end_row_id) in iter::once(&0)
2132 .chain(first_output_row_ids.iter())
2133 .tuple_windows()
2134 .filter(|(start_row_id, end_row_id)| start_row_id < end_row_id)
2135 {
2136 for row_id in start_row_id..end_row_id {
2137 if filter.is_set(row_id) {
2138 if !ANTI_JOIN && !*found_matched {
2139 new_visibility.set(row_id, true);
2140 }
2141 *found_matched = true;
2142 break;
2143 }
2144 }
2145 if ANTI_JOIN && !*found_matched {
2146 new_visibility.set(start_row_id, true);
2147 }
2148 *found_matched = false;
2149 }
2150
2151 let start_row_id = first_output_row_ids.last().copied().unwrap_or_default();
2152 for row_id in start_row_id..filter.len() {
2153 if filter.is_set(row_id) {
2154 if !ANTI_JOIN && !*found_matched {
2155 new_visibility.set(row_id, true);
2156 }
2157 *found_matched = true;
2158 break;
2159 }
2160 }
2161 if !has_more_output_rows && ANTI_JOIN {
2162 if !*found_matched {
2163 new_visibility.set(start_row_id, true);
2164 }
2165 *found_matched = false;
2166 }
2167
2168 first_output_row_ids.clear();
2169
2170 self.0
2171 .set_visibility(new_visibility.finish() & self.0.visibility());
2172 self
2173 }
2174
2175 fn remove_duplicate_rows_for_right_outer_join(
2176 mut self,
2177 filter: &Bitmap,
2178 build_row_ids: &mut Vec<RowId>,
2179 build_row_matched: &mut ChunkedData<bool>,
2180 ) -> Self {
2181 let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());
2182 for (output_row_id, (output_row_non_null, &build_row_id)) in
2183 filter.iter().zip_eq_fast(build_row_ids.iter()).enumerate()
2184 {
2185 if output_row_non_null {
2186 build_row_matched[build_row_id] = true;
2187 new_visibility.set(output_row_id, true);
2188 }
2189 }
2190
2191 build_row_ids.clear();
2192
2193 self.0
2194 .set_visibility(new_visibility.finish() & self.0.visibility());
2195 self
2196 }
2197
2198 fn remove_duplicate_rows_for_right_semi_anti_join(
2199 self,
2200 filter: &Bitmap,
2201 build_row_ids: &mut Vec<RowId>,
2202 build_row_matched: &mut ChunkedData<bool>,
2203 ) {
2204 for (output_row_non_null, &build_row_id) in filter.iter().zip_eq_fast(build_row_ids.iter())
2205 {
2206 if output_row_non_null {
2207 build_row_matched[build_row_id] = true;
2208 }
2209 }
2210
2211 build_row_ids.clear();
2212 }
2213
2214 fn remove_duplicate_rows_for_full_outer_join(
2215 mut self,
2216 filter: &Bitmap,
2217 LeftNonEquiJoinState {
2218 first_output_row_id,
2219 has_more_output_rows,
2220 found_matched,
2221 ..
2222 }: &mut LeftNonEquiJoinState,
2223 RightNonEquiJoinState {
2224 build_row_ids,
2225 build_row_matched,
2226 }: &mut RightNonEquiJoinState,
2227 ) -> Self {
2228 let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());
2229
2230 for (&start_row_id, &end_row_id) in iter::once(&0)
2231 .chain(first_output_row_id.iter())
2232 .tuple_windows()
2233 .filter(|(start_row_id, end_row_id)| start_row_id < end_row_id)
2234 {
2235 for row_id in start_row_id..end_row_id {
2236 if filter.is_set(row_id) {
2237 *found_matched = true;
2238 new_visibility.set(row_id, true);
2239 }
2240 }
2241 if !*found_matched {
2242 new_visibility.set(start_row_id, true);
2243 }
2244 *found_matched = false;
2245 }
2246
2247 let start_row_id = first_output_row_id.last().copied().unwrap_or_default();
2248 for row_id in start_row_id..filter.len() {
2249 if filter.is_set(row_id) {
2250 *found_matched = true;
2251 new_visibility.set(row_id, true);
2252 }
2253 }
2254 if !*has_more_output_rows && !*found_matched {
2255 new_visibility.set(start_row_id, true);
2256 }
2257
2258 first_output_row_id.clear();
2259
2260 for (output_row_id, (output_row_non_null, &build_row_id)) in
2261 filter.iter().zip_eq_fast(build_row_ids.iter()).enumerate()
2262 {
2263 if output_row_non_null {
2264 build_row_matched[build_row_id] = true;
2265 new_visibility.set(output_row_id, true);
2266 }
2267 }
2268
2269 build_row_ids.clear();
2270
2271 self.0
2272 .set_visibility(new_visibility.finish() & self.0.visibility());
2273 self
2274 }
2275
2276 fn take(self) -> DataChunk {
2277 self.0
2278 }
2279}
2280
2281impl BoxedExecutorBuilder for HashJoinExecutor<()> {
2282 async fn new_boxed_executor(
2283 context: &ExecutorBuilder<'_>,
2284 inputs: Vec<BoxedExecutor>,
2285 ) -> Result<BoxedExecutor> {
2286 let [left_child, right_child]: [_; 2] = inputs.try_into().unwrap();
2287
2288 let hash_join_node = try_match_expand!(
2289 context.plan_node().get_node_body().unwrap(),
2290 NodeBody::HashJoin
2291 )?;
2292
2293 let join_type = JoinType::from_prost(hash_join_node.get_join_type()?);
2294
2295 let cond = match hash_join_node.get_condition() {
2296 Ok(cond_prost) => Some(build_from_prost(cond_prost)?),
2297 Err(_) => None,
2298 };
2299
2300 let left_key_idxs = hash_join_node
2301 .get_left_key()
2302 .iter()
2303 .map(|&idx| idx as usize)
2304 .collect_vec();
2305 let right_key_idxs = hash_join_node
2306 .get_right_key()
2307 .iter()
2308 .map(|&idx| idx as usize)
2309 .collect_vec();
2310
2311 ensure!(left_key_idxs.len() == right_key_idxs.len());
2312
2313 let right_data_types = right_child.schema().data_types();
2314 let right_key_types = right_key_idxs
2315 .iter()
2316 .map(|&idx| right_data_types[idx].clone())
2317 .collect_vec();
2318
2319 let output_indices: Vec<usize> = hash_join_node
2320 .get_output_indices()
2321 .iter()
2322 .map(|&x| x as usize)
2323 .collect();
2324
2325 let identity = context.plan_node().get_identity().clone();
2326
2327 let asof_desc = hash_join_node
2328 .asof_desc
2329 .map(|desc| AsOfDesc::from_protobuf(&desc))
2330 .transpose()?;
2331
2332 Ok(HashJoinExecutorArgs {
2333 join_type,
2334 output_indices,
2335 probe_side_source: left_child,
2336 build_side_source: right_child,
2337 probe_key_idxs: left_key_idxs,
2338 build_key_idxs: right_key_idxs,
2339 null_matched: hash_join_node.get_null_safe().clone(),
2340 cond,
2341 identity: identity.clone(),
2342 right_key_types,
2343 chunk_size: context.context().get_config().developer.chunk_size,
2344 asof_desc,
2345 spill_backend: if context.context().get_config().enable_spill {
2346 Some(Disk)
2347 } else {
2348 None
2349 },
2350 spill_metrics: context.context().spill_metrics(),
2351 shutdown_rx: context.shutdown_rx().clone(),
2352 mem_ctx: context.context().create_executor_mem_context(&identity),
2353 }
2354 .dispatch())
2355 }
2356}
2357
2358struct HashJoinExecutorArgs {
2359 join_type: JoinType,
2360 output_indices: Vec<usize>,
2361 probe_side_source: BoxedExecutor,
2362 build_side_source: BoxedExecutor,
2363 probe_key_idxs: Vec<usize>,
2364 build_key_idxs: Vec<usize>,
2365 null_matched: Vec<bool>,
2366 cond: Option<BoxedExpression>,
2367 identity: String,
2368 right_key_types: Vec<DataType>,
2369 chunk_size: usize,
2370 asof_desc: Option<AsOfDesc>,
2371 spill_backend: Option<SpillBackend>,
2372 spill_metrics: Arc<BatchSpillMetrics>,
2373 shutdown_rx: ShutdownToken,
2374 mem_ctx: MemoryContext,
2375}
2376
2377impl HashKeyDispatcher for HashJoinExecutorArgs {
2378 type Output = BoxedExecutor;
2379
2380 fn dispatch_impl<K: HashKey>(self) -> Self::Output {
2381 Box::new(HashJoinExecutor::<K>::new(
2382 self.join_type,
2383 self.output_indices,
2384 self.probe_side_source,
2385 self.build_side_source,
2386 self.probe_key_idxs,
2387 self.build_key_idxs,
2388 self.null_matched,
2389 self.cond.map(Arc::new),
2390 self.identity,
2391 self.chunk_size,
2392 self.asof_desc,
2393 self.spill_backend,
2394 self.spill_metrics,
2395 self.shutdown_rx,
2396 self.mem_ctx,
2397 ))
2398 }
2399
2400 fn data_types(&self) -> &[DataType] {
2401 &self.right_key_types
2402 }
2403}
2404
2405impl<K> HashJoinExecutor<K> {
2406 #[allow(clippy::too_many_arguments)]
2407 pub fn new(
2408 join_type: JoinType,
2409 output_indices: Vec<usize>,
2410 probe_side_source: BoxedExecutor,
2411 build_side_source: BoxedExecutor,
2412 probe_key_idxs: Vec<usize>,
2413 build_key_idxs: Vec<usize>,
2414 null_matched: Vec<bool>,
2415 cond: Option<Arc<BoxedExpression>>,
2416 identity: String,
2417 chunk_size: usize,
2418 asof_desc: Option<AsOfDesc>,
2419 spill_backend: Option<SpillBackend>,
2420 spill_metrics: Arc<BatchSpillMetrics>,
2421 shutdown_rx: ShutdownToken,
2422 mem_ctx: MemoryContext,
2423 ) -> Self {
2424 Self::new_inner(
2425 join_type,
2426 output_indices,
2427 probe_side_source,
2428 build_side_source,
2429 probe_key_idxs,
2430 build_key_idxs,
2431 null_matched,
2432 cond,
2433 identity,
2434 chunk_size,
2435 asof_desc,
2436 spill_backend,
2437 spill_metrics,
2438 None,
2439 shutdown_rx,
2440 mem_ctx,
2441 )
2442 }
2443
2444 #[allow(clippy::too_many_arguments)]
2445 fn new_inner(
2446 join_type: JoinType,
2447 output_indices: Vec<usize>,
2448 probe_side_source: BoxedExecutor,
2449 build_side_source: BoxedExecutor,
2450 probe_key_idxs: Vec<usize>,
2451 build_key_idxs: Vec<usize>,
2452 null_matched: Vec<bool>,
2453 cond: Option<Arc<BoxedExpression>>,
2454 identity: String,
2455 chunk_size: usize,
2456 asof_desc: Option<AsOfDesc>,
2457 spill_backend: Option<SpillBackend>,
2458 spill_metrics: Arc<BatchSpillMetrics>,
2459 memory_upper_bound: Option<u64>,
2460 shutdown_rx: ShutdownToken,
2461 mem_ctx: MemoryContext,
2462 ) -> Self {
2463 assert_eq!(probe_key_idxs.len(), build_key_idxs.len());
2464 assert_eq!(probe_key_idxs.len(), null_matched.len());
2465 let original_schema = match join_type {
2466 JoinType::LeftSemi | JoinType::LeftAnti => probe_side_source.schema().clone(),
2467 JoinType::RightSemi | JoinType::RightAnti => build_side_source.schema().clone(),
2468 _ => Schema::from_iter(
2469 probe_side_source
2470 .schema()
2471 .fields()
2472 .iter()
2473 .chain(build_side_source.schema().fields().iter())
2474 .cloned(),
2475 ),
2476 };
2477 let schema = Schema::from_iter(
2478 output_indices
2479 .iter()
2480 .map(|&idx| original_schema[idx].clone()),
2481 );
2482 Self {
2483 join_type,
2484 original_schema,
2485 schema,
2486 output_indices,
2487 probe_side_source,
2488 build_side_source,
2489 probe_key_idxs,
2490 build_key_idxs,
2491 null_matched,
2492 cond,
2493 identity,
2494 chunk_size,
2495 asof_desc,
2496 shutdown_rx,
2497 spill_backend,
2498 spill_metrics,
2499 memory_upper_bound,
2500 mem_ctx,
2501 _phantom: PhantomData,
2502 }
2503 }
2504}
2505
2506#[cfg(test)]
2507mod tests {
2508 use futures::StreamExt;
2509 use futures_async_stream::for_await;
2510 use itertools::Itertools;
2511 use risingwave_common::array::{ArrayBuilderImpl, DataChunk};
2512 use risingwave_common::catalog::{Field, Schema};
2513 use risingwave_common::hash::Key32;
2514 use risingwave_common::memory::MemoryContext;
2515 use risingwave_common::metrics::LabelGuardedIntGauge;
2516 use risingwave_common::test_prelude::DataChunkTestExt;
2517 use risingwave_common::types::DataType;
2518 use risingwave_common::util::iter_util::ZipEqDebug;
2519 use risingwave_common::util::memcmp_encoding::encode_chunk;
2520 use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
2521 use risingwave_expr::expr::{BoxedExpression, build_from_pretty};
2522
2523 use super::{
2524 ChunkedData, HashJoinExecutor, JoinType, LeftNonEquiJoinState, RightNonEquiJoinState, RowId,
2525 };
2526 use crate::error::Result;
2527 use crate::executor::BoxedExecutor;
2528 use crate::executor::test_utils::MockExecutor;
2529 use crate::monitor::BatchSpillMetrics;
2530 use crate::spill::spill_op::SpillBackend;
2531 use crate::task::ShutdownToken;
2532
    /// Size used throughout these tests: it is passed to `create_array_builder` by
    /// `DataChunkMerger` and used as the join executor's chunk size by `do_test` and
    /// `do_test_shutdown`.
    const CHUNK_SIZE: usize = 1024;
2534
2535 struct DataChunkMerger {
2536 array_builders: Vec<ArrayBuilderImpl>,
2537 array_len: usize,
2538 }
2539
2540 impl DataChunkMerger {
2541 fn new(data_types: Vec<DataType>) -> Result<Self> {
2542 let array_builders = data_types
2543 .iter()
2544 .map(|data_type| data_type.create_array_builder(CHUNK_SIZE))
2545 .collect();
2546
2547 Ok(Self {
2548 array_builders,
2549 array_len: 0,
2550 })
2551 }
2552
2553 fn append(&mut self, data_chunk: &DataChunk) -> Result<()> {
2554 ensure!(self.array_builders.len() == data_chunk.dimension());
2555 for idx in 0..self.array_builders.len() {
2556 self.array_builders[idx].append_array(data_chunk.column_at(idx));
2557 }
2558 self.array_len += data_chunk.capacity();
2559
2560 Ok(())
2561 }
2562
2563 fn finish(self) -> Result<DataChunk> {
2564 let columns = self
2565 .array_builders
2566 .into_iter()
2567 .map(|b| b.finish().into())
2568 .collect();
2569
2570 Ok(DataChunk::new(columns, self.array_len))
2571 }
2572 }
2573
2574 fn compare_data_chunk_with_rowsort(left: &DataChunk, right: &DataChunk) -> bool {
2576 assert!(left.is_compacted());
2577 assert!(right.is_compacted());
2578
2579 if left.cardinality() != right.cardinality() {
2580 return false;
2581 }
2582
2583 let column_orders = (0..left.columns().len())
2585 .map(|i| ColumnOrder::new(i, OrderType::ascending()))
2586 .collect_vec();
2587 let left_encoded_chunk = encode_chunk(left, &column_orders).unwrap();
2588 let mut sorted_left = left_encoded_chunk
2589 .into_iter()
2590 .enumerate()
2591 .map(|(row_id, row)| (left.row_at_unchecked_vis(row_id), row))
2592 .collect_vec();
2593 sorted_left.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
2594
2595 let right_encoded_chunk = encode_chunk(right, &column_orders).unwrap();
2596 let mut sorted_right = right_encoded_chunk
2597 .into_iter()
2598 .enumerate()
2599 .map(|(row_id, row)| (right.row_at_unchecked_vis(row_id), row))
2600 .collect_vec();
2601 sorted_right.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
2602
2603 sorted_left
2604 .into_iter()
2605 .map(|(row, _)| row)
2606 .zip_eq_debug(sorted_right.into_iter().map(|(row, _)| row))
2607 .all(|(row1, row2)| row1 == row2)
2608 }
2609
    /// Shared setup for the end-to-end hash join tests.
    struct TestFixture {
        /// Column types of the left input.
        left_types: Vec<DataType>,
        /// Column types of the right input.
        right_types: Vec<DataType>,
        /// Join type of the executor under test.
        join_type: JoinType,
    }
2615
2616 impl TestFixture {
2633 fn with_join_type(join_type: JoinType) -> Self {
2634 Self {
2635 left_types: vec![DataType::Int32, DataType::Float32],
2636 right_types: vec![DataType::Int32, DataType::Float64],
2637 join_type,
2638 }
2639 }
2640
2641 fn create_left_executor(&self) -> BoxedExecutor {
2642 let schema = Schema {
2643 fields: vec![
2644 Field::unnamed(DataType::Int32),
2645 Field::unnamed(DataType::Float32),
2646 ],
2647 };
2648 let mut executor = MockExecutor::new(schema);
2649
2650 executor.add(DataChunk::from_pretty(
2651 "i f
2652 1 6.1
2653 2 .
2654 . 8.4
2655 3 3.9
2656 . . ",
2657 ));
2658
2659 executor.add(DataChunk::from_pretty(
2660 "i f
2661 4 6.6
2662 3 .
2663 . 0.7
2664 5 .
2665 . 5.5",
2666 ));
2667
2668 Box::new(executor)
2669 }
2670
2671 fn create_right_executor(&self) -> BoxedExecutor {
2672 let schema = Schema {
2673 fields: vec![
2674 Field::unnamed(DataType::Int32),
2675 Field::unnamed(DataType::Float64),
2676 ],
2677 };
2678 let mut executor = MockExecutor::new(schema);
2679
2680 executor.add(DataChunk::from_pretty(
2681 "i F
2682 8 6.1
2683 2 .
2684 . 8.9
2685 3 .
2686 . 3.5
2687 6 . ",
2688 ));
2689
2690 executor.add(DataChunk::from_pretty(
2691 "i F
2692 4 7.5
2693 6 .
2694 . 8
2695 7 .
2696 . 9.1
2697 9 . ",
2698 ));
2699
2700 executor.add(DataChunk::from_pretty(
2701 " i F
2702 3 3.7
2703 9 .
2704 . 9.6
2705 100 .
2706 . 8.18
2707 200 . ",
2708 ));
2709
2710 Box::new(executor)
2711 }
2712
2713 fn output_data_types(&self) -> Vec<DataType> {
2714 let join_type = self.join_type;
2715 if join_type.keep_all() {
2716 [self.left_types.clone(), self.right_types.clone()].concat()
2717 } else if join_type.keep_left() {
2718 self.left_types.clone()
2719 } else if join_type.keep_right() {
2720 self.right_types.clone()
2721 } else {
2722 unreachable!()
2723 }
2724 }
2725
2726 fn create_cond() -> BoxedExpression {
2727 build_from_pretty("(less_than:boolean $1:float4 $3:float8)")
2728 }
2729
2730 fn create_join_executor_with_chunk_size_and_executors(
2731 &self,
2732 has_non_equi_cond: bool,
2733 null_safe: bool,
2734 chunk_size: usize,
2735 left_child: BoxedExecutor,
2736 right_child: BoxedExecutor,
2737 shutdown_rx: ShutdownToken,
2738 parent_mem_ctx: Option<MemoryContext>,
2739 test_spill: bool,
2740 ) -> BoxedExecutor {
2741 let join_type = self.join_type;
2742
2743 let output_indices = (0..match join_type {
2744 JoinType::LeftSemi | JoinType::LeftAnti => left_child.schema().fields().len(),
2745 JoinType::RightSemi | JoinType::RightAnti => right_child.schema().fields().len(),
2746 _ => left_child.schema().fields().len() + right_child.schema().fields().len(),
2747 })
2748 .collect();
2749
2750 let cond = if has_non_equi_cond {
2751 Some(Self::create_cond().into())
2752 } else {
2753 None
2754 };
2755
2756 let mem_ctx = if test_spill {
2757 MemoryContext::new_with_mem_limit(
2758 parent_mem_ctx,
2759 LabelGuardedIntGauge::<4>::test_int_gauge(),
2760 0,
2761 )
2762 } else {
2763 MemoryContext::new(parent_mem_ctx, LabelGuardedIntGauge::<4>::test_int_gauge())
2764 };
2765 Box::new(HashJoinExecutor::<Key32>::new(
2766 join_type,
2767 output_indices,
2768 left_child,
2769 right_child,
2770 vec![0],
2771 vec![0],
2772 vec![null_safe],
2773 cond,
2774 "HashJoinExecutor".to_owned(),
2775 chunk_size,
2776 None,
2777 if test_spill {
2778 Some(SpillBackend::Memory)
2779 } else {
2780 None
2781 },
2782 BatchSpillMetrics::for_test(),
2783 shutdown_rx,
2784 mem_ctx,
2785 ))
2786 }
2787
2788 async fn do_test(&self, expected: DataChunk, has_non_equi_cond: bool, null_safe: bool) {
2789 let left_executor = self.create_left_executor();
2790 let right_executor = self.create_right_executor();
2791 self.do_test_with_chunk_size_and_executors(
2792 expected.clone(),
2793 has_non_equi_cond,
2794 null_safe,
2795 self::CHUNK_SIZE,
2796 left_executor,
2797 right_executor,
2798 false,
2799 )
2800 .await;
2801
2802 let left_executor = self.create_left_executor();
2804 let right_executor = self.create_right_executor();
2805 self.do_test_with_chunk_size_and_executors(
2806 expected,
2807 has_non_equi_cond,
2808 null_safe,
2809 self::CHUNK_SIZE,
2810 left_executor,
2811 right_executor,
2812 true,
2813 )
2814 .await;
2815 }
2816
2817 async fn do_test_with_chunk_size_and_executors(
2818 &self,
2819 expected: DataChunk,
2820 has_non_equi_cond: bool,
2821 null_safe: bool,
2822 chunk_size: usize,
2823 left_executor: BoxedExecutor,
2824 right_executor: BoxedExecutor,
2825 test_spill: bool,
2826 ) {
2827 let parent_mem_context =
2828 MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge(), u64::MAX);
2829
2830 {
2831 let join_executor = self.create_join_executor_with_chunk_size_and_executors(
2832 has_non_equi_cond,
2833 null_safe,
2834 chunk_size,
2835 left_executor,
2836 right_executor,
2837 ShutdownToken::empty(),
2838 Some(parent_mem_context.clone()),
2839 test_spill,
2840 );
2841
2842 let mut data_chunk_merger = DataChunkMerger::new(self.output_data_types()).unwrap();
2843
2844 let fields = &join_executor.schema().fields;
2845
2846 if self.join_type.keep_all() {
2847 assert_eq!(fields[1].data_type, DataType::Float32);
2848 assert_eq!(fields[3].data_type, DataType::Float64);
2849 } else if self.join_type.keep_left() {
2850 assert_eq!(fields[1].data_type, DataType::Float32);
2851 } else if self.join_type.keep_right() {
2852 assert_eq!(fields[1].data_type, DataType::Float64)
2853 } else {
2854 unreachable!()
2855 }
2856
2857 let mut stream = join_executor.execute();
2858
2859 while let Some(data_chunk) = stream.next().await {
2860 let data_chunk = data_chunk.unwrap();
2861 let data_chunk = data_chunk.compact();
2862 data_chunk_merger.append(&data_chunk).unwrap();
2863 }
2864
2865 let result_chunk = data_chunk_merger.finish().unwrap();
2866 println!("expected: {:?}", expected);
2867 println!("result: {:?}", result_chunk);
2868
2869 assert!(compare_data_chunk_with_rowsort(&expected, &result_chunk));
2872 }
2873
2874 assert_eq!(0, parent_mem_context.get_bytes_used());
2875 }
2876
2877 async fn do_test_shutdown(&self, has_non_equi_cond: bool) {
2878 let left_executor = self.create_left_executor();
2880 let right_executor = self.create_right_executor();
2881 let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
2882 let join_executor = self.create_join_executor_with_chunk_size_and_executors(
2883 has_non_equi_cond,
2884 false,
2885 self::CHUNK_SIZE,
2886 left_executor,
2887 right_executor,
2888 shutdown_rx,
2889 None,
2890 false,
2891 );
2892 shutdown_tx.cancel();
2893 #[for_await]
2894 for chunk in join_executor.execute() {
2895 assert!(chunk.is_err());
2896 break;
2897 }
2898
2899 let left_executor = self.create_left_executor();
2901 let right_executor = self.create_right_executor();
2902 let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
2903 let join_executor = self.create_join_executor_with_chunk_size_and_executors(
2904 has_non_equi_cond,
2905 false,
2906 self::CHUNK_SIZE,
2907 left_executor,
2908 right_executor,
2909 shutdown_rx,
2910 None,
2911 false,
2912 );
2913 shutdown_tx.abort("test");
2914 #[for_await]
2915 for chunk in join_executor.execute() {
2916 assert!(chunk.is_err());
2917 break;
2918 }
2919 }
2920 }
2921
2922 #[tokio::test]
2927 async fn test_inner_join() {
2928 let test_fixture = TestFixture::with_join_type(JoinType::Inner);
2929
2930 let expected_chunk = DataChunk::from_pretty(
2931 "i f i F
2932 2 . 2 .
2933 3 3.9 3 3.7
2934 3 3.9 3 .
2935 4 6.6 4 7.5
2936 3 . 3 3.7
2937 3 . 3 .",
2938 );
2939
2940 test_fixture.do_test(expected_chunk, false, false).await;
2941 }
2942
2943 #[tokio::test]
2948 async fn test_null_safe_inner_join() {
2949 let test_fixture = TestFixture::with_join_type(JoinType::Inner);
2950
2951 let expected_chunk = DataChunk::from_pretty(
2952 "i f i F
2953 2 . 2 .
2954 . 8.4 . 8.18
2955 . 8.4 . 9.6
2956 . 8.4 . 9.1
2957 . 8.4 . 8
2958 . 8.4 . 3.5
2959 . 8.4 . 8.9
2960 3 3.9 3 3.7
2961 3 3.9 3 .
2962 . . . 8.18
2963 . . . 9.6
2964 . . . 9.1
2965 . . . 8
2966 . . . 3.5
2967 . . . 8.9
2968 4 6.6 4 7.5
2969 3 . 3 3.7
2970 3 . 3 .
2971 . 0.7 . 8.18
2972 . 0.7 . 9.6
2973 . 0.7 . 9.1
2974 . 0.7 . 8
2975 . 0.7 . 3.5
2976 . 0.7 . 8.9
2977 . 5.5 . 8.18
2978 . 5.5 . 9.6
2979 . 5.5 . 9.1
2980 . 5.5 . 8
2981 . 5.5 . 3.5
2982 . 5.5 . 8.9",
2983 );
2984
2985 test_fixture.do_test(expected_chunk, false, true).await;
2986 }
2987
2988 #[tokio::test]
2993 async fn test_inner_join_with_non_equi_condition() {
2994 let test_fixture = TestFixture::with_join_type(JoinType::Inner);
2995
2996 let expected_chunk = DataChunk::from_pretty(
2997 "i f i F
2998 4 6.6 4 7.5",
2999 );
3000
3001 test_fixture.do_test(expected_chunk, true, false).await;
3002 }
3003
3004 #[tokio::test]
3009 async fn test_left_outer_join() {
3010 let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);
3011
3012 let expected_chunk = DataChunk::from_pretty(
3013 "i f i F
3014 1 6.1 . .
3015 2 . 2 .
3016 . 8.4 . .
3017 3 3.9 3 3.7
3018 3 3.9 3 .
3019 . . . .
3020 4 6.6 4 7.5
3021 3 . 3 3.7
3022 3 . 3 .
3023 . 0.7 . .
3024 5 . . .
3025 . 5.5 . .",
3026 );
3027
3028 test_fixture.do_test(expected_chunk, false, false).await;
3029 }
3030
3031 #[tokio::test]
3036 async fn test_left_outer_join_with_non_equi_condition() {
3037 let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);
3038
3039 let expected_chunk = DataChunk::from_pretty(
3040 "i f i F
3041 2 . . .
3042 3 3.9 . .
3043 4 6.6 4 7.5
3044 3 . . .
3045 1 6.1 . .
3046 . 8.4 . .
3047 . . . .
3048 . 0.7 . .
3049 5 . . .
3050 . 5.5 . .",
3051 );
3052
3053 test_fixture.do_test(expected_chunk, true, false).await;
3054 }
3055
3056 #[tokio::test]
3061 async fn test_right_outer_join() {
3062 let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);
3063
3064 let expected_chunk = DataChunk::from_pretty(
3065 "i f i F
3066 2 . 2 .
3067 3 3.9 3 3.7
3068 3 3.9 3 .
3069 4 6.6 4 7.5
3070 3 . 3 3.7
3071 3 . 3 .
3072 . . 8 6.1
3073 . . . 8.9
3074 . . . 3.5
3075 . . 6 .
3076 . . 6 .
3077 . . . 8
3078 . . 7 .
3079 . . . 9.1
3080 . . 9 .
3081 . . 9 .
3082 . . . 9.6
3083 . . 100 .
3084 . . . 8.18
3085 . . 200 .",
3086 );
3087
3088 test_fixture.do_test(expected_chunk, false, false).await;
3089 }
3090
3091 #[tokio::test]
3096 async fn test_right_outer_join_with_non_equi_condition() {
3097 let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);
3098
3099 let expected_chunk = DataChunk::from_pretty(
3100 "i f i F
3101 4 6.6 4 7.5
3102 . . 8 6.1
3103 . . 2 .
3104 . . . 8.9
3105 . . 3 .
3106 . . . 3.5
3107 . . 6 .
3108 . . 6 .
3109 . . . 8
3110 . . 7 .
3111 . . . 9.1
3112 . . 9 .
3113 . . 3 3.7
3114 . . 9 .
3115 . . . 9.6
3116 . . 100 .
3117 . . . 8.18
3118 . . 200 .",
3119 );
3120
3121 test_fixture.do_test(expected_chunk, true, false).await;
3122 }
3123
3124 #[tokio::test]
3128 async fn test_full_outer_join() {
3129 let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);
3130
3131 let expected_chunk = DataChunk::from_pretty(
3132 "i f i F
3133 1 6.1 . .
3134 2 . 2 .
3135 . 8.4 . .
3136 3 3.9 3 3.7
3137 3 3.9 3 .
3138 . . . .
3139 4 6.6 4 7.5
3140 3 . 3 3.7
3141 3 . 3 .
3142 . 0.7 . .
3143 5 . . .
3144 . 5.5 . .
3145 . . 8 6.1
3146 . . . 8.9
3147 . . . 3.5
3148 . . 6 .
3149 . . 6 .
3150 . . . 8
3151 . . 7 .
3152 . . . 9.1
3153 . . 9 .
3154 . . 9 .
3155 . . . 9.6
3156 . . 100 .
3157 . . . 8.18
3158 . . 200 .",
3159 );
3160
3161 test_fixture.do_test(expected_chunk, false, false).await;
3162 }
3163
3164 #[tokio::test]
3168 async fn test_full_outer_join_with_non_equi_condition() {
3169 let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);
3170
3171 let expected_chunk = DataChunk::from_pretty(
3172 "i f i F
3173 2 . . .
3174 3 3.9 . .
3175 4 6.6 4 7.5
3176 3 . . .
3177 1 6.1 . .
3178 . 8.4 . .
3179 . . . .
3180 . 0.7 . .
3181 5 . . .
3182 . 5.5 . .
3183 . . 8 6.1
3184 . . 2 .
3185 . . . 8.9
3186 . . 3 .
3187 . . . 3.5
3188 . . 6 .
3189 . . 6 .
3190 . . . 8
3191 . . 7 .
3192 . . . 9.1
3193 . . 9 .
3194 . . 3 3.7
3195 . . 9 .
3196 . . . 9.6
3197 . . 100 .
3198 . . . 8.18
3199 . . 200 .",
3200 );
3201
3202 test_fixture.do_test(expected_chunk, true, false).await;
3203 }
3204
3205 #[tokio::test]
3206 async fn test_left_anti_join() {
3207 let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);
3208
3209 let expected_chunk = DataChunk::from_pretty(
3210 "i f
3211 1 6.1
3212 . 8.4
3213 . .
3214 . 0.7
3215 5 .
3216 . 5.5",
3217 );
3218
3219 test_fixture.do_test(expected_chunk, false, false).await;
3220 }
3221
3222 #[tokio::test]
3223 async fn test_left_anti_join_with_non_equi_condition() {
3224 let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);
3225
3226 let expected_chunk = DataChunk::from_pretty(
3227 "i f
3228 2 .
3229 3 3.9
3230 3 .
3231 1 6.1
3232 . 8.4
3233 . .
3234 . 0.7
3235 5 .
3236 . 5.5",
3237 );
3238
3239 test_fixture.do_test(expected_chunk, true, false).await;
3240 }
3241
3242 #[tokio::test]
3243 async fn test_left_semi_join() {
3244 let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
3245
3246 let expected_chunk = DataChunk::from_pretty(
3247 "i f
3248 2 .
3249 3 3.9
3250 4 6.6
3251 3 .",
3252 );
3253
3254 test_fixture.do_test(expected_chunk, false, false).await;
3255 }
3256
3257 #[tokio::test]
3258 async fn test_left_semi_join_with_non_equi_condition() {
3259 let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
3260
3261 let expected_chunk = DataChunk::from_pretty(
3262 "i f
3263 4 6.6",
3264 );
3265
3266 test_fixture.do_test(expected_chunk, true, false).await;
3267 }
3268
3269 #[tokio::test]
3274 async fn test_left_semi_join_with_non_equi_condition_duplicates() {
3275 let schema = Schema {
3276 fields: vec![
3277 Field::unnamed(DataType::Int32),
3278 Field::unnamed(DataType::Float32),
3279 ],
3280 };
3281
3282 let mut left_executor = MockExecutor::new(schema);
3284 left_executor.add(DataChunk::from_pretty(
3285 "i f
3286 1 1.0
3287 1 1.0
3288 1 1.0
3289 1 1.0
3290 2 1.0",
3291 ));
3292
3293 let schema = Schema {
3295 fields: vec![
3296 Field::unnamed(DataType::Int32),
3297 Field::unnamed(DataType::Float64),
3298 ],
3299 };
3300 let mut right_executor = MockExecutor::new(schema);
3301 right_executor.add(DataChunk::from_pretty(
3302 "i F
3303 1 2.0
3304 1 2.0
3305 1 2.0
3306 1 2.0
3307 2 2.0",
3308 ));
3309
3310 let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
3311 let expected_chunk = DataChunk::from_pretty(
3312 "i f
3313 1 1.0
3314 1 1.0
3315 1 1.0
3316 1 1.0
3317 2 1.0",
3318 );
3319
3320 test_fixture
3321 .do_test_with_chunk_size_and_executors(
3322 expected_chunk,
3323 true,
3324 false,
3325 3,
3326 Box::new(left_executor),
3327 Box::new(right_executor),
3328 false,
3329 )
3330 .await;
3331 }
3332
3333 #[tokio::test]
3334 async fn test_right_anti_join() {
3335 let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);
3336
3337 let expected_chunk = DataChunk::from_pretty(
3338 "i F
3339 8 6.1
3340 . 8.9
3341 . 3.5
3342 6 .
3343 6 .
3344 . 8.0
3345 7 .
3346 . 9.1
3347 9 .
3348 9 .
3349 . 9.6
3350 100 .
3351 . 8.18
3352 200 .",
3353 );
3354
3355 test_fixture.do_test(expected_chunk, false, false).await;
3356 }
3357
3358 #[tokio::test]
3359 async fn test_right_anti_join_with_non_equi_condition() {
3360 let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);
3361
3362 let expected_chunk = DataChunk::from_pretty(
3363 "i F
3364 8 6.1
3365 2 .
3366 . 8.9
3367 3 .
3368 . 3.5
3369 6 .
3370 6 .
3371 . 8
3372 7 .
3373 . 9.1
3374 9 .
3375 3 3.7
3376 9 .
3377 . 9.6
3378 100 .
3379 . 8.18
3380 200 .",
3381 );
3382
3383 test_fixture.do_test(expected_chunk, true, false).await;
3384 }
3385
3386 #[tokio::test]
3387 async fn test_right_semi_join() {
3388 let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);
3389
3390 let expected_chunk = DataChunk::from_pretty(
3391 "i F
3392 2 .
3393 3 .
3394 4 7.5
3395 3 3.7",
3396 );
3397
3398 test_fixture.do_test(expected_chunk, false, false).await;
3399 }
3400
3401 #[tokio::test]
3402 async fn test_right_semi_join_with_non_equi_condition() {
3403 let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);
3404
3405 let expected_chunk = DataChunk::from_pretty(
3406 "i F
3407 4 7.5",
3408 );
3409
3410 test_fixture.do_test(expected_chunk, true, false).await;
3411 }
3412
3413 #[tokio::test]
3414 async fn test_process_left_outer_join_non_equi_condition() {
3415 let chunk = DataChunk::from_pretty(
3416 "i f i F
3417 1 3.5 1 5.5
3418 1 3.5 1 2.5
3419 2 4.0 . .
3420 3 5.0 3 4.0
3421 3 5.0 3 3.0
3422 3 5.0 3 4.0
3423 3 5.0 3 3.0
3424 4 1.0 4 0
3425 4 1.0 4 9.0",
3426 );
3427 let expect = DataChunk::from_pretty(
3428 "i f i F
3429 1 3.5 1 5.5
3430 2 4.0 . .
3431 3 5.0 . .
3432 3 5.0 . .
3433 4 1.0 4 9.0",
3434 );
3435 let cond = TestFixture::create_cond();
3436 let mut state = LeftNonEquiJoinState {
3437 probe_column_count: 2,
3438 first_output_row_id: vec![0, 2, 3, 5, 7],
3439 has_more_output_rows: true,
3440 found_matched: false,
3441 };
3442 assert!(compare_data_chunk_with_rowsort(
3443 &HashJoinExecutor::<Key32>::process_left_outer_join_non_equi_condition(
3444 chunk,
3445 cond.as_ref(),
3446 &mut state
3447 )
3448 .await
3449 .unwrap()
3450 .compact(),
3451 &expect
3452 ));
3453 assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3454 assert!(state.found_matched);
3455
3456 let chunk = DataChunk::from_pretty(
3457 "i f i F
3458 4 1.0 4 0.6
3459 4 1.0 4 2.0
3460 5 4.0 5 .
3461 6 7.0 6 .
3462 6 7.0 6 5.0",
3463 );
3464 let expect = DataChunk::from_pretty(
3465 "i f i F
3466 4 1.0 4 2.0
3467 5 4.0 . .
3468 6 7.0 . .",
3469 );
3470 state.first_output_row_id = vec![2, 3];
3471 state.has_more_output_rows = false;
3472 assert!(compare_data_chunk_with_rowsort(
3473 &HashJoinExecutor::<Key32>::process_left_outer_join_non_equi_condition(
3474 chunk,
3475 cond.as_ref(),
3476 &mut state
3477 )
3478 .await
3479 .unwrap()
3480 .compact(),
3481 &expect
3482 ));
3483 assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3484 assert!(!state.found_matched);
3485
3486 let chunk = DataChunk::from_pretty(
3487 "i f i F
3488 4 1.0 4 0.6
3489 4 1.0 4 1.0
3490 5 4.0 5 .
3491 6 7.0 6 .
3492 6 7.0 6 8.0",
3493 );
3494 let expect = DataChunk::from_pretty(
3495 "i f i F
3496 4 1.0 . .
3497 5 4.0 . .
3498 6 7.0 6 8.0",
3499 );
3500 state.first_output_row_id = vec![2, 3];
3501 state.has_more_output_rows = false;
3502 assert!(compare_data_chunk_with_rowsort(
3503 &HashJoinExecutor::<Key32>::process_left_outer_join_non_equi_condition(
3504 chunk,
3505 cond.as_ref(),
3506 &mut state
3507 )
3508 .await
3509 .unwrap()
3510 .compact(),
3511 &expect
3512 ));
3513 assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3514 assert!(!state.found_matched);
3515 }
3516
3517 #[tokio::test]
3518 async fn test_process_left_semi_join_non_equi_condition() {
3519 let chunk = DataChunk::from_pretty(
3520 "i f i F
3521 1 3.5 1 5.5
3522 1 3.5 1 2.5
3523 2 4.0 . .
3524 3 5.0 3 4.0
3525 3 5.0 3 3.0
3526 3 5.0 3 4.0
3527 3 5.0 3 3.0
3528 4 1.0 4 0
3529 4 1.0 4 0.5",
3530 );
3531 let expect = DataChunk::from_pretty(
3532 "i f i F
3533 1 3.5 1 5.5",
3534 );
3535 let cond = TestFixture::create_cond();
3536 let mut state = LeftNonEquiJoinState {
3537 probe_column_count: 2,
3538 first_output_row_id: vec![0, 2, 3, 5, 7],
3539 found_matched: false,
3540 ..Default::default()
3541 };
3542 assert!(compare_data_chunk_with_rowsort(
3543 &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<false>(
3544 chunk,
3545 cond.as_ref(),
3546 &mut state
3547 )
3548 .await
3549 .unwrap()
3550 .compact(),
3551 &expect
3552 ));
3553 assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3554 assert!(!state.found_matched);
3555
3556 let chunk = DataChunk::from_pretty(
3557 "i f i F
3558 4 1.0 4 0.6
3559 4 1.0 4 2.0
3560 5 4.0 5 .
3561 6 7.0 6 .
3562 6 7.0 6 5.0",
3563 );
3564 let expect = DataChunk::from_pretty(
3565 "i f i F
3566 4 1.0 4 2.0",
3567 );
3568 state.first_output_row_id = vec![2, 3];
3569 assert!(compare_data_chunk_with_rowsort(
3570 &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<false>(
3571 chunk,
3572 cond.as_ref(),
3573 &mut state
3574 )
3575 .await
3576 .unwrap()
3577 .compact(),
3578 &expect
3579 ));
3580 assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3581 assert!(!state.found_matched);
3582
3583 let chunk = DataChunk::from_pretty(
3584 "i f i F
3585 4 1.0 4 0.6
3586 4 1.0 4 1.0
3587 5 4.0 5 .
3588 6 7.0 6 .
3589 6 7.0 6 8.0",
3590 );
3591 let expect = DataChunk::from_pretty(
3592 "i f i F
3593 6 7.0 6 8.0",
3594 );
3595 state.first_output_row_id = vec![2, 3];
3596 assert!(compare_data_chunk_with_rowsort(
3597 &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<false>(
3598 chunk,
3599 cond.as_ref(),
3600 &mut state
3601 )
3602 .await
3603 .unwrap()
3604 .compact(),
3605 &expect
3606 ));
3607 assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3608 }
3609
3610 #[tokio::test]
3611 async fn test_process_left_anti_join_non_equi_condition() {
3612 let chunk = DataChunk::from_pretty(
3613 "i f i F
3614 1 3.5 1 5.5
3615 1 3.5 1 2.5
3616 2 4.0 . .
3617 3 5.0 3 4.0
3618 3 5.0 3 3.0
3619 3 5.0 3 4.0
3620 3 5.0 3 3.0
3621 4 1.0 4 0
3622 4 1.0 4 0.5",
3623 );
3624 let expect = DataChunk::from_pretty(
3625 "i f i F
3626 2 4.0 . .
3627 3 5.0 3 4.0
3628 3 5.0 3 4.0",
3629 );
3630 let cond = TestFixture::create_cond();
3631 let mut state = LeftNonEquiJoinState {
3632 probe_column_count: 2,
3633 first_output_row_id: vec![0, 2, 3, 5, 7],
3634 has_more_output_rows: true,
3635 found_matched: false,
3636 };
3637 assert!(compare_data_chunk_with_rowsort(
3638 &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<true>(
3639 chunk,
3640 cond.as_ref(),
3641 &mut state
3642 )
3643 .await
3644 .unwrap()
3645 .compact(),
3646 &expect
3647 ));
3648 assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3649 assert!(!state.found_matched);
3650
3651 let chunk = DataChunk::from_pretty(
3652 "i f i F
3653 4 1.0 4 0.6
3654 4 1.0 4 2.0
3655 5 4.0 5 .
3656 6 7.0 6 .
3657 6 7.0 6 5.0",
3658 );
3659 let expect = DataChunk::from_pretty(
3660 "i f i F
3661 5 4.0 5 .
3662 6 7.0 6 .",
3663 );
3664 state.first_output_row_id = vec![2, 3];
3665 state.has_more_output_rows = false;
3666 assert!(compare_data_chunk_with_rowsort(
3667 &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<true>(
3668 chunk,
3669 cond.as_ref(),
3670 &mut state
3671 )
3672 .await
3673 .unwrap()
3674 .compact(),
3675 &expect
3676 ));
3677 assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3678 assert!(!state.found_matched);
3679
3680 let chunk = DataChunk::from_pretty(
3681 "i f i F
3682 4 1.0 4 0.6
3683 4 1.0 4 1.0
3684 5 4.0 5 .
3685 6 7.0 6 .
3686 6 7.0 6 8.0",
3687 );
3688 let expect = DataChunk::from_pretty(
3689 "i f i F
3690 4 1.0 4 0.6
3691 5 4.0 5 .",
3692 );
3693 state.first_output_row_id = vec![2, 3];
3694 state.has_more_output_rows = false;
3695 assert!(compare_data_chunk_with_rowsort(
3696 &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<true>(
3697 chunk,
3698 cond.as_ref(),
3699 &mut state
3700 )
3701 .await
3702 .unwrap()
3703 .compact(),
3704 &expect
3705 ));
3706 assert_eq!(state.first_output_row_id, Vec::<usize>::new());
3707 }
3708
3709 #[tokio::test]
3710 async fn test_process_right_outer_join_non_equi_condition() {
3711 let chunk = DataChunk::from_pretty(
3712 "i f i F
3713 1 3.5 1 5.5
3714 1 3.5 1 2.5
3715 3 5.0 3 4.0
3716 3 5.0 3 3.0
3717 3 5.0 3 4.0
3718 3 5.0 3 3.0
3719 4 1.0 4 0
3720 4 1.0 4 0.5",
3721 );
3722 let expect = DataChunk::from_pretty(
3723 "i f i F
3724 1 3.5 1 5.5",
3725 );
3726 let cond = TestFixture::create_cond();
3727 let build_row_matched = ChunkedData::with_chunk_sizes([14].into_iter()).unwrap();
3745 let mut state = RightNonEquiJoinState {
3746 build_row_ids: vec![
3747 RowId::new(0, 0),
3748 RowId::new(0, 1),
3749 RowId::new(0, 3),
3750 RowId::new(0, 4),
3751 RowId::new(0, 3),
3752 RowId::new(0, 4),
3753 RowId::new(0, 5),
3754 RowId::new(0, 7),
3755 ],
3756 build_row_matched,
3757 };
3758 assert!(compare_data_chunk_with_rowsort(
3759 &HashJoinExecutor::<Key32>::process_right_outer_join_non_equi_condition(
3760 chunk,
3761 cond.as_ref(),
3762 &mut state
3763 )
3764 .await
3765 .unwrap()
3766 .compact(),
3767 &expect
3768 ));
3769 assert_eq!(state.build_row_ids, Vec::new());
3770 assert_eq!(
3771 state.build_row_matched,
3772 ChunkedData::try_from(vec![{
3773 let mut v = vec![false; 14];
3774 v[0] = true;
3775 v
3776 }])
3777 .unwrap()
3778 );
3779
3780 let chunk = DataChunk::from_pretty(
3781 "i f i F
3782 4 1.0 4 0.6
3783 4 1.0 4 2.0
3784 5 4.0 5 .
3785 6 7.0 6 .
3786 6 7.0 6 5.0",
3787 );
3788 let expect = DataChunk::from_pretty(
3789 "i f i F
3790 4 1.0 4 2.0",
3791 );
3792 state.build_row_ids = vec![
3793 RowId::new(0, 8),
3794 RowId::new(0, 9),
3795 RowId::new(0, 10),
3796 RowId::new(0, 12),
3797 RowId::new(0, 13),
3798 ];
3799 assert!(compare_data_chunk_with_rowsort(
3800 &HashJoinExecutor::<Key32>::process_right_outer_join_non_equi_condition(
3801 chunk,
3802 cond.as_ref(),
3803 &mut state
3804 )
3805 .await
3806 .unwrap()
3807 .compact(),
3808 &expect
3809 ));
3810 assert_eq!(state.build_row_ids, Vec::new());
3811 assert_eq!(
3812 state.build_row_matched,
3813 ChunkedData::try_from(vec![{
3814 let mut v = vec![false; 14];
3815 v[0] = true;
3816 v[9] = true;
3817 v
3818 }])
3819 .unwrap()
3820 );
3821 }
3822
3823 #[tokio::test]
3824 async fn test_process_right_semi_anti_join_non_equi_condition() {
3825 let chunk = DataChunk::from_pretty(
3826 "i f i F
3827 1 3.5 1 5.5
3828 1 3.5 1 2.5
3829 3 5.0 3 4.0
3830 3 5.0 3 3.0
3831 3 5.0 3 4.0
3832 3 5.0 3 3.0
3833 4 1.0 4 0
3834 4 1.0 4 0.5",
3835 );
3836 let cond = TestFixture::create_cond();
3837 let build_row_matched = ChunkedData::with_chunk_sizes([14].into_iter()).unwrap();
3838 let mut state = RightNonEquiJoinState {
3839 build_row_ids: vec![
3840 RowId::new(0, 0),
3841 RowId::new(0, 1),
3842 RowId::new(0, 3),
3843 RowId::new(0, 4),
3844 RowId::new(0, 3),
3845 RowId::new(0, 4),
3846 RowId::new(0, 5),
3847 RowId::new(0, 7),
3848 ],
3849 build_row_matched,
3850 };
3851
3852 assert!(
3853 HashJoinExecutor::<Key32>::process_right_semi_anti_join_non_equi_condition(
3854 chunk,
3855 cond.as_ref(),
3856 &mut state
3857 )
3858 .await
3859 .is_ok()
3860 );
3861 assert_eq!(state.build_row_ids, Vec::new());
3862 assert_eq!(
3863 state.build_row_matched,
3864 ChunkedData::try_from(vec![{
3865 let mut v = vec![false; 14];
3866 v[0] = true;
3867 v
3868 }])
3869 .unwrap()
3870 );
3871
3872 let chunk = DataChunk::from_pretty(
3873 "i f i F
3874 4 1.0 4 0.6
3875 4 1.0 4 2.0
3876 5 4.0 5 .
3877 6 7.0 6 .
3878 6 7.0 6 5.0",
3879 );
3880 state.build_row_ids = vec![
3881 RowId::new(0, 8),
3882 RowId::new(0, 9),
3883 RowId::new(0, 10),
3884 RowId::new(0, 12),
3885 RowId::new(0, 13),
3886 ];
3887 assert!(
3888 HashJoinExecutor::<Key32>::process_right_semi_anti_join_non_equi_condition(
3889 chunk,
3890 cond.as_ref(),
3891 &mut state
3892 )
3893 .await
3894 .is_ok()
3895 );
3896 assert_eq!(state.build_row_ids, Vec::new());
3897 assert_eq!(
3898 state.build_row_matched,
3899 ChunkedData::try_from(vec![{
3900 let mut v = vec![false; 14];
3901 v[0] = true;
3902 v[9] = true;
3903 v
3904 }])
3905 .unwrap()
3906 );
3907 }
3908
3909 #[tokio::test]
3910 async fn test_process_full_outer_join_non_equi_condition() {
3911 let chunk = DataChunk::from_pretty(
3912 "i f i F
3913 1 3.5 1 5.5
3914 1 3.5 1 2.5
3915 3 5.0 3 4.0
3916 3 5.0 3 3.0
3917 3 5.0 3 4.0
3918 3 5.0 3 3.0
3919 4 1.0 4 0
3920 4 1.0 4 0.5",
3921 );
3922 let expect = DataChunk::from_pretty(
3923 "i f i F
3924 1 3.5 1 5.5
3925 3 5.0 . .
3926 3 5.0 . .",
3927 );
3928 let cond = TestFixture::create_cond();
3929 let mut left_state = LeftNonEquiJoinState {
3930 probe_column_count: 2,
3931 first_output_row_id: vec![0, 2, 4, 6],
3932 has_more_output_rows: true,
3933 found_matched: false,
3934 };
3935 let mut right_state = RightNonEquiJoinState {
3936 build_row_ids: vec![
3937 RowId::new(0, 0),
3938 RowId::new(0, 1),
3939 RowId::new(0, 3),
3940 RowId::new(0, 4),
3941 RowId::new(0, 3),
3942 RowId::new(0, 4),
3943 RowId::new(0, 5),
3944 RowId::new(0, 7),
3945 ],
3946 build_row_matched: ChunkedData::with_chunk_sizes([14].into_iter()).unwrap(),
3947 };
3948 assert!(compare_data_chunk_with_rowsort(
3949 &HashJoinExecutor::<Key32>::process_full_outer_join_non_equi_condition(
3950 chunk,
3951 cond.as_ref(),
3952 &mut left_state,
3953 &mut right_state,
3954 )
3955 .await
3956 .unwrap()
3957 .compact(),
3958 &expect
3959 ));
3960 assert_eq!(left_state.first_output_row_id, Vec::<usize>::new());
3961 assert!(!left_state.found_matched);
3962 assert_eq!(right_state.build_row_ids, Vec::new());
3963 assert_eq!(
3964 right_state.build_row_matched,
3965 ChunkedData::try_from(vec![{
3966 let mut v = vec![false; 14];
3967 v[0] = true;
3968 v
3969 }])
3970 .unwrap()
3971 );
3972
3973 let chunk = DataChunk::from_pretty(
3974 "i f i F
3975 4 1.0 4 0.6
3976 4 1.0 4 2.0
3977 5 4.0 5 .
3978 6 7.0 6 .
3979 6 7.0 6 8.0",
3980 );
3981 let expect = DataChunk::from_pretty(
3982 "i f i F
3983 4 1.0 4 2.0
3984 5 4.0 . .
3985 6 7.0 6 8.0",
3986 );
3987 left_state.first_output_row_id = vec![2, 3];
3988 left_state.has_more_output_rows = false;
3989 right_state.build_row_ids = vec![
3990 RowId::new(0, 8),
3991 RowId::new(0, 9),
3992 RowId::new(0, 10),
3993 RowId::new(0, 12),
3994 RowId::new(0, 13),
3995 ];
3996 assert!(compare_data_chunk_with_rowsort(
3997 &HashJoinExecutor::<Key32>::process_full_outer_join_non_equi_condition(
3998 chunk,
3999 cond.as_ref(),
4000 &mut left_state,
4001 &mut right_state,
4002 )
4003 .await
4004 .unwrap()
4005 .compact(),
4006 &expect
4007 ));
4008 assert_eq!(left_state.first_output_row_id, Vec::<usize>::new());
4009 assert!(left_state.found_matched);
4010 assert_eq!(right_state.build_row_ids, Vec::new());
4011 assert_eq!(
4012 right_state.build_row_matched,
4013 ChunkedData::try_from(vec![{
4014 let mut v = vec![false; 14];
4015 v[0] = true;
4016 v[9] = true;
4017 v[13] = true;
4018 v
4019 }])
4020 .unwrap()
4021 );
4022 }
4023
4024 #[tokio::test]
4025 async fn test_shutdown() {
4026 let test_fixture = TestFixture::with_join_type(JoinType::Inner);
4027 test_fixture.do_test_shutdown(false).await;
4028 test_fixture.do_test_shutdown(true).await;
4029
4030 let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);
4031 test_fixture.do_test_shutdown(false).await;
4032 test_fixture.do_test_shutdown(true).await;
4033
4034 let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);
4035 test_fixture.do_test_shutdown(false).await;
4036 test_fixture.do_test_shutdown(true).await;
4037
4038 let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);
4039 test_fixture.do_test_shutdown(false).await;
4040 test_fixture.do_test_shutdown(true).await;
4041
4042 let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
4043 test_fixture.do_test_shutdown(false).await;
4044 test_fixture.do_test_shutdown(true).await;
4045
4046 let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);
4047 test_fixture.do_test_shutdown(false).await;
4048 test_fixture.do_test_shutdown(true).await;
4049
4050 let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);
4051 test_fixture.do_test_shutdown(false).await;
4052 test_fixture.do_test_shutdown(true).await;
4053
4054 let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);
4055 test_fixture.do_test_shutdown(false).await;
4056 test_fixture.do_test_shutdown(true).await;
4057 }
4058}