1use std::cmp::Ordering;
16use std::iter;
17use std::iter::empty;
18use std::marker::PhantomData;
19use std::sync::Arc;
20
21use bytes::Bytes;
22use futures_async_stream::try_stream;
23use itertools::Itertools;
24use risingwave_common::array::{Array, DataChunk, RowRef};
25use risingwave_common::bitmap::{Bitmap, BitmapBuilder, FilterByBitmap};
26use risingwave_common::catalog::Schema;
27use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher};
28use risingwave_common::memory::{MemoryContext, MonitoredGlobalAlloc};
29use risingwave_common::row::{Row, RowExt, repeat_n};
30use risingwave_common::types::{DataType, Datum, DefaultOrd};
31use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
32use risingwave_common::util::iter_util::ZipEqFast;
33use risingwave_common_estimate_size::EstimateSize;
34use risingwave_expr::expr::{BoxedExpression, Expression, build_from_prost};
35use risingwave_pb::Message;
36use risingwave_pb::batch_plan::plan_node::NodeBody;
37use risingwave_pb::data::DataChunk as PbDataChunk;
38
39use super::{AsOfDesc, AsOfInequalityType, ChunkedData, JoinType, RowId};
40use crate::error::{BatchError, Result};
41use crate::executor::{
42 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
43 WrapStreamExecutor,
44};
45use crate::monitor::BatchSpillMetrics;
46use crate::risingwave_common::hash::NullBitmap;
47use crate::spill::spill_op::SpillBackend::Disk;
48use crate::spill::spill_op::{
49 DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY, SpillBackend, SpillBuildHasher, SpillOp,
50};
51use crate::task::ShutdownToken;
52
/// Batch hash join executor.
///
/// Materializes the build (right) side into a hash table keyed by the join
/// key, then streams the probe (left) side against it. When memory accounting
/// fails and a spill backend is configured, both inputs are hash-partitioned
/// to disk and each partition is re-joined by a recursive sub-executor.
pub struct HashJoinExecutor<K> {
    /// Which join variant to perform (inner / outer / semi / anti / AS OF).
    join_type: JoinType,
    /// Schema of probe side ++ build side, before `output_indices` projection.
    #[expect(dead_code)]
    original_schema: Schema,
    /// Output schema, i.e. `original_schema` projected by `output_indices`.
    schema: Schema,
    /// Columns of the joined row that are actually emitted.
    output_indices: Vec<usize>,
    /// Child executor whose rows probe the hash table.
    probe_side_source: BoxedExecutor,
    /// Child executor whose rows the hash table is built from.
    build_side_source: BoxedExecutor,
    /// Join-key column indices on the probe side.
    probe_key_idxs: Vec<usize>,
    /// Join-key column indices on the build side.
    build_key_idxs: Vec<usize>,
    /// Optional non-equi condition evaluated on joined rows.
    cond: Option<Arc<BoxedExpression>>,
    /// Per key column: whether `NULL = NULL` counts as a match.
    null_matched: Vec<bool>,
    identity: String,
    chunk_size: usize,
    /// `Some` for AS OF joins: describes the inequality column and type.
    asof_desc: Option<AsOfDesc>,

    /// If `Some`, spilling to this backend is allowed under memory pressure.
    spill_backend: Option<SpillBackend>,
    spill_metrics: Arc<BatchSpillMetrics>,
    /// Estimated input size for a spilled sub-join; `None` at the top level.
    memory_upper_bound: Option<u64>,

    shutdown_rx: ShutdownToken,

    mem_ctx: MemoryContext,
    _phantom: PhantomData<K>,
}
100
impl<K: HashKey> Executor for HashJoinExecutor<K> {
    /// Returns the output schema (after `output_indices` projection).
    fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Returns the executor's identity string (used in logs and metrics).
    fn identity(&self) -> &str {
        &self.identity
    }

    /// Consumes the executor and returns the joined-chunk stream.
    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        self.do_execute()
    }
}
114
/// Maps a join key to the id of the *first* build row carrying that key;
/// additional rows with the same key are chained through
/// `next_build_row_with_same_key`. Allocated via the monitored allocator so
/// its memory is charged to the executor's `MemoryContext`.
pub type JoinHashMap<K> =
    hashbrown::HashMap<K, RowId, PrecomputedBuildHasher, MonitoredGlobalAlloc>;
149
/// Iterator over the chain of build rows that share one join key.
struct RowIdIter<'a> {
    /// The row to yield next; `None` terminates the iteration.
    current_row_id: Option<RowId>,
    /// Per-row link to the next build row with the same key.
    next_row_id: &'a ChunkedData<Option<RowId>>,
}
154
impl ChunkedData<Option<RowId>> {
    /// Returns an iterator that walks the same-key chain starting at `begin`
    /// (typically the head row id obtained from the join hash map).
    fn row_id_iter(&self, begin: Option<RowId>) -> RowIdIter<'_> {
        RowIdIter {
            current_row_id: begin,
            next_row_id: self,
        }
    }
}
163
164impl Iterator for RowIdIter<'_> {
165 type Item = RowId;
166
167 fn next(&mut self) -> Option<Self::Item> {
168 self.current_row_id.inspect(|row_id| {
169 self.current_row_id = self.next_row_id[*row_id];
170 })
171 }
172}
173
/// Everything a per-join-type probe routine (`do_inner_join`, ...) needs once
/// the build side has been materialized and its hash table constructed.
pub struct EquiJoinParams<K> {
    probe_side: BoxedExecutor,
    probe_data_types: Vec<DataType>,
    probe_key_idxs: Vec<usize>,
    /// Fully materialized build side; memory tracked by the monitored allocator.
    build_side: Vec<DataChunk, MonitoredGlobalAlloc>,
    build_data_types: Vec<DataType>,
    /// `probe_data_types` ++ `build_data_types`.
    full_data_types: Vec<DataType>,
    /// Join key -> head of the equal-key row chain.
    hash_map: JoinHashMap<K>,
    /// Links each build row to the next one with the same key.
    next_build_row_with_same_key: ChunkedData<Option<RowId>>,
    chunk_size: usize,
    shutdown_rx: ShutdownToken,
    /// `Some` when executing an AS OF join.
    asof_desc: Option<AsOfDesc>,
}
187
impl<K> EquiJoinParams<K> {
    /// Bundles the prepared join inputs; pure field moves, no validation.
    #[allow(clippy::too_many_arguments)]
    pub(super) fn new(
        probe_side: BoxedExecutor,
        probe_data_types: Vec<DataType>,
        probe_key_idxs: Vec<usize>,
        build_side: Vec<DataChunk, MonitoredGlobalAlloc>,
        build_data_types: Vec<DataType>,
        full_data_types: Vec<DataType>,
        hash_map: JoinHashMap<K>,
        next_build_row_with_same_key: ChunkedData<Option<RowId>>,
        chunk_size: usize,
        shutdown_rx: ShutdownToken,
        asof_desc: Option<AsOfDesc>,
    ) -> Self {
        Self {
            probe_side,
            probe_data_types,
            probe_key_idxs,
            build_side,
            build_data_types,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            asof_desc,
        }
    }

    /// Whether these params describe an AS OF join.
    pub(crate) fn is_asof_join(&self) -> bool {
        self.asof_desc.is_some()
    }
}
222
/// State carried across output chunks while post-filtering LEFT
/// outer/semi/anti joins with a non-equi condition.
#[derive(Default)]
struct LeftNonEquiJoinState {
    /// Number of output columns contributed by the probe side.
    probe_column_count: usize,
    /// For each probe row buffered in the current chunk, the index of its
    /// first output row (`chunk_builder.buffered_count()` at push time).
    first_output_row_id: Vec<usize>,
    /// Whether the probe row straddling the chunk boundary still has more
    /// matched build rows waiting to be output.
    has_more_output_rows: bool,
    /// Whether the boundary probe row has already produced a row passing the
    /// condition (set by the `process_*_non_equi_condition` helpers —
    /// defined outside this view).
    found_matched: bool,
}
237
/// State carried across output chunks while post-filtering RIGHT
/// outer/semi/anti joins with a non-equi condition.
#[derive(Default)]
struct RightNonEquiJoinState {
    /// Build row ids backing the currently buffered output rows
    /// (presumably parallel to the output chunk; consumers are outside this
    /// view — confirm against the `process_right_*` helpers).
    build_row_ids: Vec<RowId>,
    /// For every build row, whether some probe row has matched it so far.
    build_row_matched: ChunkedData<bool>,
}
246
/// Spills both join inputs to disk, hash-partitioned, and reads partitions
/// back for recursive per-partition sub-joins.
///
/// Each partition gets one probe-side file and one build-side file; rows are
/// routed by `spill_build_hasher(key) % partition_num`.
pub struct JoinSpillManager {
    /// Storage handle rooted at this join's unique spill directory.
    op: SpillOp,
    partition_num: usize,
    /// One open writer per partition, probe side.
    probe_side_writers: Vec<opendal::Writer>,
    /// One open writer per partition, build side.
    build_side_writers: Vec<opendal::Writer>,
    /// Re-batches per-partition rows into full-sized chunks before writing.
    probe_side_chunk_builders: Vec<DataChunkBuilder>,
    build_side_chunk_builders: Vec<DataChunkBuilder>,
    /// Hasher used to assign rows to partitions; seeded per spill instance.
    spill_build_hasher: SpillBuildHasher,
    probe_side_data_types: Vec<DataType>,
    build_side_data_types: Vec<DataType>,
    spill_chunk_size: usize,
    spill_metrics: Arc<BatchSpillMetrics>,
}
260
261impl JoinSpillManager {
276 pub fn new(
277 spill_backend: SpillBackend,
278 join_identity: &String,
279 partition_num: usize,
280 probe_side_data_types: Vec<DataType>,
281 build_side_data_types: Vec<DataType>,
282 spill_chunk_size: usize,
283 spill_metrics: Arc<BatchSpillMetrics>,
284 ) -> Result<Self> {
285 let suffix_uuid = uuid::Uuid::new_v4();
286 let dir = format!("{}-{}/", join_identity, suffix_uuid);
287 let op = SpillOp::create(dir, spill_backend)?;
288 let probe_side_writers = Vec::with_capacity(partition_num);
289 let build_side_writers = Vec::with_capacity(partition_num);
290 let probe_side_chunk_builders = Vec::with_capacity(partition_num);
291 let build_side_chunk_builders = Vec::with_capacity(partition_num);
292 let spill_build_hasher = SpillBuildHasher(suffix_uuid.as_u64_pair().1);
293 Ok(Self {
294 op,
295 partition_num,
296 probe_side_writers,
297 build_side_writers,
298 probe_side_chunk_builders,
299 build_side_chunk_builders,
300 spill_build_hasher,
301 probe_side_data_types,
302 build_side_data_types,
303 spill_chunk_size,
304 spill_metrics,
305 })
306 }
307
308 pub async fn init_writers(&mut self) -> Result<()> {
309 for i in 0..self.partition_num {
310 let join_probe_side_partition_file_name = format!("join-probe-side-p{}", i);
311 let w = self
312 .op
313 .writer_with(&join_probe_side_partition_file_name)
314 .await?;
315 self.probe_side_writers.push(w);
316
317 let join_build_side_partition_file_name = format!("join-build-side-p{}", i);
318 let w = self
319 .op
320 .writer_with(&join_build_side_partition_file_name)
321 .await?;
322 self.build_side_writers.push(w);
323 self.probe_side_chunk_builders.push(DataChunkBuilder::new(
324 self.probe_side_data_types.clone(),
325 self.spill_chunk_size,
326 ));
327 self.build_side_chunk_builders.push(DataChunkBuilder::new(
328 self.build_side_data_types.clone(),
329 self.spill_chunk_size,
330 ));
331 }
332 Ok(())
333 }
334
335 pub async fn write_probe_side_chunk(
336 &mut self,
337 chunk: DataChunk,
338 hash_codes: Vec<u64>,
339 ) -> Result<()> {
340 let (columns, vis) = chunk.into_parts_v2();
341 for partition in 0..self.partition_num {
342 let new_vis = vis.clone()
343 & Bitmap::from_iter(
344 hash_codes
345 .iter()
346 .map(|hash_code| (*hash_code as usize % self.partition_num) == partition),
347 );
348 let new_chunk = DataChunk::from_parts(columns.clone(), new_vis);
349 for output_chunk in self.probe_side_chunk_builders[partition].append_chunk(new_chunk) {
350 let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
351 let buf = Message::encode_to_vec(&chunk_pb);
352 let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
353 self.spill_metrics
354 .batch_spill_write_bytes
355 .inc_by((buf.len() + len_bytes.len()) as u64);
356 self.probe_side_writers[partition].write(len_bytes).await?;
357 self.probe_side_writers[partition].write(buf).await?;
358 }
359 }
360 Ok(())
361 }
362
363 pub async fn write_build_side_chunk(
364 &mut self,
365 chunk: DataChunk,
366 hash_codes: Vec<u64>,
367 ) -> Result<()> {
368 let (columns, vis) = chunk.into_parts_v2();
369 for partition in 0..self.partition_num {
370 let new_vis = vis.clone()
371 & Bitmap::from_iter(
372 hash_codes
373 .iter()
374 .map(|hash_code| (*hash_code as usize % self.partition_num) == partition),
375 );
376 let new_chunk = DataChunk::from_parts(columns.clone(), new_vis);
377 for output_chunk in self.build_side_chunk_builders[partition].append_chunk(new_chunk) {
378 let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
379 let buf = Message::encode_to_vec(&chunk_pb);
380 let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
381 self.spill_metrics
382 .batch_spill_write_bytes
383 .inc_by((buf.len() + len_bytes.len()) as u64);
384 self.build_side_writers[partition].write(len_bytes).await?;
385 self.build_side_writers[partition].write(buf).await?;
386 }
387 }
388 Ok(())
389 }
390
391 pub async fn close_writers(&mut self) -> Result<()> {
392 for partition in 0..self.partition_num {
393 if let Some(output_chunk) = self.probe_side_chunk_builders[partition].consume_all() {
394 let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
395 let buf = Message::encode_to_vec(&chunk_pb);
396 let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
397 self.spill_metrics
398 .batch_spill_write_bytes
399 .inc_by((buf.len() + len_bytes.len()) as u64);
400 self.probe_side_writers[partition].write(len_bytes).await?;
401 self.probe_side_writers[partition].write(buf).await?;
402 }
403
404 if let Some(output_chunk) = self.build_side_chunk_builders[partition].consume_all() {
405 let chunk_pb: PbDataChunk = output_chunk.to_protobuf();
406 let buf = Message::encode_to_vec(&chunk_pb);
407 let len_bytes = Bytes::copy_from_slice(&(buf.len() as u32).to_le_bytes());
408 self.spill_metrics
409 .batch_spill_write_bytes
410 .inc_by((buf.len() + len_bytes.len()) as u64);
411 self.build_side_writers[partition].write(len_bytes).await?;
412 self.build_side_writers[partition].write(buf).await?;
413 }
414 }
415
416 for mut w in self.probe_side_writers.drain(..) {
417 w.close().await?;
418 }
419 for mut w in self.build_side_writers.drain(..) {
420 w.close().await?;
421 }
422 Ok(())
423 }
424
425 async fn read_probe_side_partition(
426 &mut self,
427 partition: usize,
428 ) -> Result<BoxedDataChunkStream> {
429 let join_probe_side_partition_file_name = format!("join-probe-side-p{}", partition);
430 let r = self
431 .op
432 .reader_with(&join_probe_side_partition_file_name)
433 .await?;
434 Ok(SpillOp::read_stream(r, self.spill_metrics.clone()))
435 }
436
437 async fn read_build_side_partition(
438 &mut self,
439 partition: usize,
440 ) -> Result<BoxedDataChunkStream> {
441 let join_build_side_partition_file_name = format!("join-build-side-p{}", partition);
442 let r = self
443 .op
444 .reader_with(&join_build_side_partition_file_name)
445 .await?;
446 Ok(SpillOp::read_stream(r, self.spill_metrics.clone()))
447 }
448
449 pub async fn estimate_partition_size(&self, partition: usize) -> Result<u64> {
450 let join_probe_side_partition_file_name = format!("join-probe-side-p{}", partition);
451 let probe_size = self
452 .op
453 .stat(&join_probe_side_partition_file_name)
454 .await?
455 .content_length();
456 let join_build_side_partition_file_name = format!("join-build-side-p{}", partition);
457 let build_size = self
458 .op
459 .stat(&join_build_side_partition_file_name)
460 .await?
461 .content_length();
462 Ok(probe_size + build_size)
463 }
464
465 async fn clear_partition(&mut self, partition: usize) -> Result<()> {
466 let join_probe_side_partition_file_name = format!("join-probe-side-p{}", partition);
467 self.op.delete(&join_probe_side_partition_file_name).await?;
468 let join_build_side_partition_file_name = format!("join-build-side-p{}", partition);
469 self.op.delete(&join_build_side_partition_file_name).await?;
470 Ok(())
471 }
472}
473
474impl<K: HashKey> HashJoinExecutor<K> {
    /// Top-level driver: materializes the build side, constructs the hash
    /// table, and dispatches to the per-join-type probe routine.
    ///
    /// If memory accounting fails and a spill backend is configured, both
    /// inputs are instead hash-partitioned to disk and each partition is
    /// joined by a recursive sub-executor.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        let mut need_to_spill = false;
        // Skip memory checks (and thus further spilling) for sub-joins whose
        // estimated partition size is already below the minimum spill threshold.
        let check_memory = match self.memory_upper_bound {
            Some(upper_bound) => upper_bound > SPILL_AT_LEAST_MEMORY,
            None => true,
        };

        let probe_schema = self.probe_side_source.schema().clone();
        let build_schema = self.build_side_source.schema().clone();
        let probe_data_types = self.probe_side_source.schema().data_types();
        let build_data_types = self.build_side_source.schema().data_types();
        let full_data_types = [probe_data_types.clone(), build_data_types.clone()].concat();

        // Materialize the build side, charging every chunk's heap size to the
        // memory context.
        let mut build_side = Vec::new_in(self.mem_ctx.global_allocator());
        let mut build_row_count = 0;
        let mut build_side_stream = self.build_side_source.execute();
        #[for_await]
        for build_chunk in &mut build_side_stream {
            let build_chunk = build_chunk?;
            if build_chunk.cardinality() > 0 {
                build_row_count += build_chunk.cardinality();
                let chunk_estimated_heap_size = build_chunk.estimated_heap_size();
                build_side.push(build_chunk);
                // `add` returning false means the memory budget is exceeded:
                // either spill (stream left partially unconsumed) or abort.
                if !self.mem_ctx.add(chunk_estimated_heap_size as i64) && check_memory {
                    if self.spill_backend.is_some() {
                        need_to_spill = true;
                        break;
                    } else {
                        Err(BatchError::OutOfMemory(self.mem_ctx.mem_limit()))?;
                    }
                }
            }
        }
        let mut hash_map = JoinHashMap::with_capacity_and_hasher_in(
            build_row_count,
            PrecomputedBuildHasher,
            self.mem_ctx.global_allocator(),
        );
        let mut next_build_row_with_same_key =
            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;

        let null_matched = K::Bitmap::from_bool_vec(self.null_matched.clone());

        // Track hash-table memory separately so it can be refunded on spill.
        let mut mem_added_by_hash_table = 0;
        if !need_to_spill {
            for (build_chunk_id, build_chunk) in build_side.iter().enumerate() {
                let build_keys = K::build_many(&self.build_key_idxs, build_chunk);

                for (build_row_id, build_key) in build_keys
                    .into_iter()
                    .enumerate()
                    .filter_by_bitmap(build_chunk.visibility())
                {
                    self.shutdown_rx.check()?;
                    // Only insert keys whose NULLs all fall in null-safe
                    // columns; other NULL keys can never match.
                    if build_key.null_bitmap().is_subset(&null_matched) {
                        let row_id = RowId::new(build_chunk_id, build_row_id);
                        let build_key_size = build_key.estimated_heap_size() as i64;
                        mem_added_by_hash_table += build_key_size;
                        if !self.mem_ctx.add(build_key_size) && check_memory {
                            if self.spill_backend.is_some() {
                                need_to_spill = true;
                                break;
                            } else {
                                Err(BatchError::OutOfMemory(self.mem_ctx.mem_limit()))?;
                            }
                        }
                        // `insert` returns the previous head of this key's
                        // chain; link it behind the newly inserted row.
                        next_build_row_with_same_key[row_id] = hash_map.insert(build_key, row_id);
                    }
                }
            }
        }

        if need_to_spill {
            info!(
                "batch hash join executor {} starts to spill out",
                &self.identity
            );
            let mut join_spill_manager = JoinSpillManager::new(
                self.spill_backend.clone().unwrap(),
                &self.identity,
                DEFAULT_SPILL_PARTITION_NUM,
                probe_data_types.clone(),
                build_data_types.clone(),
                self.chunk_size,
                self.spill_metrics.clone(),
            )?;
            join_spill_manager.init_writers().await?;

            // Refund all tracked memory: the hash table and the materialized
            // build chunks are about to be dropped / written out.
            self.mem_ctx.add(-mem_added_by_hash_table);
            drop(hash_map);
            drop(next_build_row_with_same_key);

            // Spill the build chunks already materialized in memory.
            for chunk in build_side {
                self.mem_ctx.add(-(chunk.estimated_heap_size() as i64));
                let hash_codes = chunk.get_hash_values(
                    self.build_key_idxs.as_slice(),
                    join_spill_manager.spill_build_hasher,
                );
                join_spill_manager
                    .write_build_side_chunk(
                        chunk,
                        hash_codes
                            .into_iter()
                            .map(|hash_code| hash_code.value())
                            .collect(),
                    )
                    .await?;
            }

            // Spill the rest of the (not yet consumed) build-side stream.
            #[for_await]
            for chunk in build_side_stream {
                let chunk = chunk?;
                let hash_codes = chunk.get_hash_values(
                    self.build_key_idxs.as_slice(),
                    join_spill_manager.spill_build_hasher,
                );
                join_spill_manager
                    .write_build_side_chunk(
                        chunk,
                        hash_codes
                            .into_iter()
                            .map(|hash_code| hash_code.value())
                            .collect(),
                    )
                    .await?;
            }

            // Spill the entire probe side, partitioned by the same hasher.
            #[for_await]
            for chunk in self.probe_side_source.execute() {
                let chunk = chunk?;
                let hash_codes = chunk.get_hash_values(
                    self.probe_key_idxs.as_slice(),
                    join_spill_manager.spill_build_hasher,
                );
                join_spill_manager
                    .write_probe_side_chunk(
                        chunk,
                        hash_codes
                            .into_iter()
                            .map(|hash_code| hash_code.value())
                            .collect(),
                    )
                    .await?;
            }

            join_spill_manager.close_writers().await?;

            // Join each partition with a recursive sub-executor and forward
            // its output; a partition's files are deleted once it is done.
            for i in 0..join_spill_manager.partition_num {
                let partition_size = join_spill_manager.estimate_partition_size(i).await?;
                let probe_side_stream = join_spill_manager.read_probe_side_partition(i).await?;
                let build_side_stream = join_spill_manager.read_build_side_partition(i).await?;

                let sub_hash_join_executor: HashJoinExecutor<K> = HashJoinExecutor::new_inner(
                    self.join_type,
                    self.output_indices.clone(),
                    Box::new(WrapStreamExecutor::new(
                        probe_schema.clone(),
                        probe_side_stream,
                    )),
                    Box::new(WrapStreamExecutor::new(
                        build_schema.clone(),
                        build_side_stream,
                    )),
                    self.probe_key_idxs.clone(),
                    self.build_key_idxs.clone(),
                    self.null_matched.clone(),
                    self.cond.clone(),
                    format!("{}-sub{}", self.identity.clone(), i),
                    self.chunk_size,
                    self.asof_desc.clone(),
                    self.spill_backend.clone(),
                    self.spill_metrics.clone(),
                    Some(partition_size),
                    self.shutdown_rx.clone(),
                    self.mem_ctx.clone(),
                );

                debug!(
                    "create sub_hash_join {} for hash_join {} to spill",
                    sub_hash_join_executor.identity, self.identity
                );

                let sub_hash_join_executor = Box::new(sub_hash_join_executor).execute();

                #[for_await]
                for chunk in sub_hash_join_executor {
                    let chunk = chunk?;
                    yield chunk;
                }

                join_spill_manager.clear_partition(i).await?;
            }
        } else {
            // In-memory path: hand everything to the per-join-type routine.
            let params = EquiJoinParams::new(
                self.probe_side_source,
                probe_data_types,
                self.probe_key_idxs,
                build_side,
                build_data_types,
                full_data_types,
                hash_map,
                next_build_row_with_same_key,
                self.chunk_size,
                self.shutdown_rx.clone(),
                self.asof_desc,
            );

            if let Some(cond) = self.cond.as_ref()
                && !params.is_asof_join()
            {
                let stream = match self.join_type {
                    JoinType::Inner => Self::do_inner_join_with_non_equi_condition(params, cond),
                    JoinType::LeftOuter => {
                        Self::do_left_outer_join_with_non_equi_condition(params, cond)
                    }
                    JoinType::LeftSemi => {
                        Self::do_left_semi_join_with_non_equi_condition(params, cond)
                    }
                    JoinType::LeftAnti => {
                        Self::do_left_anti_join_with_non_equi_condition(params, cond)
                    }
                    JoinType::RightOuter => {
                        Self::do_right_outer_join_with_non_equi_condition(params, cond)
                    }
                    JoinType::RightSemi => {
                        Self::do_right_semi_anti_join_with_non_equi_condition::<false>(params, cond)
                    }
                    JoinType::RightAnti => {
                        Self::do_right_semi_anti_join_with_non_equi_condition::<true>(params, cond)
                    }
                    JoinType::FullOuter => {
                        Self::do_full_outer_join_with_non_equi_condition(params, cond)
                    }
                    JoinType::AsOfInner | JoinType::AsOfLeftOuter => {
                        unreachable!("AsOf join should not reach here")
                    }
                };
                // The non-equi variants may emit partially-visible chunks;
                // re-batch the projected rows into full-sized output chunks.
                let mut output_chunk_builder =
                    DataChunkBuilder::new(self.schema.data_types(), self.chunk_size);
                #[for_await]
                for chunk in stream {
                    for output_chunk in
                        output_chunk_builder.append_chunk(chunk?.project(&self.output_indices))
                    {
                        yield output_chunk
                    }
                }
                if let Some(output_chunk) = output_chunk_builder.consume_all() {
                    yield output_chunk
                }
            } else {
                let stream = match self.join_type {
                    JoinType::Inner | JoinType::AsOfInner => Self::do_inner_join(params),
                    JoinType::LeftOuter | JoinType::AsOfLeftOuter => {
                        Self::do_left_outer_join(params)
                    }
                    JoinType::LeftSemi => Self::do_left_semi_anti_join::<false>(params),
                    JoinType::LeftAnti => Self::do_left_semi_anti_join::<true>(params),
                    JoinType::RightOuter => Self::do_right_outer_join(params),
                    JoinType::RightSemi => Self::do_right_semi_anti_join::<false>(params),
                    JoinType::RightAnti => Self::do_right_semi_anti_join::<true>(params),
                    JoinType::FullOuter => Self::do_full_outer_join(params),
                };
                #[for_await]
                for chunk in stream {
                    yield chunk?.project(&self.output_indices)
                }
            }
        }
    }
764
    /// Inner join with equi-condition only (also serves AS OF inner join when
    /// `asof_desc` is set).
    ///
    /// For each visible probe row, walks the chain of build rows with an
    /// equal key and emits concatenated rows through `chunk_builder`.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_inner_join(
        EquiJoinParams {
            probe_side,
            probe_key_idxs,
            build_side,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            asof_desc,
            ..
        }: EquiJoinParams<K>,
    ) {
        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                let build_side_row_iter =
                    next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied());
                if let Some(asof_desc) = &asof_desc {
                    // AS OF: at most one matched build row per probe row.
                    if let Some(build_row_id) = Self::find_asof_matched_rows(
                        probe_chunk.row_at_unchecked_vis(probe_row_id),
                        &build_side,
                        build_side_row_iter,
                        asof_desc,
                    ) {
                        shutdown_rx.check()?;
                        if let Some(spilled) = Self::append_one_row(
                            &mut chunk_builder,
                            &probe_chunk,
                            probe_row_id,
                            &build_side[build_row_id.chunk_id()],
                            build_row_id.row_id(),
                        ) {
                            yield spilled
                        }
                    }
                } else {
                    // Plain inner join: emit every key-equal build row.
                    for build_row_id in build_side_row_iter {
                        shutdown_rx.check()?;
                        let build_chunk = &build_side[build_row_id.chunk_id()];
                        if let Some(spilled) = Self::append_one_row(
                            &mut chunk_builder,
                            &probe_chunk,
                            probe_row_id,
                            build_chunk,
                            build_row_id.row_id(),
                        ) {
                            yield spilled
                        }
                    }
                }
            }
        }
        // Flush the final partially-filled chunk.
        if let Some(spilled) = chunk_builder.consume_all() {
            yield spilled
        }
    }
831
832 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
833 pub async fn do_inner_join_with_non_equi_condition(
834 params: EquiJoinParams<K>,
835 cond: &BoxedExpression,
836 ) {
837 #[for_await]
838 for chunk in Self::do_inner_join(params) {
839 let mut chunk = chunk?;
840 chunk.set_visibility(cond.eval(&chunk).await?.as_bool().iter().collect());
841 yield chunk
842 }
843 }
844
    /// Left outer join with equi-condition only (also serves AS OF left outer
    /// join when `asof_desc` is set).
    ///
    /// Probe rows with no key match — or, for AS OF, no inequality match —
    /// are emitted once with NULL-padded build columns.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_left_outer_join(
        EquiJoinParams {
            probe_side,
            probe_key_idxs,
            build_side,
            build_data_types,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            asof_desc,
            ..
        }: EquiJoinParams<K>,
    ) {
        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
                    let build_side_row_iter =
                        next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id));
                    if let Some(asof_desc) = &asof_desc {
                        if let Some(build_row_id) = Self::find_asof_matched_rows(
                            probe_chunk.row_at_unchecked_vis(probe_row_id),
                            &build_side,
                            build_side_row_iter,
                            asof_desc,
                        ) {
                            shutdown_rx.check()?;
                            if let Some(spilled) = Self::append_one_row(
                                &mut chunk_builder,
                                &probe_chunk,
                                probe_row_id,
                                &build_side[build_row_id.chunk_id()],
                                build_row_id.row_id(),
                            ) {
                                yield spilled
                            }
                        } else {
                            // Key matched but no AS OF inequality match:
                            // still emit the probe row with NULL build side.
                            shutdown_rx.check()?;
                            let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
                            if let Some(spilled) = Self::append_one_row_with_null_build_side(
                                &mut chunk_builder,
                                probe_row,
                                build_data_types.len(),
                            ) {
                                yield spilled
                            }
                        }
                    } else {
                        for build_row_id in build_side_row_iter {
                            shutdown_rx.check()?;
                            let build_chunk = &build_side[build_row_id.chunk_id()];
                            if let Some(spilled) = Self::append_one_row(
                                &mut chunk_builder,
                                &probe_chunk,
                                probe_row_id,
                                build_chunk,
                                build_row_id.row_id(),
                            ) {
                                yield spilled
                            }
                        }
                    }
                } else {
                    // No key match at all: emit NULL-padded build columns.
                    shutdown_rx.check()?;
                    let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
                    if let Some(spilled) = Self::append_one_row_with_null_build_side(
                        &mut chunk_builder,
                        probe_row,
                        build_data_types.len(),
                    ) {
                        yield spilled
                    }
                }
            }
        }
        // Flush the final partially-filled chunk.
        if let Some(spilled) = chunk_builder.consume_all() {
            yield spilled
        }
    }
934
    /// Left outer join with a non-equi condition.
    ///
    /// Key-matched rows are buffered in `chunk_builder` and post-processed by
    /// `process_left_outer_join_non_equi_condition` (defined outside this
    /// view), which applies `cond` and NULL-pads probe rows whose matches all
    /// fail it; `non_equi_state` carries the bookkeeping across chunk
    /// boundaries. Rows with no key match bypass the condition entirely via
    /// `remaining_chunk_builder`.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_left_outer_join_with_non_equi_condition(
        EquiJoinParams {
            probe_side,
            probe_data_types,
            probe_key_idxs,
            build_side,
            build_data_types,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
        cond: &BoxedExpression,
    ) {
        let mut chunk_builder = DataChunkBuilder::new(full_data_types.clone(), chunk_size);
        let mut remaining_chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        let mut non_equi_state = LeftNonEquiJoinState {
            probe_column_count: probe_data_types.len(),
            ..Default::default()
        };

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
                    // Remember where this probe row's output rows start so the
                    // post-filter can NULL-pad it if every match fails `cond`.
                    non_equi_state
                        .first_output_row_id
                        .push(chunk_builder.buffered_count());

                    let mut build_row_id_iter = next_build_row_with_same_key
                        .row_id_iter(Some(*first_matched_build_row_id))
                        .peekable();
                    while let Some(build_row_id) = build_row_id_iter.next() {
                        shutdown_rx.check()?;
                        let build_chunk = &build_side[build_row_id.chunk_id()];
                        if let Some(spilled) = Self::append_one_row(
                            &mut chunk_builder,
                            &probe_chunk,
                            probe_row_id,
                            build_chunk,
                            build_row_id.row_id(),
                        ) {
                            // The chunk boundary may split one probe row's
                            // matches; tell the post-filter whether more follow.
                            non_equi_state.has_more_output_rows =
                                build_row_id_iter.peek().is_some();
                            yield Self::process_left_outer_join_non_equi_condition(
                                spilled,
                                cond.as_ref(),
                                &mut non_equi_state,
                            )
                            .await?
                        }
                    }
                } else {
                    // No key match: emit directly, bypassing the condition.
                    shutdown_rx.check()?;
                    let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
                    if let Some(spilled) = Self::append_one_row_with_null_build_side(
                        &mut remaining_chunk_builder,
                        probe_row,
                        build_data_types.len(),
                    ) {
                        yield spilled
                    }
                }
            }
        }
        // Flush and post-filter the final partially-filled chunk.
        non_equi_state.has_more_output_rows = false;
        if let Some(spilled) = chunk_builder.consume_all() {
            yield Self::process_left_outer_join_non_equi_condition(
                spilled,
                cond.as_ref(),
                &mut non_equi_state,
            )
            .await?
        }

        if let Some(spilled) = remaining_chunk_builder.consume_all() {
            yield spilled
        }
    }
1023
    /// Left semi / anti join with equi-condition only.
    ///
    /// Semi (`ANTI_JOIN = false`): emits each probe row whose key exists in
    /// the hash map. Anti (`ANTI_JOIN = true`): emits each probe row whose
    /// key does not. Only probe-side columns are output.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_left_semi_anti_join<const ANTI_JOIN: bool>(
        EquiJoinParams {
            probe_side,
            probe_data_types,
            probe_key_idxs,
            hash_map,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
    ) {
        let mut chunk_builder = DataChunkBuilder::new(probe_data_types, chunk_size);
        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                shutdown_rx.check()?;
                // A key lookup suffices; the matched build rows themselves
                // are never materialized for semi/anti output.
                if !ANTI_JOIN {
                    if hash_map.contains_key(probe_key)
                        && let Some(spilled) = Self::append_one_probe_row(
                            &mut chunk_builder,
                            &probe_chunk,
                            probe_row_id,
                        )
                    {
                        yield spilled
                    }
                } else if hash_map.get(probe_key).is_none()
                    && let Some(spilled) =
                        Self::append_one_probe_row(&mut chunk_builder, &probe_chunk, probe_row_id)
                {
                    yield spilled
                }
            }
        }
        // Flush the final partially-filled chunk.
        if let Some(spilled) = chunk_builder.consume_all() {
            yield spilled
        }
    }
1069
    /// Left semi join with a non-equi condition.
    ///
    /// Buffers probe ++ build rows for key matches and defers the actual
    /// semi-join filtering to
    /// `process_left_semi_anti_join_non_equi_condition::<false>` (defined
    /// outside this view). `found_matched` short-circuits the remaining
    /// build rows once a probe row has produced a passing match.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_left_semi_join_with_non_equi_condition<'a>(
        EquiJoinParams {
            probe_side,
            probe_key_idxs,
            build_side,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
        cond: &'a BoxedExpression,
    ) {
        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        let mut non_equi_state = LeftNonEquiJoinState::default();

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                non_equi_state.found_matched = false;
                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
                    non_equi_state
                        .first_output_row_id
                        .push(chunk_builder.buffered_count());

                    for build_row_id in
                        next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id))
                    {
                        shutdown_rx.check()?;
                        // One passing match is enough for a semi join; skip
                        // the rest of this key's chain.
                        if non_equi_state.found_matched {
                            break;
                        }
                        let build_chunk = &build_side[build_row_id.chunk_id()];
                        if let Some(spilled) = Self::append_one_row(
                            &mut chunk_builder,
                            &probe_chunk,
                            probe_row_id,
                            build_chunk,
                            build_row_id.row_id(),
                        ) {
                            yield Self::process_left_semi_anti_join_non_equi_condition::<false>(
                                spilled,
                                cond.as_ref(),
                                &mut non_equi_state,
                            )
                            .await?
                        }
                    }
                }
            }
        }

        // Flush and post-filter the final partially-filled chunk.
        if let Some(spilled) = chunk_builder.consume_all() {
            yield Self::process_left_semi_anti_join_non_equi_condition::<false>(
                spilled,
                cond.as_ref(),
                &mut non_equi_state,
            )
            .await?
        }
    }
1149
    /// Left anti join with a non-equi condition.
    ///
    /// Probe rows without a key match are emitted directly through
    /// `remaining_chunk_builder` (only probe columns). Key-matched rows are
    /// buffered with their build rows and handed to
    /// `process_left_semi_anti_join_non_equi_condition::<true>` (defined
    /// outside this view), which keeps only probe rows whose matches all
    /// fail `cond`.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_left_anti_join_with_non_equi_condition(
        EquiJoinParams {
            probe_side,
            probe_data_types,
            probe_key_idxs,
            build_side,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
        cond: &BoxedExpression,
    ) {
        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        let mut remaining_chunk_builder = DataChunkBuilder::new(probe_data_types, chunk_size);
        let mut non_equi_state = LeftNonEquiJoinState::default();

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
                    // Remember where this probe row's buffered rows start so
                    // the post-filter can decide whether all matches failed.
                    non_equi_state
                        .first_output_row_id
                        .push(chunk_builder.buffered_count());
                    let mut build_row_id_iter = next_build_row_with_same_key
                        .row_id_iter(Some(*first_matched_build_row_id))
                        .peekable();
                    while let Some(build_row_id) = build_row_id_iter.next() {
                        shutdown_rx.check()?;
                        let build_chunk = &build_side[build_row_id.chunk_id()];
                        if let Some(spilled) = Self::append_one_row(
                            &mut chunk_builder,
                            &probe_chunk,
                            probe_row_id,
                            build_chunk,
                            build_row_id.row_id(),
                        ) {
                            // Tell the post-filter whether this probe row's
                            // matches continue past the chunk boundary.
                            non_equi_state.has_more_output_rows =
                                build_row_id_iter.peek().is_some();
                            yield Self::process_left_semi_anti_join_non_equi_condition::<true>(
                                spilled,
                                cond.as_ref(),
                                &mut non_equi_state,
                            )
                            .await?
                        }
                    }
                } else if let Some(spilled) = Self::append_one_probe_row(
                    &mut remaining_chunk_builder,
                    &probe_chunk,
                    probe_row_id,
                ) {
                    yield spilled
                }
            }
        }
        // Flush and post-filter the final partially-filled chunk.
        non_equi_state.has_more_output_rows = false;
        if let Some(spilled) = chunk_builder.consume_all() {
            yield Self::process_left_semi_anti_join_non_equi_condition::<true>(
                spilled,
                cond.as_ref(),
                &mut non_equi_state,
            )
            .await?
        }
        if let Some(spilled) = remaining_chunk_builder.consume_all() {
            yield spilled
        }
    }
1228
    /// Right outer join without a non-equi condition.
    ///
    /// Emits every (probe, build) equi-match while marking matched build rows in
    /// `build_row_matched`; afterwards the build rows that never matched are emitted
    /// with a NULL-padded probe side.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_right_outer_join(
        EquiJoinParams {
            probe_side,
            probe_data_types,
            probe_key_idxs,
            build_side,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
    ) {
        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        // One flag per build row, set once the row joins at least one probe row.
        let mut build_row_matched =
            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                // Iterate every build row sharing this probe key (empty if no match).
                for build_row_id in
                    next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
                {
                    shutdown_rx.check()?;
                    build_row_matched[build_row_id] = true;
                    let build_chunk = &build_side[build_row_id.chunk_id()];
                    if let Some(spilled) = Self::append_one_row(
                        &mut chunk_builder,
                        &probe_chunk,
                        probe_row_id,
                        build_chunk,
                        build_row_id.row_id(),
                    ) {
                        yield spilled
                    }
                }
            }
        }
        // Emit unmatched build rows with NULLs on the probe side.
        #[for_await]
        for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
            &mut chunk_builder,
            &build_side,
            &build_row_matched,
            probe_data_types.len(),
        ) {
            yield spilled?
        }
    }
1285
    /// Right outer join with an additional non-equi condition `cond`.
    ///
    /// `non_equi_state.build_row_ids` records which build row produced each buffered
    /// output row; `process_right_outer_join_non_equi_condition` marks a build row as
    /// matched only when its joined row passes `cond`. Unmatched build rows are then
    /// emitted NULL-padded at the end.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_right_outer_join_with_non_equi_condition(
        EquiJoinParams {
            probe_side,
            probe_data_types,
            probe_key_idxs,
            build_side,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
        cond: &BoxedExpression,
    ) {
        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        let build_row_matched =
            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
        let mut non_equi_state = RightNonEquiJoinState {
            build_row_matched,
            ..Default::default()
        };

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                for build_row_id in
                    next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
                {
                    shutdown_rx.check()?;
                    // Remember the origin build row of each buffered output row; the
                    // matched bitmap is updated later based on the filter result.
                    non_equi_state.build_row_ids.push(build_row_id);
                    let build_chunk = &build_side[build_row_id.chunk_id()];
                    if let Some(spilled) = Self::append_one_row(
                        &mut chunk_builder,
                        &probe_chunk,
                        probe_row_id,
                        build_chunk,
                        build_row_id.row_id(),
                    ) {
                        yield Self::process_right_outer_join_non_equi_condition(
                            spilled,
                            cond.as_ref(),
                            &mut non_equi_state,
                        )
                        .await?
                    }
                }
            }
        }
        // Filter and emit the final partially-filled chunk.
        if let Some(spilled) = chunk_builder.consume_all() {
            yield Self::process_right_outer_join_non_equi_condition(
                spilled,
                cond.as_ref(),
                &mut non_equi_state,
            )
            .await?
        }
        // Emit build rows that never passed the condition, NULL-padded on the probe side.
        #[for_await]
        for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
            &mut chunk_builder,
            &build_side,
            &non_equi_state.build_row_matched,
            probe_data_types.len(),
        ) {
            yield spilled?
        }
    }
1360
    /// Right semi (`ANTI_JOIN = false`) or right anti (`ANTI_JOIN = true`) join without
    /// a non-equi condition.
    ///
    /// The probe phase only marks which build rows have an equi-match; all output
    /// (build columns only) is produced afterwards from the matched bitmap.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_right_semi_anti_join<const ANTI_JOIN: bool>(
        EquiJoinParams {
            probe_side,
            probe_key_idxs,
            build_side,
            build_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
    ) {
        let mut chunk_builder = DataChunkBuilder::new(build_data_types, chunk_size);
        let mut build_row_matched =
            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for probe_key in probe_keys.iter().filter_by_bitmap(probe_chunk.visibility()) {
                // Mark every build row that shares this probe key; no rows are emitted yet.
                for build_row_id in
                    next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
                {
                    shutdown_rx.check()?;
                    build_row_matched[build_row_id] = true;
                }
            }
        }
        // Semi join emits matched build rows; anti join emits unmatched ones.
        #[for_await]
        for spilled in Self::handle_remaining_build_rows_for_right_semi_anti_join::<ANTI_JOIN>(
            &mut chunk_builder,
            &build_side,
            &build_row_matched,
        ) {
            yield spilled?
        }
    }
1401
    /// Right semi/anti join with an additional non-equi condition `cond`.
    ///
    /// Joined full-width rows are built only so `cond` can be evaluated on them:
    /// `process_right_semi_anti_join_non_equi_condition` returns no chunk and merely
    /// updates the matched bitmap. The actual output (build columns only, via
    /// `remaining_chunk_builder`) is produced at the end from that bitmap.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_right_semi_anti_join_with_non_equi_condition<const ANTI_JOIN: bool>(
        EquiJoinParams {
            probe_side,
            probe_key_idxs,
            build_side,
            build_data_types,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
        cond: &BoxedExpression,
    ) {
        // Scratch builder for full-width rows used only for condition evaluation.
        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        // Builder for the final build-side-only output.
        let mut remaining_chunk_builder = DataChunkBuilder::new(build_data_types, chunk_size);
        let build_row_matched =
            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
        let mut non_equi_state = RightNonEquiJoinState {
            build_row_matched,
            ..Default::default()
        };

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                for build_row_id in
                    next_build_row_with_same_key.row_id_iter(hash_map.get(probe_key).copied())
                {
                    shutdown_rx.check()?;
                    non_equi_state.build_row_ids.push(build_row_id);
                    let build_chunk = &build_side[build_row_id.chunk_id()];
                    if let Some(spilled) = Self::append_one_row(
                        &mut chunk_builder,
                        &probe_chunk,
                        probe_row_id,
                        build_chunk,
                        build_row_id.row_id(),
                    ) {
                        // Note: no `yield` — this only updates the matched bitmap.
                        Self::process_right_semi_anti_join_non_equi_condition(
                            spilled,
                            cond.as_ref(),
                            &mut non_equi_state,
                        )
                        .await?
                    }
                }
            }
        }
        // Process the final partially-filled scratch chunk.
        if let Some(spilled) = chunk_builder.consume_all() {
            Self::process_right_semi_anti_join_non_equi_condition(
                spilled,
                cond.as_ref(),
                &mut non_equi_state,
            )
            .await?
        }
        // Emit matched (semi) or unmatched (anti) build rows.
        #[for_await]
        for spilled in Self::handle_remaining_build_rows_for_right_semi_anti_join::<ANTI_JOIN>(
            &mut remaining_chunk_builder,
            &build_side,
            &non_equi_state.build_row_matched,
        ) {
            yield spilled?
        }
    }
1476
    /// Full outer join without a non-equi condition.
    ///
    /// Matched pairs are emitted directly; probe rows without a match get a NULL build
    /// side immediately, and build rows that never matched are emitted with a NULL
    /// probe side at the end.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_full_outer_join(
        EquiJoinParams {
            probe_side,
            probe_data_types,
            probe_key_idxs,
            build_side,
            build_data_types,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
    ) {
        let mut chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        // One flag per build row, set once the row joins at least one probe row.
        let mut build_row_matched =
            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
                    for build_row_id in
                        next_build_row_with_same_key.row_id_iter(Some(*first_matched_build_row_id))
                    {
                        shutdown_rx.check()?;
                        build_row_matched[build_row_id] = true;
                        let build_chunk = &build_side[build_row_id.chunk_id()];
                        if let Some(spilled) = Self::append_one_row(
                            &mut chunk_builder,
                            &probe_chunk,
                            probe_row_id,
                            build_chunk,
                            build_row_id.row_id(),
                        ) {
                            yield spilled
                        }
                    }
                } else {
                    // No equi-match: emit the probe row padded with a NULL build side.
                    let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
                    if let Some(spilled) = Self::append_one_row_with_null_build_side(
                        &mut chunk_builder,
                        probe_row,
                        build_data_types.len(),
                    ) {
                        yield spilled
                    }
                }
            }
        }
        // Emit unmatched build rows with NULLs on the probe side.
        #[for_await]
        for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
            &mut chunk_builder,
            &build_side,
            &build_row_matched,
            probe_data_types.len(),
        ) {
            yield spilled?
        }
    }
1545
    /// Full outer join with an additional non-equi condition `cond`.
    ///
    /// Combines the left-outer and right-outer non-equi mechanisms: the left state
    /// tracks per-probe-row output segments so an unmatched probe row is padded with a
    /// NULL build side exactly once, while the right state records which build rows
    /// passed `cond` so the remainder can be emitted NULL-padded at the end.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    pub async fn do_full_outer_join_with_non_equi_condition(
        EquiJoinParams {
            probe_side,
            probe_data_types,
            probe_key_idxs,
            build_side,
            build_data_types,
            full_data_types,
            hash_map,
            next_build_row_with_same_key,
            chunk_size,
            shutdown_rx,
            ..
        }: EquiJoinParams<K>,
        cond: &BoxedExpression,
    ) {
        let mut chunk_builder = DataChunkBuilder::new(full_data_types.clone(), chunk_size);
        // Builder for rows that bypass the filter (no-equi-match probe rows) and for the
        // trailing unmatched build rows.
        let mut remaining_chunk_builder = DataChunkBuilder::new(full_data_types, chunk_size);
        let mut left_non_equi_state = LeftNonEquiJoinState {
            probe_column_count: probe_data_types.len(),
            ..Default::default()
        };
        let build_row_matched =
            ChunkedData::with_chunk_sizes(build_side.iter().map(|c| c.capacity()))?;
        let mut right_non_equi_state = RightNonEquiJoinState {
            build_row_matched,
            ..Default::default()
        };

        #[for_await]
        for probe_chunk in probe_side.execute() {
            let probe_chunk = probe_chunk?;
            let probe_keys = K::build_many(&probe_key_idxs, &probe_chunk);
            for (probe_row_id, probe_key) in probe_keys
                .iter()
                .enumerate()
                .filter_by_bitmap(probe_chunk.visibility())
            {
                // Reset the per-probe-row "seen a passing row" flag.
                left_non_equi_state.found_matched = false;
                if let Some(first_matched_build_row_id) = hash_map.get(probe_key) {
                    // Record where this probe row's joined rows begin in the buffer.
                    left_non_equi_state
                        .first_output_row_id
                        .push(chunk_builder.buffered_count());
                    let mut build_row_id_iter = next_build_row_with_same_key
                        .row_id_iter(Some(*first_matched_build_row_id))
                        .peekable();
                    while let Some(build_row_id) = build_row_id_iter.next() {
                        shutdown_rx.check()?;
                        right_non_equi_state.build_row_ids.push(build_row_id);
                        let build_chunk = &build_side[build_row_id.chunk_id()];
                        if let Some(spilled) = Self::append_one_row(
                            &mut chunk_builder,
                            &probe_chunk,
                            probe_row_id,
                            build_chunk,
                            build_row_id.row_id(),
                        ) {
                            // Defer this probe row's verdict if its joined rows continue
                            // into the next chunk.
                            left_non_equi_state.has_more_output_rows =
                                build_row_id_iter.peek().is_some();
                            yield Self::process_full_outer_join_non_equi_condition(
                                spilled,
                                cond.as_ref(),
                                &mut left_non_equi_state,
                                &mut right_non_equi_state,
                            )
                            .await?
                        }
                    }
                } else {
                    shutdown_rx.check()?;
                    // No equi-match: pad the probe row with a NULL build side directly.
                    let probe_row = probe_chunk.row_at_unchecked_vis(probe_row_id);
                    if let Some(spilled) = Self::append_one_row_with_null_build_side(
                        &mut remaining_chunk_builder,
                        probe_row,
                        build_data_types.len(),
                    ) {
                        yield spilled
                    }
                }
            }
        }
        // All probe input consumed: filter and flush the buffered joined rows.
        left_non_equi_state.has_more_output_rows = false;
        if let Some(spilled) = chunk_builder.consume_all() {
            yield Self::process_full_outer_join_non_equi_condition(
                spilled,
                cond.as_ref(),
                &mut left_non_equi_state,
                &mut right_non_equi_state,
            )
            .await?
        }
        // Emit build rows that never passed the condition, NULL-padded on the probe side.
        #[for_await]
        for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
            &mut remaining_chunk_builder,
            &build_side,
            &right_non_equi_state.build_row_matched,
            probe_data_types.len(),
        ) {
            yield spilled?
        }
    }
1648
    /// Applies the non-equi condition to a chunk of joined rows for LEFT OUTER join:
    /// build-side columns are NULLed where `cond` fails, and per-probe-row duplicates
    /// are hidden so each probe row without any passing match appears exactly once
    /// (as a NULL-padded row).
    async fn process_left_outer_join_non_equi_condition(
        chunk: DataChunk,
        cond: &dyn Expression,
        LeftNonEquiJoinState {
            probe_column_count,
            first_output_row_id,
            has_more_output_rows,
            found_matched,
        }: &mut LeftNonEquiJoinState,
    ) -> Result<DataChunk> {
        // Evaluate the condition once per joined row into a boolean bitmap.
        let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
        Ok(DataChunkMutator(chunk)
            .nullify_build_side_for_non_equi_condition(&filter, *probe_column_count)
            .remove_duplicate_rows_for_left_outer_join(
                &filter,
                first_output_row_id,
                *has_more_output_rows,
                found_matched,
            )
            .take())
    }
1806
    /// Applies the non-equi condition to a chunk of joined rows for LEFT SEMI
    /// (`ANTI_JOIN = false`) or LEFT ANTI (`ANTI_JOIN = true`) join, keeping at most
    /// one visible output row per probe row.
    async fn process_left_semi_anti_join_non_equi_condition<const ANTI_JOIN: bool>(
        chunk: DataChunk,
        cond: &dyn Expression,
        LeftNonEquiJoinState {
            first_output_row_id,
            found_matched,
            has_more_output_rows,
            ..
        }: &mut LeftNonEquiJoinState,
    ) -> Result<DataChunk> {
        // Evaluate the condition once per joined row into a boolean bitmap.
        let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
        Ok(DataChunkMutator(chunk)
            .remove_duplicate_rows_for_left_semi_anti_join::<ANTI_JOIN>(
                &filter,
                first_output_row_id,
                *has_more_output_rows,
                found_matched,
            )
            .take())
    }
1829
    /// Applies the non-equi condition to a chunk of joined rows for RIGHT OUTER join:
    /// only rows passing `cond` stay visible, and their originating build rows are
    /// marked as matched.
    async fn process_right_outer_join_non_equi_condition(
        chunk: DataChunk,
        cond: &dyn Expression,
        RightNonEquiJoinState {
            build_row_ids,
            build_row_matched,
        }: &mut RightNonEquiJoinState,
    ) -> Result<DataChunk> {
        // Evaluate the condition once per joined row into a boolean bitmap.
        let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
        Ok(DataChunkMutator(chunk)
            .remove_duplicate_rows_for_right_outer_join(&filter, build_row_ids, build_row_matched)
            .take())
    }
1843
    /// Applies the non-equi condition for RIGHT SEMI/ANTI join. Produces no output
    /// chunk: it only marks build rows whose joined row passed `cond` as matched
    /// (the final output is derived from the bitmap later).
    async fn process_right_semi_anti_join_non_equi_condition(
        chunk: DataChunk,
        cond: &dyn Expression,
        RightNonEquiJoinState {
            build_row_ids,
            build_row_matched,
        }: &mut RightNonEquiJoinState,
    ) -> Result<()> {
        // Evaluate the condition once per joined row into a boolean bitmap.
        let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
        DataChunkMutator(chunk).remove_duplicate_rows_for_right_semi_anti_join(
            &filter,
            build_row_ids,
            build_row_matched,
        );
        Ok(())
    }
1860
    /// Applies the non-equi condition for FULL OUTER join: NULLs the build side where
    /// `cond` fails, then updates both the left (per-probe-row padding) and right
    /// (matched build row bitmap) states in a single pass.
    async fn process_full_outer_join_non_equi_condition(
        chunk: DataChunk,
        cond: &dyn Expression,
        left_non_equi_state: &mut LeftNonEquiJoinState,
        right_non_equi_state: &mut RightNonEquiJoinState,
    ) -> Result<DataChunk> {
        // Evaluate the condition once per joined row into a boolean bitmap.
        let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
        Ok(DataChunkMutator(chunk)
            .nullify_build_side_for_non_equi_condition(
                &filter,
                left_non_equi_state.probe_column_count,
            )
            .remove_duplicate_rows_for_full_outer_join(
                &filter,
                left_non_equi_state,
                right_non_equi_state,
            )
            .take())
    }
1880
    /// Emits every build row that is still unmatched in `build_row_matched`, padding
    /// the probe side with `probe_column_count` NULLs, then flushes whatever remains
    /// buffered in the shared `chunk_builder`.
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn handle_remaining_build_rows_for_right_outer_join<'a>(
        chunk_builder: &'a mut DataChunkBuilder,
        build_side: &'a [DataChunk],
        build_row_matched: &'a ChunkedData<bool>,
        probe_column_count: usize,
    ) {
        for build_row_id in build_row_matched
            .all_row_ids()
            .filter(|build_row_id| !build_row_matched[*build_row_id])
        {
            let build_row =
                build_side[build_row_id.chunk_id()].row_at_unchecked_vis(build_row_id.row_id());
            if let Some(spilled) = Self::append_one_row_with_null_probe_side(
                chunk_builder,
                build_row,
                probe_column_count,
            ) {
                yield spilled
            }
        }
        // Flush the final partially-filled chunk.
        if let Some(spilled) = chunk_builder.consume_all() {
            yield spilled
        }
    }
1906
1907 #[try_stream(ok = DataChunk, error = BatchError)]
1908 async fn handle_remaining_build_rows_for_right_semi_anti_join<'a, const ANTI_JOIN: bool>(
1909 chunk_builder: &'a mut DataChunkBuilder,
1910 build_side: &'a [DataChunk],
1911 build_row_matched: &'a ChunkedData<bool>,
1912 ) {
1913 for build_row_id in build_row_matched.all_row_ids().filter(|build_row_id| {
1914 if !ANTI_JOIN {
1915 build_row_matched[*build_row_id]
1916 } else {
1917 !build_row_matched[*build_row_id]
1918 }
1919 }) {
1920 if let Some(spilled) = Self::append_one_build_row(
1921 chunk_builder,
1922 &build_side[build_row_id.chunk_id()],
1923 build_row_id.row_id(),
1924 ) {
1925 yield spilled
1926 }
1927 }
1928 if let Some(spilled) = chunk_builder.consume_all() {
1929 yield spilled
1930 }
1931 }
1932
1933 fn append_one_row(
1934 chunk_builder: &mut DataChunkBuilder,
1935 probe_chunk: &DataChunk,
1936 probe_row_id: usize,
1937 build_chunk: &DataChunk,
1938 build_row_id: usize,
1939 ) -> Option<DataChunk> {
1940 chunk_builder.append_one_row_from_array_elements(
1941 probe_chunk.columns().iter().map(|c| c.as_ref()),
1942 probe_row_id,
1943 build_chunk.columns().iter().map(|c| c.as_ref()),
1944 build_row_id,
1945 )
1946 }
1947
1948 fn append_one_probe_row(
1949 chunk_builder: &mut DataChunkBuilder,
1950 probe_chunk: &DataChunk,
1951 probe_row_id: usize,
1952 ) -> Option<DataChunk> {
1953 chunk_builder.append_one_row_from_array_elements(
1954 probe_chunk.columns().iter().map(|c| c.as_ref()),
1955 probe_row_id,
1956 empty(),
1957 0,
1958 )
1959 }
1960
1961 fn append_one_build_row(
1962 chunk_builder: &mut DataChunkBuilder,
1963 build_chunk: &DataChunk,
1964 build_row_id: usize,
1965 ) -> Option<DataChunk> {
1966 chunk_builder.append_one_row_from_array_elements(
1967 empty(),
1968 0,
1969 build_chunk.columns().iter().map(|c| c.as_ref()),
1970 build_row_id,
1971 )
1972 }
1973
1974 fn append_one_row_with_null_build_side(
1975 chunk_builder: &mut DataChunkBuilder,
1976 probe_row_ref: RowRef<'_>,
1977 build_column_count: usize,
1978 ) -> Option<DataChunk> {
1979 chunk_builder.append_one_row(probe_row_ref.chain(repeat_n(Datum::None, build_column_count)))
1980 }
1981
1982 fn append_one_row_with_null_probe_side(
1983 chunk_builder: &mut DataChunkBuilder,
1984 build_row_ref: RowRef<'_>,
1985 probe_column_count: usize,
1986 ) -> Option<DataChunk> {
1987 chunk_builder.append_one_row(repeat_n(Datum::None, probe_column_count).chain(build_row_ref))
1988 }
1989
    /// Scans the build rows reachable from `build_side_row_iter` and returns the id of
    /// the build row that best satisfies the ASOF inequality against the probe row, or
    /// `None` when the probe's inequality value is NULL or no build row qualifies.
    ///
    /// "Best" is the qualifying build value closest to the probe value: for `Lt`/`Le`
    /// the smallest such value, for `Gt`/`Ge` the largest — enforced by the second
    /// `compare` (candidate vs. current best) below. Build rows whose inequality
    /// column is NULL are skipped.
    fn find_asof_matched_rows(
        probe_row_ref: RowRef<'_>,
        build_side: &[DataChunk],
        build_side_row_iter: RowIdIter<'_>,
        asof_join_condition: &AsOfDesc,
    ) -> Option<RowId> {
        let probe_inequality_value = probe_row_ref.datum_at(asof_join_condition.left_idx);
        if let Some(probe_inequality_scalar) = probe_inequality_value {
            let mut result_row_id: Option<RowId> = None;
            let mut build_row_ref;

            for build_row_id in build_side_row_iter {
                build_row_ref =
                    build_side[build_row_id.chunk_id()].row_at_unchecked_vis(build_row_id.row_id());
                let build_inequality_value = build_row_ref.datum_at(asof_join_condition.right_idx);
                if let Some(build_inequality_scalar) = build_inequality_value {
                    // `compare` encodes the inequality type (e.g. `Ordering::is_lt`).
                    // The candidate replaces the current best only if it satisfies the
                    // inequality against the probe AND is strictly closer than the best.
                    let mut pick_result = |compare: fn(Ordering) -> bool| {
                        if let Some(result_row_id_inner) = result_row_id {
                            let result_row_ref = build_side[result_row_id_inner.chunk_id()]
                                .row_at_unchecked_vis(result_row_id_inner.row_id());
                            // The current best's value is non-NULL by construction.
                            let result_inequality_scalar = result_row_ref
                                .datum_at(asof_join_condition.right_idx)
                                .unwrap();
                            if compare(
                                probe_inequality_scalar.default_cmp(&build_inequality_scalar),
                            ) && compare(
                                build_inequality_scalar.default_cmp(&result_inequality_scalar),
                            ) {
                                result_row_id = Some(build_row_id);
                            }
                        } else if compare(
                            probe_inequality_scalar.default_cmp(&build_inequality_scalar),
                        ) {
                            // First qualifying candidate becomes the initial best.
                            result_row_id = Some(build_row_id);
                        }
                    };
                    match asof_join_condition.inequality_type {
                        AsOfInequalityType::Lt => {
                            pick_result(Ordering::is_lt);
                        }
                        AsOfInequalityType::Le => {
                            pick_result(Ordering::is_le);
                        }
                        AsOfInequalityType::Gt => {
                            pick_result(Ordering::is_gt);
                        }
                        AsOfInequalityType::Ge => {
                            pick_result(Ordering::is_ge);
                        }
                    }
                }
            }
            result_row_id
        } else {
            // A NULL probe value cannot satisfy any inequality.
            None
        }
    }
2047}
2048
/// Thin wrapper around a [`DataChunk`] providing the in-place visibility and
/// NULL-bitmap edits used by the non-equi-condition post-processing routines.
#[repr(transparent)]
struct DataChunkMutator(DataChunk);
2052
impl DataChunkMutator {
    /// Sets every build-side cell to NULL on rows where the non-equi condition failed,
    /// by AND-ing each build column's null bitmap with `filter`. The first
    /// `probe_column_count` columns (the probe side) are left untouched.
    fn nullify_build_side_for_non_equi_condition(
        self,
        filter: &Bitmap,
        probe_column_count: usize,
    ) -> Self {
        let (mut columns, vis) = self.0.into_parts();

        for build_column in columns.split_off(probe_column_count) {
            // The chunk is expected to hold the only reference to each build column so
            // its bitmap can be edited in place; `try_unwrap` panics otherwise.
            let mut array = Arc::try_unwrap(build_column).unwrap();
            array.set_bitmap(array.null_bitmap() & filter);
            columns.push(array.into());
        }

        Self(DataChunk::new(columns, vis))
    }

    /// Rebuilds the visibility for a LEFT OUTER join chunk so that each probe row whose
    /// joined rows all failed the condition stays visible exactly once (its first row,
    /// build side already NULLed), while passing rows stay visible as-is.
    ///
    /// `first_output_row_ids` holds the buffer offsets where each probe row's output
    /// segment begins; consecutive entries delimit one probe row's rows (the segment
    /// before the first entry belongs to a probe row carried over from the previous
    /// chunk). `found_non_null` carries "saw a passing row" state across chunks, and
    /// `has_more_output_rows` defers the decision for the last segment when it
    /// continues into the next chunk.
    fn remove_duplicate_rows_for_left_outer_join(
        mut self,
        filter: &Bitmap,
        first_output_row_ids: &mut Vec<usize>,
        has_more_output_rows: bool,
        found_non_null: &mut bool,
    ) -> Self {
        let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());

        // Completed segments: every pair of consecutive boundaries fully inside this chunk.
        for (&start_row_id, &end_row_id) in iter::once(&0)
            .chain(first_output_row_ids.iter())
            .tuple_windows()
            .filter(|(start_row_id, end_row_id)| start_row_id < end_row_id)
        {
            for row_id in start_row_id..end_row_id {
                if filter.is_set(row_id) {
                    *found_non_null = true;
                    new_visibility.set(row_id, true);
                }
            }
            // No row passed for this probe row: keep its first row as the NULL-padded
            // outer-join output.
            if !*found_non_null {
                new_visibility.set(start_row_id, true);
            }
            *found_non_null = false;
        }

        // Trailing (possibly unfinished) segment.
        let start_row_id = first_output_row_ids.last().copied().unwrap_or_default();
        for row_id in start_row_id..filter.len() {
            if filter.is_set(row_id) {
                *found_non_null = true;
                new_visibility.set(row_id, true);
            }
        }
        // Only decide the padding row if this probe row has no more rows coming;
        // otherwise carry `found_non_null` into the next chunk.
        if !has_more_output_rows {
            if !*found_non_null {
                new_visibility.set(start_row_id, true);
            }
            *found_non_null = false;
        }

        first_output_row_ids.clear();

        self.0
            .set_visibility(new_visibility.finish() & self.0.visibility());
        self
    }

    /// Rebuilds the visibility for a LEFT SEMI (`ANTI_JOIN = false`) or LEFT ANTI
    /// (`ANTI_JOIN = true`) join chunk: semi join shows the first passing row of each
    /// probe-row segment; anti join shows the segment's first row only when no row of
    /// the segment passed. Segment bookkeeping mirrors
    /// [`Self::remove_duplicate_rows_for_left_outer_join`].
    fn remove_duplicate_rows_for_left_semi_anti_join<const ANTI_JOIN: bool>(
        mut self,
        filter: &Bitmap,
        first_output_row_ids: &mut Vec<usize>,
        has_more_output_rows: bool,
        found_matched: &mut bool,
    ) -> Self {
        let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());

        // Completed segments.
        for (&start_row_id, &end_row_id) in iter::once(&0)
            .chain(first_output_row_ids.iter())
            .tuple_windows()
            .filter(|(start_row_id, end_row_id)| start_row_id < end_row_id)
        {
            for row_id in start_row_id..end_row_id {
                if filter.is_set(row_id) {
                    // Semi join: emit only the first passing row for this probe row.
                    if !ANTI_JOIN && !*found_matched {
                        new_visibility.set(row_id, true);
                    }
                    *found_matched = true;
                    break;
                }
            }
            // Anti join: emit the probe row (via its first output row) only when
            // nothing passed.
            if ANTI_JOIN && !*found_matched {
                new_visibility.set(start_row_id, true);
            }
            *found_matched = false;
        }

        // Trailing (possibly unfinished) segment; `found_matched` may carry over.
        let start_row_id = first_output_row_ids.last().copied().unwrap_or_default();
        for row_id in start_row_id..filter.len() {
            if filter.is_set(row_id) {
                if !ANTI_JOIN && !*found_matched {
                    new_visibility.set(row_id, true);
                }
                *found_matched = true;
                break;
            }
        }
        if !has_more_output_rows && ANTI_JOIN {
            if !*found_matched {
                new_visibility.set(start_row_id, true);
            }
            *found_matched = false;
        }

        first_output_row_ids.clear();

        self.0
            .set_visibility(new_visibility.finish() & self.0.visibility());
        self
    }

    /// For RIGHT OUTER join: keeps only rows passing the condition visible and marks
    /// their originating build rows (recorded in `build_row_ids`, one per chunk row)
    /// as matched.
    fn remove_duplicate_rows_for_right_outer_join(
        mut self,
        filter: &Bitmap,
        build_row_ids: &mut Vec<RowId>,
        build_row_matched: &mut ChunkedData<bool>,
    ) -> Self {
        let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());
        for (output_row_id, (output_row_non_null, &build_row_id)) in
            filter.iter().zip_eq_fast(build_row_ids.iter()).enumerate()
        {
            if output_row_non_null {
                build_row_matched[build_row_id] = true;
                new_visibility.set(output_row_id, true);
            }
        }

        // The recorded ids correspond to this chunk only; reset for the next one.
        build_row_ids.clear();

        self.0
            .set_visibility(new_visibility.finish() & self.0.visibility());
        self
    }

    /// For RIGHT SEMI/ANTI join: marks build rows whose joined row passed the
    /// condition as matched. Produces no output chunk — the visibility is irrelevant
    /// because the final output is derived from the bitmap later.
    fn remove_duplicate_rows_for_right_semi_anti_join(
        self,
        filter: &Bitmap,
        build_row_ids: &mut Vec<RowId>,
        build_row_matched: &mut ChunkedData<bool>,
    ) {
        for (output_row_non_null, &build_row_id) in filter.iter().zip_eq_fast(build_row_ids.iter())
        {
            if output_row_non_null {
                build_row_matched[build_row_id] = true;
            }
        }

        build_row_ids.clear();
    }

    /// For FULL OUTER join: combines the LEFT OUTER per-probe-row padding logic with
    /// the RIGHT OUTER matched-build-row marking in one visibility pass.
    fn remove_duplicate_rows_for_full_outer_join(
        mut self,
        filter: &Bitmap,
        LeftNonEquiJoinState {
            first_output_row_id,
            has_more_output_rows,
            found_matched,
            ..
        }: &mut LeftNonEquiJoinState,
        RightNonEquiJoinState {
            build_row_ids,
            build_row_matched,
        }: &mut RightNonEquiJoinState,
    ) -> Self {
        let mut new_visibility = BitmapBuilder::zeroed(self.0.capacity());

        // Left-side pass: completed per-probe-row segments (see the LEFT OUTER variant).
        for (&start_row_id, &end_row_id) in iter::once(&0)
            .chain(first_output_row_id.iter())
            .tuple_windows()
            .filter(|(start_row_id, end_row_id)| start_row_id < end_row_id)
        {
            for row_id in start_row_id..end_row_id {
                if filter.is_set(row_id) {
                    *found_matched = true;
                    new_visibility.set(row_id, true);
                }
            }
            if !*found_matched {
                new_visibility.set(start_row_id, true);
            }
            *found_matched = false;
        }

        // Trailing (possibly unfinished) segment.
        let start_row_id = first_output_row_id.last().copied().unwrap_or_default();
        for row_id in start_row_id..filter.len() {
            if filter.is_set(row_id) {
                *found_matched = true;
                new_visibility.set(row_id, true);
            }
        }
        if !*has_more_output_rows && !*found_matched {
            new_visibility.set(start_row_id, true);
        }

        first_output_row_id.clear();

        // Right-side pass: mark matched build rows and keep passing rows visible.
        for (output_row_id, (output_row_non_null, &build_row_id)) in
            filter.iter().zip_eq_fast(build_row_ids.iter()).enumerate()
        {
            if output_row_non_null {
                build_row_matched[build_row_id] = true;
                new_visibility.set(output_row_id, true);
            }
        }

        build_row_ids.clear();

        self.0
            .set_visibility(new_visibility.finish() & self.0.visibility());
        self
    }

    /// Unwraps the mutator, yielding the (possibly modified) chunk.
    fn take(self) -> DataChunk {
        self.0
    }
}
2279
2280impl BoxedExecutorBuilder for HashJoinExecutor<()> {
2281 async fn new_boxed_executor(
2282 context: &ExecutorBuilder<'_>,
2283 inputs: Vec<BoxedExecutor>,
2284 ) -> Result<BoxedExecutor> {
2285 let [left_child, right_child]: [_; 2] = inputs.try_into().unwrap();
2286
2287 let hash_join_node = try_match_expand!(
2288 context.plan_node().get_node_body().unwrap(),
2289 NodeBody::HashJoin
2290 )?;
2291
2292 let join_type = JoinType::from_prost(hash_join_node.get_join_type()?);
2293
2294 let cond = match hash_join_node.get_condition() {
2295 Ok(cond_prost) => Some(build_from_prost(cond_prost)?),
2296 Err(_) => None,
2297 };
2298
2299 let left_key_idxs = hash_join_node
2300 .get_left_key()
2301 .iter()
2302 .map(|&idx| idx as usize)
2303 .collect_vec();
2304 let right_key_idxs = hash_join_node
2305 .get_right_key()
2306 .iter()
2307 .map(|&idx| idx as usize)
2308 .collect_vec();
2309
2310 ensure!(left_key_idxs.len() == right_key_idxs.len());
2311
2312 let right_data_types = right_child.schema().data_types();
2313 let right_key_types = right_key_idxs
2314 .iter()
2315 .map(|&idx| right_data_types[idx].clone())
2316 .collect_vec();
2317
2318 let output_indices: Vec<usize> = hash_join_node
2319 .get_output_indices()
2320 .iter()
2321 .map(|&x| x as usize)
2322 .collect();
2323
2324 let identity = context.plan_node().get_identity().clone();
2325
2326 let asof_desc = hash_join_node
2327 .asof_desc
2328 .map(|desc| AsOfDesc::from_protobuf(&desc))
2329 .transpose()?;
2330
2331 Ok(HashJoinExecutorArgs {
2332 join_type,
2333 output_indices,
2334 probe_side_source: left_child,
2335 build_side_source: right_child,
2336 probe_key_idxs: left_key_idxs,
2337 build_key_idxs: right_key_idxs,
2338 null_matched: hash_join_node.get_null_safe().clone(),
2339 cond,
2340 identity: identity.clone(),
2341 right_key_types,
2342 chunk_size: context.context().get_config().developer.chunk_size,
2343 asof_desc,
2344 spill_backend: if context.context().get_config().enable_spill {
2345 Some(Disk)
2346 } else {
2347 None
2348 },
2349 spill_metrics: context.context().spill_metrics(),
2350 shutdown_rx: context.shutdown_rx().clone(),
2351 mem_ctx: context.context().create_executor_mem_context(&identity),
2352 }
2353 .dispatch())
2354 }
2355}
2356
/// Bag of constructor arguments for [`HashJoinExecutor`], used with
/// [`HashKeyDispatcher`] to instantiate the executor with a concrete `HashKey` type.
struct HashJoinExecutorArgs {
    join_type: JoinType,
    /// Projection from the full join schema to the output schema.
    output_indices: Vec<usize>,
    probe_side_source: BoxedExecutor,
    build_side_source: BoxedExecutor,
    probe_key_idxs: Vec<usize>,
    build_key_idxs: Vec<usize>,
    /// Per join key: whether NULLs compare equal (null-safe equality).
    null_matched: Vec<bool>,
    /// Optional non-equi join condition.
    cond: Option<BoxedExpression>,
    identity: String,
    /// Build-side key types; drive the `HashKey` dispatch (see `data_types`).
    right_key_types: Vec<DataType>,
    chunk_size: usize,
    /// Present only for ASOF joins.
    asof_desc: Option<AsOfDesc>,
    /// `Some` enables spilling to the given backend when memory is insufficient.
    spill_backend: Option<SpillBackend>,
    spill_metrics: Arc<BatchSpillMetrics>,
    shutdown_rx: ShutdownToken,
    mem_ctx: MemoryContext,
}
2375
2376impl HashKeyDispatcher for HashJoinExecutorArgs {
2377 type Output = BoxedExecutor;
2378
2379 fn dispatch_impl<K: HashKey>(self) -> Self::Output {
2380 Box::new(HashJoinExecutor::<K>::new(
2381 self.join_type,
2382 self.output_indices,
2383 self.probe_side_source,
2384 self.build_side_source,
2385 self.probe_key_idxs,
2386 self.build_key_idxs,
2387 self.null_matched,
2388 self.cond.map(Arc::new),
2389 self.identity,
2390 self.chunk_size,
2391 self.asof_desc,
2392 self.spill_backend,
2393 self.spill_metrics,
2394 self.shutdown_rx,
2395 self.mem_ctx,
2396 ))
2397 }
2398
2399 fn data_types(&self) -> &[DataType] {
2400 &self.right_key_types
2401 }
2402}
2403
impl<K> HashJoinExecutor<K> {
    /// Creates a new [`HashJoinExecutor`]. Delegates to [`Self::new_inner`] with no
    /// memory upper bound.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        join_type: JoinType,
        output_indices: Vec<usize>,
        probe_side_source: BoxedExecutor,
        build_side_source: BoxedExecutor,
        probe_key_idxs: Vec<usize>,
        build_key_idxs: Vec<usize>,
        null_matched: Vec<bool>,
        cond: Option<Arc<BoxedExpression>>,
        identity: String,
        chunk_size: usize,
        asof_desc: Option<AsOfDesc>,
        spill_backend: Option<SpillBackend>,
        spill_metrics: Arc<BatchSpillMetrics>,
        shutdown_rx: ShutdownToken,
        mem_ctx: MemoryContext,
    ) -> Self {
        Self::new_inner(
            join_type,
            output_indices,
            probe_side_source,
            build_side_source,
            probe_key_idxs,
            build_key_idxs,
            null_matched,
            cond,
            identity,
            chunk_size,
            asof_desc,
            spill_backend,
            spill_metrics,
            // No memory upper bound for externally constructed executors.
            None,
            shutdown_rx,
            mem_ctx,
        )
    }

    /// Internal constructor that additionally accepts `memory_upper_bound`.
    ///
    /// Derives the full join schema from the join type (semi/anti joins keep only one
    /// side's fields; all other types concatenate probe then build fields) and projects
    /// it through `output_indices` to obtain the executor's output schema.
    ///
    /// # Panics
    /// Panics if `probe_key_idxs`, `build_key_idxs`, and `null_matched` do not all have
    /// the same length.
    #[allow(clippy::too_many_arguments)]
    fn new_inner(
        join_type: JoinType,
        output_indices: Vec<usize>,
        probe_side_source: BoxedExecutor,
        build_side_source: BoxedExecutor,
        probe_key_idxs: Vec<usize>,
        build_key_idxs: Vec<usize>,
        null_matched: Vec<bool>,
        cond: Option<Arc<BoxedExpression>>,
        identity: String,
        chunk_size: usize,
        asof_desc: Option<AsOfDesc>,
        spill_backend: Option<SpillBackend>,
        spill_metrics: Arc<BatchSpillMetrics>,
        memory_upper_bound: Option<u64>,
        shutdown_rx: ShutdownToken,
        mem_ctx: MemoryContext,
    ) -> Self {
        // Key index lists and the null-safe flags must be aligned one-to-one.
        assert_eq!(probe_key_idxs.len(), build_key_idxs.len());
        assert_eq!(probe_key_idxs.len(), null_matched.len());
        let original_schema = match join_type {
            JoinType::LeftSemi | JoinType::LeftAnti => probe_side_source.schema().clone(),
            JoinType::RightSemi | JoinType::RightAnti => build_side_source.schema().clone(),
            _ => Schema::from_iter(
                probe_side_source
                    .schema()
                    .fields()
                    .iter()
                    .chain(build_side_source.schema().fields().iter())
                    .cloned(),
            ),
        };
        // Output schema = full join schema projected through `output_indices`.
        let schema = Schema::from_iter(
            output_indices
                .iter()
                .map(|&idx| original_schema[idx].clone()),
        );
        Self {
            join_type,
            original_schema,
            schema,
            output_indices,
            probe_side_source,
            build_side_source,
            probe_key_idxs,
            build_key_idxs,
            null_matched,
            cond,
            identity,
            chunk_size,
            asof_desc,
            shutdown_rx,
            spill_backend,
            spill_metrics,
            memory_upper_bound,
            mem_ctx,
            _phantom: PhantomData,
        }
    }
}
2504
2505#[cfg(test)]
2506mod tests {
2507 use futures::StreamExt;
2508 use futures_async_stream::for_await;
2509 use itertools::Itertools;
2510 use risingwave_common::array::{ArrayBuilderImpl, DataChunk};
2511 use risingwave_common::catalog::{Field, Schema};
2512 use risingwave_common::hash::Key32;
2513 use risingwave_common::memory::MemoryContext;
2514 use risingwave_common::metrics::LabelGuardedIntGauge;
2515 use risingwave_common::test_prelude::DataChunkTestExt;
2516 use risingwave_common::types::DataType;
2517 use risingwave_common::util::iter_util::ZipEqDebug;
2518 use risingwave_common::util::memcmp_encoding::encode_chunk;
2519 use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
2520 use risingwave_expr::expr::{BoxedExpression, build_from_pretty};
2521
2522 use super::{
2523 AsOfDesc, AsOfInequalityType, ChunkedData, HashJoinExecutor, JoinType,
2524 LeftNonEquiJoinState, RightNonEquiJoinState, RowId,
2525 };
2526 use crate::error::Result;
2527 use crate::executor::test_utils::MockExecutor;
2528 use crate::executor::{BoxedExecutor, Executor};
2529 use crate::monitor::BatchSpillMetrics;
2530 use crate::spill::spill_op::SpillBackend;
2531 use crate::task::ShutdownToken;
2532
2533 const CHUNK_SIZE: usize = 1024;
2534
    /// Test helper that accumulates several `DataChunk`s into one big chunk
    /// so a whole result stream can be compared at once.
    struct DataChunkMerger {
        // One builder per output column.
        array_builders: Vec<ArrayBuilderImpl>,
        // Total number of rows appended so far.
        array_len: usize,
    }
2539
    impl DataChunkMerger {
        /// Create a merger for chunks with the given column types.
        fn new(data_types: Vec<DataType>) -> Result<Self> {
            let array_builders = data_types
                .iter()
                .map(|data_type| data_type.create_array_builder(CHUNK_SIZE))
                .collect();

            Ok(Self {
                array_builders,
                array_len: 0,
            })
        }

        /// Append all columns of `data_chunk` to the accumulated builders.
        ///
        /// NOTE(review): row count is tracked via `capacity()`, which equals
        /// the cardinality only for visibility-compacted chunks — callers in
        /// this module compact before appending.
        fn append(&mut self, data_chunk: &DataChunk) -> Result<()> {
            ensure!(self.array_builders.len() == data_chunk.dimension());
            for idx in 0..self.array_builders.len() {
                self.array_builders[idx].append_array(data_chunk.column_at(idx));
            }
            self.array_len += data_chunk.capacity();

            Ok(())
        }

        /// Finalize all builders into a single merged `DataChunk`.
        fn finish(self) -> Result<DataChunk> {
            let columns = self
                .array_builders
                .into_iter()
                .map(|b| b.finish().into())
                .collect();

            Ok(DataChunk::new(columns, self.array_len))
        }
    }
2573
2574 fn compare_data_chunk_with_rowsort(left: &DataChunk, right: &DataChunk) -> bool {
2576 assert!(left.is_vis_compacted());
2577 assert!(right.is_vis_compacted());
2578
2579 if left.cardinality() != right.cardinality() {
2580 return false;
2581 }
2582
2583 let column_orders = (0..left.columns().len())
2585 .map(|i| ColumnOrder::new(i, OrderType::ascending()))
2586 .collect_vec();
2587 let left_encoded_chunk = encode_chunk(left, &column_orders).unwrap();
2588 let mut sorted_left = left_encoded_chunk
2589 .into_iter()
2590 .enumerate()
2591 .map(|(row_id, row)| (left.row_at_unchecked_vis(row_id), row))
2592 .collect_vec();
2593 sorted_left.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
2594
2595 let right_encoded_chunk = encode_chunk(right, &column_orders).unwrap();
2596 let mut sorted_right = right_encoded_chunk
2597 .into_iter()
2598 .enumerate()
2599 .map(|(row_id, row)| (right.row_at_unchecked_vis(row_id), row))
2600 .collect_vec();
2601 sorted_right.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
2602
2603 sorted_left
2604 .into_iter()
2605 .map(|(row, _)| row)
2606 .zip_eq_debug(sorted_right.into_iter().map(|(row, _)| row))
2607 .all(|(row1, row2)| row1 == row2)
2608 }
2609
    /// Common setup for the join tests: fixed left/right column types plus
    /// the join type under test.
    struct TestFixture {
        left_types: Vec<DataType>,
        right_types: Vec<DataType>,
        join_type: JoinType,
    }
2615
    impl TestFixture {
        /// Fixture with left schema (int4, float4) and right schema
        /// (int4, float8); column 0 of each side is the equi-join key.
        fn with_join_type(join_type: JoinType) -> Self {
            Self {
                left_types: vec![DataType::Int32, DataType::Float32],
                right_types: vec![DataType::Int32, DataType::Float64],
                join_type,
            }
        }

        /// Probe (left) side: two chunks, 10 rows total, including NULL keys
        /// and NULL values.
        fn create_left_executor(&self) -> BoxedExecutor {
            let schema = Schema {
                fields: vec![
                    Field::unnamed(DataType::Int32),
                    Field::unnamed(DataType::Float32),
                ],
            };
            let mut executor = MockExecutor::new(schema);

            executor.add(DataChunk::from_pretty(
                "i f
                 1 6.1
                 2 .
                 . 8.4
                 3 3.9
                 . . ",
            ));

            executor.add(DataChunk::from_pretty(
                "i f
                 4 6.6
                 3 .
                 . 0.7
                 5 .
                 . 5.5",
            ));

            Box::new(executor)
        }

        /// Build (right) side: three chunks, 18 rows total, including NULL
        /// keys and keys (100, 200) with no probe-side counterpart.
        fn create_right_executor(&self) -> BoxedExecutor {
            let schema = Schema {
                fields: vec![
                    Field::unnamed(DataType::Int32),
                    Field::unnamed(DataType::Float64),
                ],
            };
            let mut executor = MockExecutor::new(schema);

            executor.add(DataChunk::from_pretty(
                "i F
                 8 6.1
                 2 .
                 . 8.9
                 3 .
                 . 3.5
                 6 . ",
            ));

            executor.add(DataChunk::from_pretty(
                "i F
                 4 7.5
                 6 .
                 . 8
                 7 .
                 . 9.1
                 9 . ",
            ));

            executor.add(DataChunk::from_pretty(
                " i F
                 3 3.7
                 9 .
                 . 9.6
                 100 .
                 . 8.18
                 200 . ",
            ));

            Box::new(executor)
        }

        /// Output column types implied by the join type: semi/anti joins keep
        /// one side only, all other join types keep both sides concatenated.
        fn output_data_types(&self) -> Vec<DataType> {
            let join_type = self.join_type;
            if join_type.keep_all() {
                [self.left_types.clone(), self.right_types.clone()].concat()
            } else if join_type.keep_left() {
                self.left_types.clone()
            } else if join_type.keep_right() {
                self.right_types.clone()
            } else {
                unreachable!()
            }
        }

        /// Non-equi condition `left.1 < right.1`: in the concatenated row,
        /// $1 is the left float4 column and $3 the right float8 column.
        fn create_cond() -> BoxedExpression {
            build_from_pretty("(less_than:boolean $1:float4 $3:float8)")
        }

        /// Build a `HashJoinExecutor` joining `left_child.0 == right_child.0`.
        ///
        /// When `test_spill` is set, the executor gets a memory context with a
        /// limit of 0 (so it always decides to spill) and the in-memory spill
        /// backend.
        fn create_join_executor_with_chunk_size_and_executors(
            &self,
            has_non_equi_cond: bool,
            null_safe: bool,
            chunk_size: usize,
            left_child: BoxedExecutor,
            right_child: BoxedExecutor,
            shutdown_rx: ShutdownToken,
            parent_mem_ctx: Option<MemoryContext>,
            test_spill: bool,
        ) -> BoxedExecutor {
            let join_type = self.join_type;

            // Identity projection over whichever columns the join type keeps.
            let output_indices = (0..match join_type {
                JoinType::LeftSemi | JoinType::LeftAnti => left_child.schema().fields().len(),
                JoinType::RightSemi | JoinType::RightAnti => right_child.schema().fields().len(),
                _ => left_child.schema().fields().len() + right_child.schema().fields().len(),
            })
            .collect();

            // `.into()` converts the `BoxedExpression` into the
            // `Arc<BoxedExpression>` the constructor expects.
            let cond = if has_non_equi_cond {
                Some(Self::create_cond().into())
            } else {
                None
            };

            let mem_ctx = if test_spill {
                MemoryContext::new_with_mem_limit(
                    parent_mem_ctx,
                    LabelGuardedIntGauge::test_int_gauge::<4>(),
                    0,
                )
            } else {
                MemoryContext::new(parent_mem_ctx, LabelGuardedIntGauge::test_int_gauge::<4>())
            };
            Box::new(HashJoinExecutor::<Key32>::new(
                join_type,
                output_indices,
                left_child,
                right_child,
                vec![0],
                vec![0],
                vec![null_safe],
                cond,
                "HashJoinExecutor".to_owned(),
                chunk_size,
                None,
                if test_spill {
                    Some(SpillBackend::Memory)
                } else {
                    None
                },
                BatchSpillMetrics::for_test(),
                shutdown_rx,
                mem_ctx,
            ))
        }

        /// Run the join twice — once without spilling and once with spilling
        /// forced — expecting the same result both times.
        async fn do_test(&self, expected: DataChunk, has_non_equi_cond: bool, null_safe: bool) {
            let left_executor = self.create_left_executor();
            let right_executor = self.create_right_executor();
            self.do_test_with_chunk_size_and_executors(
                expected.clone(),
                has_non_equi_cond,
                null_safe,
                self::CHUNK_SIZE,
                left_executor,
                right_executor,
                false,
            )
            .await;

            let left_executor = self.create_left_executor();
            let right_executor = self.create_right_executor();
            self.do_test_with_chunk_size_and_executors(
                expected,
                has_non_equi_cond,
                null_safe,
                self::CHUNK_SIZE,
                left_executor,
                right_executor,
                true,
            )
            .await;
        }

        /// Execute the join and assert that the merged output equals
        /// `expected` up to row order, and that all accounted memory is
        /// released afterwards.
        async fn do_test_with_chunk_size_and_executors(
            &self,
            expected: DataChunk,
            has_non_equi_cond: bool,
            null_safe: bool,
            chunk_size: usize,
            left_executor: BoxedExecutor,
            right_executor: BoxedExecutor,
            test_spill: bool,
        ) {
            let parent_mem_context =
                MemoryContext::root(LabelGuardedIntGauge::test_int_gauge::<4>(), u64::MAX);

            {
                let join_executor = self.create_join_executor_with_chunk_size_and_executors(
                    has_non_equi_cond,
                    null_safe,
                    chunk_size,
                    left_executor,
                    right_executor,
                    ShutdownToken::empty(),
                    Some(parent_mem_context.clone()),
                    test_spill,
                );

                let mut data_chunk_merger = DataChunkMerger::new(self.output_data_types()).unwrap();

                // Sanity-check the float columns of the reported output schema.
                let fields = &join_executor.schema().fields;

                if self.join_type.keep_all() {
                    assert_eq!(fields[1].data_type, DataType::Float32);
                    assert_eq!(fields[3].data_type, DataType::Float64);
                } else if self.join_type.keep_left() {
                    assert_eq!(fields[1].data_type, DataType::Float32);
                } else if self.join_type.keep_right() {
                    assert_eq!(fields[1].data_type, DataType::Float64)
                } else {
                    unreachable!()
                }

                let mut stream = join_executor.execute();

                while let Some(data_chunk) = stream.next().await {
                    let data_chunk = data_chunk.unwrap();
                    // Compact so the merger's capacity-based row accounting
                    // and the row-sorted comparison below are valid.
                    let data_chunk = data_chunk.compact_vis();
                    data_chunk_merger.append(&data_chunk).unwrap();
                }

                let result_chunk = data_chunk_merger.finish().unwrap();
                println!("expected: {:?}", expected);
                println!("result: {:?}", result_chunk);

                // Row order is not guaranteed, so compare as multisets.
                assert!(compare_data_chunk_with_rowsort(&expected, &result_chunk));
            }

            // Executor dropped above: every byte must be returned to the
            // parent memory context.
            assert_eq!(0, parent_mem_context.get_bytes_used());
        }

        /// Verify that both `cancel` and `abort` on the shutdown token make
        /// the executor's stream yield an error.
        async fn do_test_shutdown(&self, has_non_equi_cond: bool) {
            // Cancel.
            let left_executor = self.create_left_executor();
            let right_executor = self.create_right_executor();
            let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
            let join_executor = self.create_join_executor_with_chunk_size_and_executors(
                has_non_equi_cond,
                false,
                self::CHUNK_SIZE,
                left_executor,
                right_executor,
                shutdown_rx,
                None,
                false,
            );
            shutdown_tx.cancel();
            #[for_await]
            for chunk in join_executor.execute() {
                assert!(chunk.is_err());
                break;
            }

            // Abort.
            let left_executor = self.create_left_executor();
            let right_executor = self.create_right_executor();
            let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
            let join_executor = self.create_join_executor_with_chunk_size_and_executors(
                has_non_equi_cond,
                false,
                self::CHUNK_SIZE,
                left_executor,
                right_executor,
                shutdown_rx,
                None,
                false,
            );
            shutdown_tx.abort("test");
            #[for_await]
            for chunk in join_executor.execute() {
                assert!(chunk.is_err());
                break;
            }
        }
    }
2921
    /// Inner join on key column 0: only rows whose non-NULL keys appear on
    /// both sides are emitted.
    #[tokio::test]
    async fn test_inner_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::Inner);

        let expected_chunk = DataChunk::from_pretty(
            "i f i F
             2 . 2 .
             3 3.9 3 3.7
             3 3.9 3 .
             4 6.6 4 7.5
             3 . 3 3.7
             3 . 3 .",
        );

        test_fixture.do_test(expected_chunk, false, false).await;
    }
2942
    /// Inner join with null-safe key matching: NULL keys match NULL keys, so
    /// every NULL-keyed left row pairs with every NULL-keyed right row.
    #[tokio::test]
    async fn test_null_safe_inner_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::Inner);

        let expected_chunk = DataChunk::from_pretty(
            "i f i F
             2 . 2 .
             . 8.4 . 8.18
             . 8.4 . 9.6
             . 8.4 . 9.1
             . 8.4 . 8
             . 8.4 . 3.5
             . 8.4 . 8.9
             3 3.9 3 3.7
             3 3.9 3 .
             . . . 8.18
             . . . 9.6
             . . . 9.1
             . . . 8
             . . . 3.5
             . . . 8.9
             4 6.6 4 7.5
             3 . 3 3.7
             3 . 3 .
             . 0.7 . 8.18
             . 0.7 . 9.6
             . 0.7 . 9.1
             . 0.7 . 8
             . 0.7 . 3.5
             . 0.7 . 8.9
             . 5.5 . 8.18
             . 5.5 . 9.6
             . 5.5 . 9.1
             . 5.5 . 8
             . 5.5 . 3.5
             . 5.5 . 8.9",
        );

        test_fixture.do_test(expected_chunk, false, true).await;
    }
2987
    /// Inner join further filtered by the non-equi condition
    /// `left.1 < right.1`; only one matched pair survives.
    #[tokio::test]
    async fn test_inner_join_with_non_equi_condition() {
        let test_fixture = TestFixture::with_join_type(JoinType::Inner);

        let expected_chunk = DataChunk::from_pretty(
            "i f i F
             4 6.6 4 7.5",
        );

        test_fixture.do_test(expected_chunk, true, false).await;
    }
3003
    /// Left outer join: unmatched probe rows are emitted with NULL-padded
    /// build-side columns.
    #[tokio::test]
    async fn test_left_outer_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);

        let expected_chunk = DataChunk::from_pretty(
            "i f i F
             1 6.1 . .
             2 . 2 .
             . 8.4 . .
             3 3.9 3 3.7
             3 3.9 3 .
             . . . .
             4 6.6 4 7.5
             3 . 3 3.7
             3 . 3 .
             . 0.7 . .
             5 . . .
             . 5.5 . .",
        );

        test_fixture.do_test(expected_chunk, false, false).await;
    }
3030
    /// Left outer join with the non-equi condition: probe rows whose matches
    /// all fail the condition are NULL-padded instead of dropped.
    #[tokio::test]
    async fn test_left_outer_join_with_non_equi_condition() {
        let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);

        let expected_chunk = DataChunk::from_pretty(
            "i f i F
             2 . . .
             3 3.9 . .
             4 6.6 4 7.5
             3 . . .
             1 6.1 . .
             . 8.4 . .
             . . . .
             . 0.7 . .
             5 . . .
             . 5.5 . .",
        );

        test_fixture.do_test(expected_chunk, true, false).await;
    }
3055
    /// Right outer join: unmatched build rows are emitted with NULL-padded
    /// probe-side columns.
    #[tokio::test]
    async fn test_right_outer_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);

        let expected_chunk = DataChunk::from_pretty(
            "i f i F
             2 . 2 .
             3 3.9 3 3.7
             3 3.9 3 .
             4 6.6 4 7.5
             3 . 3 3.7
             3 . 3 .
             . . 8 6.1
             . . . 8.9
             . . . 3.5
             . . 6 .
             . . 6 .
             . . . 8
             . . 7 .
             . . . 9.1
             . . 9 .
             . . 9 .
             . . . 9.6
             . . 100 .
             . . . 8.18
             . . 200 .",
        );

        test_fixture.do_test(expected_chunk, false, false).await;
    }
3090
    /// Right outer join with the non-equi condition: build rows whose matches
    /// all fail the condition are NULL-padded instead of dropped.
    #[tokio::test]
    async fn test_right_outer_join_with_non_equi_condition() {
        let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);

        let expected_chunk = DataChunk::from_pretty(
            "i f i F
             4 6.6 4 7.5
             . . 8 6.1
             . . 2 .
             . . . 8.9
             . . 3 .
             . . . 3.5
             . . 6 .
             . . 6 .
             . . . 8
             . . 7 .
             . . . 9.1
             . . 9 .
             . . 3 3.7
             . . 9 .
             . . . 9.6
             . . 100 .
             . . . 8.18
             . . 200 .",
        );

        test_fixture.do_test(expected_chunk, true, false).await;
    }
3123
    /// Full outer join: union of left-outer and right-outer behavior —
    /// unmatched rows of either side are NULL-padded on the other side.
    #[tokio::test]
    async fn test_full_outer_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);

        let expected_chunk = DataChunk::from_pretty(
            "i f i F
             1 6.1 . .
             2 . 2 .
             . 8.4 . .
             3 3.9 3 3.7
             3 3.9 3 .
             . . . .
             4 6.6 4 7.5
             3 . 3 3.7
             3 . 3 .
             . 0.7 . .
             5 . . .
             . 5.5 . .
             . . 8 6.1
             . . . 8.9
             . . . 3.5
             . . 6 .
             . . 6 .
             . . . 8
             . . 7 .
             . . . 9.1
             . . 9 .
             . . 9 .
             . . . 9.6
             . . 100 .
             . . . 8.18
             . . 200 .",
        );

        test_fixture.do_test(expected_chunk, false, false).await;
    }
3163
    /// Full outer join with the non-equi condition: rows on either side with
    /// no condition-passing match are NULL-padded.
    #[tokio::test]
    async fn test_full_outer_join_with_non_equi_condition() {
        let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);

        let expected_chunk = DataChunk::from_pretty(
            "i f i F
             2 . . .
             3 3.9 . .
             4 6.6 4 7.5
             3 . . .
             1 6.1 . .
             . 8.4 . .
             . . . .
             . 0.7 . .
             5 . . .
             . 5.5 . .
             . . 8 6.1
             . . 2 .
             . . . 8.9
             . . 3 .
             . . . 3.5
             . . 6 .
             . . 6 .
             . . . 8
             . . 7 .
             . . . 9.1
             . . 9 .
             . . 3 3.7
             . . 9 .
             . . . 9.6
             . . 100 .
             . . . 8.18
             . . 200 .",
        );

        test_fixture.do_test(expected_chunk, true, false).await;
    }
3204
    /// Left anti join: probe rows with no build-side key match (NULL keys
    /// never match) are emitted, left columns only.
    #[tokio::test]
    async fn test_left_anti_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);

        let expected_chunk = DataChunk::from_pretty(
            "i f
             1 6.1
             . 8.4
             . .
             . 0.7
             5 .
             . 5.5",
        );

        test_fixture.do_test(expected_chunk, false, false).await;
    }
3221
    /// Left anti join with the non-equi condition: probe rows whose matches
    /// all fail the condition also count as unmatched.
    #[tokio::test]
    async fn test_left_anti_join_with_non_equi_condition() {
        let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);

        let expected_chunk = DataChunk::from_pretty(
            "i f
             2 .
             3 3.9
             3 .
             1 6.1
             . 8.4
             . .
             . 0.7
             5 .
             . 5.5",
        );

        test_fixture.do_test(expected_chunk, true, false).await;
    }
3241
    /// Left semi join: probe rows with at least one build-side key match are
    /// emitted once each, left columns only.
    #[tokio::test]
    async fn test_left_semi_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);

        let expected_chunk = DataChunk::from_pretty(
            "i f
             2 .
             3 3.9
             4 6.6
             3 .",
        );

        test_fixture.do_test(expected_chunk, false, false).await;
    }
3256
    /// Left semi join with the non-equi condition: only probe rows with a
    /// condition-passing match survive.
    #[tokio::test]
    async fn test_left_semi_join_with_non_equi_condition() {
        let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);

        let expected_chunk = DataChunk::from_pretty(
            "i f
             4 6.6",
        );

        test_fixture.do_test(expected_chunk, true, false).await;
    }
3268
    /// Left semi join with duplicate keys on both sides and a small chunk
    /// size (3): each probe row must still be emitted exactly once even
    /// though its matches span chunk boundaries.
    #[tokio::test]
    async fn test_left_semi_join_with_non_equi_condition_duplicates() {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Float32),
            ],
        };

        // Left side with 4 duplicate key-1 rows.
        let mut left_executor = MockExecutor::new(schema);
        left_executor.add(DataChunk::from_pretty(
            "i f
             1 1.0
             1 1.0
             1 1.0
             1 1.0
             2 1.0",
        ));

        // Right side with 4 duplicate key-1 rows; all pass `1.0 < 2.0`.
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Float64),
            ],
        };
        let mut right_executor = MockExecutor::new(schema);
        right_executor.add(DataChunk::from_pretty(
            "i F
             1 2.0
             1 2.0
             1 2.0
             1 2.0
             2 2.0",
        ));

        let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
        let expected_chunk = DataChunk::from_pretty(
            "i f
             1 1.0
             1 1.0
             1 1.0
             1 1.0
             2 1.0",
        );

        test_fixture
            .do_test_with_chunk_size_and_executors(
                expected_chunk,
                true,
                false,
                3,
                Box::new(left_executor),
                Box::new(right_executor),
                false,
            )
            .await;
    }
3332
    /// Right anti join: build rows with no probe-side key match are emitted,
    /// right columns only.
    #[tokio::test]
    async fn test_right_anti_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);

        let expected_chunk = DataChunk::from_pretty(
            "i F
             8 6.1
             . 8.9
             . 3.5
             6 .
             6 .
             . 8.0
             7 .
             . 9.1
             9 .
             9 .
             . 9.6
             100 .
             . 8.18
             200 .",
        );

        test_fixture.do_test(expected_chunk, false, false).await;
    }
3357
    /// Right anti join with the non-equi condition: build rows whose matches
    /// all fail the condition also count as unmatched.
    #[tokio::test]
    async fn test_right_anti_join_with_non_equi_condition() {
        let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);

        let expected_chunk = DataChunk::from_pretty(
            "i F
             8 6.1
             2 .
             . 8.9
             3 .
             . 3.5
             6 .
             6 .
             . 8
             7 .
             . 9.1
             9 .
             3 3.7
             9 .
             . 9.6
             100 .
             . 8.18
             200 .",
        );

        test_fixture.do_test(expected_chunk, true, false).await;
    }
3385
    /// Right semi join: build rows with at least one probe-side key match are
    /// emitted once each, right columns only.
    #[tokio::test]
    async fn test_right_semi_join() {
        let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);

        let expected_chunk = DataChunk::from_pretty(
            "i F
             2 .
             3 .
             4 7.5
             3 3.7",
        );

        test_fixture.do_test(expected_chunk, false, false).await;
    }
3400
    /// Right semi join with the non-equi condition: only build rows with a
    /// condition-passing match survive.
    #[tokio::test]
    async fn test_right_semi_join_with_non_equi_condition() {
        let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);

        let expected_chunk = DataChunk::from_pretty(
            "i F
             4 7.5",
        );

        test_fixture.do_test(expected_chunk, true, false).await;
    }
3412
    /// Unit test of the left-outer non-equi post-processing: given joined
    /// chunks and per-probe-row group boundaries (`first_output_row_id`),
    /// rows failing the condition keep only their first occurrence per group,
    /// NULL-padded, and `found_matched` carries match state across chunks.
    #[tokio::test]
    async fn test_process_left_outer_join_non_equi_condition() {
        let chunk = DataChunk::from_pretty(
            "i f i F
             1 3.5 1 5.5
             1 3.5 1 2.5
             2 4.0 . .
             3 5.0 3 4.0
             3 5.0 3 3.0
             3 5.0 3 4.0
             3 5.0 3 3.0
             4 1.0 4 0
             4 1.0 4 9.0",
        );
        let expect = DataChunk::from_pretty(
            "i f i F
             1 3.5 1 5.5
             2 4.0 . .
             3 5.0 . .
             3 5.0 . .
             4 1.0 4 9.0",
        );
        let cond = TestFixture::create_cond();
        // Groups start at output rows 0, 2, 3, 5 and 7; the last group (key 4)
        // continues into the next chunk.
        let mut state = LeftNonEquiJoinState {
            probe_column_count: 2,
            first_output_row_id: vec![0, 2, 3, 5, 7],
            has_more_output_rows: true,
            found_matched: false,
        };
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_left_outer_join_non_equi_condition(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
        // The trailing key-4 group already found a passing match (4 9.0).
        assert!(state.found_matched);

        let chunk = DataChunk::from_pretty(
            "i f i F
             4 1.0 4 0.6
             4 1.0 4 2.0
             5 4.0 5 .
             6 7.0 6 .
             6 7.0 6 5.0",
        );
        let expect = DataChunk::from_pretty(
            "i f i F
             4 1.0 4 2.0
             5 4.0 . .
             6 7.0 . .",
        );
        state.first_output_row_id = vec![2, 3];
        state.has_more_output_rows = false;
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_left_outer_join_non_equi_condition(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
        assert!(!state.found_matched);

        let chunk = DataChunk::from_pretty(
            "i f i F
             4 1.0 4 0.6
             4 1.0 4 1.0
             5 4.0 5 .
             6 7.0 6 .
             6 7.0 6 8.0",
        );
        let expect = DataChunk::from_pretty(
            "i f i F
             4 1.0 . .
             5 4.0 . .
             6 7.0 6 8.0",
        );
        state.first_output_row_id = vec![2, 3];
        state.has_more_output_rows = false;
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_left_outer_join_non_equi_condition(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
        assert!(!state.found_matched);
    }
3516
    /// Unit test of the left-semi non-equi post-processing (const param
    /// `ANTI_JOIN = false`): only the first condition-passing row of each
    /// probe group is kept.
    #[tokio::test]
    async fn test_process_left_semi_join_non_equi_condition() {
        let chunk = DataChunk::from_pretty(
            "i f i F
             1 3.5 1 5.5
             1 3.5 1 2.5
             2 4.0 . .
             3 5.0 3 4.0
             3 5.0 3 3.0
             3 5.0 3 4.0
             3 5.0 3 3.0
             4 1.0 4 0
             4 1.0 4 0.5",
        );
        let expect = DataChunk::from_pretty(
            "i f i F
             1 3.5 1 5.5",
        );
        let cond = TestFixture::create_cond();
        let mut state = LeftNonEquiJoinState {
            probe_column_count: 2,
            first_output_row_id: vec![0, 2, 3, 5, 7],
            found_matched: false,
            ..Default::default()
        };
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<false>(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
        assert!(!state.found_matched);

        let chunk = DataChunk::from_pretty(
            "i f i F
             4 1.0 4 0.6
             4 1.0 4 2.0
             5 4.0 5 .
             6 7.0 6 .
             6 7.0 6 5.0",
        );
        let expect = DataChunk::from_pretty(
            "i f i F
             4 1.0 4 2.0",
        );
        state.first_output_row_id = vec![2, 3];
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<false>(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
        assert!(!state.found_matched);

        let chunk = DataChunk::from_pretty(
            "i f i F
             4 1.0 4 0.6
             4 1.0 4 1.0
             5 4.0 5 .
             6 7.0 6 .
             6 7.0 6 8.0",
        );
        let expect = DataChunk::from_pretty(
            "i f i F
             6 7.0 6 8.0",
        );
        state.first_output_row_id = vec![2, 3];
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<false>(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
    }
3609
    /// Unit test of the left-anti non-equi post-processing (const param
    /// `ANTI_JOIN = true`): a probe group's first row is kept only while no
    /// condition-passing match has been seen for that group.
    #[tokio::test]
    async fn test_process_left_anti_join_non_equi_condition() {
        let chunk = DataChunk::from_pretty(
            "i f i F
             1 3.5 1 5.5
             1 3.5 1 2.5
             2 4.0 . .
             3 5.0 3 4.0
             3 5.0 3 3.0
             3 5.0 3 4.0
             3 5.0 3 3.0
             4 1.0 4 0
             4 1.0 4 0.5",
        );
        let expect = DataChunk::from_pretty(
            "i f i F
             2 4.0 . .
             3 5.0 3 4.0
             3 5.0 3 4.0",
        );
        let cond = TestFixture::create_cond();
        let mut state = LeftNonEquiJoinState {
            probe_column_count: 2,
            first_output_row_id: vec![0, 2, 3, 5, 7],
            has_more_output_rows: true,
            found_matched: false,
        };
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<true>(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
        assert!(!state.found_matched);

        let chunk = DataChunk::from_pretty(
            "i f i F
             4 1.0 4 0.6
             4 1.0 4 2.0
             5 4.0 5 .
             6 7.0 6 .
             6 7.0 6 5.0",
        );
        let expect = DataChunk::from_pretty(
            "i f i F
             5 4.0 5 .
             6 7.0 6 .",
        );
        state.first_output_row_id = vec![2, 3];
        state.has_more_output_rows = false;
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<true>(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
        assert!(!state.found_matched);

        let chunk = DataChunk::from_pretty(
            "i f i F
             4 1.0 4 0.6
             4 1.0 4 1.0
             5 4.0 5 .
             6 7.0 6 .
             6 7.0 6 8.0",
        );
        let expect = DataChunk::from_pretty(
            "i f i F
             4 1.0 4 0.6
             5 4.0 5 .",
        );
        state.first_output_row_id = vec![2, 3];
        state.has_more_output_rows = false;
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_left_semi_anti_join_non_equi_condition::<true>(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        assert_eq!(state.first_output_row_id, Vec::<usize>::new());
    }
3708
    /// Unit test of the right-outer non-equi post-processing: rows passing
    /// the condition are kept, and the corresponding build rows (tracked via
    /// `build_row_ids`) are flagged in the `build_row_matched` bitmap.
    #[tokio::test]
    async fn test_process_right_outer_join_non_equi_condition() {
        let chunk = DataChunk::from_pretty(
            "i f i F
             1 3.5 1 5.5
             1 3.5 1 2.5
             3 5.0 3 4.0
             3 5.0 3 3.0
             3 5.0 3 4.0
             3 5.0 3 3.0
             4 1.0 4 0
             4 1.0 4 0.5",
        );
        let expect = DataChunk::from_pretty(
            "i f i F
             1 3.5 1 5.5",
        );
        let cond = TestFixture::create_cond();
        // 14 build rows in a single chunk, none matched yet.
        let build_row_matched = ChunkedData::with_chunk_sizes([14].into_iter()).unwrap();
        let mut state = RightNonEquiJoinState {
            build_row_ids: vec![
                RowId::new(0, 0),
                RowId::new(0, 1),
                RowId::new(0, 3),
                RowId::new(0, 4),
                RowId::new(0, 3),
                RowId::new(0, 4),
                RowId::new(0, 5),
                RowId::new(0, 7),
            ],
            build_row_matched,
        };
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_right_outer_join_non_equi_condition(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        assert_eq!(state.build_row_ids, Vec::new());
        // Only build row 0 (the `1 5.5` row) passed the condition.
        assert_eq!(
            state.build_row_matched,
            ChunkedData::try_from(vec![{
                let mut v = vec![false; 14];
                v[0] = true;
                v
            }])
            .unwrap()
        );

        let chunk = DataChunk::from_pretty(
            "i f i F
             4 1.0 4 0.6
             4 1.0 4 2.0
             5 4.0 5 .
             6 7.0 6 .
             6 7.0 6 5.0",
        );
        let expect = DataChunk::from_pretty(
            "i f i F
             4 1.0 4 2.0",
        );
        state.build_row_ids = vec![
            RowId::new(0, 8),
            RowId::new(0, 9),
            RowId::new(0, 10),
            RowId::new(0, 12),
            RowId::new(0, 13),
        ];
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_right_outer_join_non_equi_condition(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        assert_eq!(state.build_row_ids, Vec::new());
        // Build row 9 (the `4 2.0` row) is now matched as well.
        assert_eq!(
            state.build_row_matched,
            ChunkedData::try_from(vec![{
                let mut v = vec![false; 14];
                v[0] = true;
                v[9] = true;
                v
            }])
            .unwrap()
        );
    }
3822
    /// Unit test of the right-semi/anti non-equi post-processing: it produces
    /// no output chunk of its own, only updates the `build_row_matched`
    /// bitmap for condition-passing build rows.
    #[tokio::test]
    async fn test_process_right_semi_anti_join_non_equi_condition() {
        let chunk = DataChunk::from_pretty(
            "i f i F
             1 3.5 1 5.5
             1 3.5 1 2.5
             3 5.0 3 4.0
             3 5.0 3 3.0
             3 5.0 3 4.0
             3 5.0 3 3.0
             4 1.0 4 0
             4 1.0 4 0.5",
        );
        let cond = TestFixture::create_cond();
        // 14 build rows in a single chunk, none matched yet.
        let build_row_matched = ChunkedData::with_chunk_sizes([14].into_iter()).unwrap();
        let mut state = RightNonEquiJoinState {
            build_row_ids: vec![
                RowId::new(0, 0),
                RowId::new(0, 1),
                RowId::new(0, 3),
                RowId::new(0, 4),
                RowId::new(0, 3),
                RowId::new(0, 4),
                RowId::new(0, 5),
                RowId::new(0, 7),
            ],
            build_row_matched,
        };

        assert!(
            HashJoinExecutor::<Key32>::process_right_semi_anti_join_non_equi_condition(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .is_ok()
        );
        assert_eq!(state.build_row_ids, Vec::new());
        // Only build row 0 passed the condition.
        assert_eq!(
            state.build_row_matched,
            ChunkedData::try_from(vec![{
                let mut v = vec![false; 14];
                v[0] = true;
                v
            }])
            .unwrap()
        );

        let chunk = DataChunk::from_pretty(
            "i f i F
             4 1.0 4 0.6
             4 1.0 4 2.0
             5 4.0 5 .
             6 7.0 6 .
             6 7.0 6 5.0",
        );
        state.build_row_ids = vec![
            RowId::new(0, 8),
            RowId::new(0, 9),
            RowId::new(0, 10),
            RowId::new(0, 12),
            RowId::new(0, 13),
        ];
        assert!(
            HashJoinExecutor::<Key32>::process_right_semi_anti_join_non_equi_condition(
                chunk,
                cond.as_ref(),
                &mut state
            )
            .await
            .is_ok()
        );
        assert_eq!(state.build_row_ids, Vec::new());
        // Build row 9 is now matched as well.
        assert_eq!(
            state.build_row_matched,
            ChunkedData::try_from(vec![{
                let mut v = vec![false; 14];
                v[0] = true;
                v[9] = true;
                v
            }])
            .unwrap()
        );
    }
3908
    /// Tests `process_full_outer_join_non_equi_condition` over two consecutive
    /// joined chunks sharing a single 14-row build side.
    ///
    /// A full outer join tracks state on both sides: `LeftNonEquiJoinState`
    /// so probe rows whose candidates all fail the condition get emitted
    /// NULL-padded, and `RightNonEquiJoinState` so unmatched build rows can
    /// be emitted later by the caller.
    #[tokio::test]
    async fn test_process_full_outer_join_non_equi_condition() {
        // First joined chunk: probe columns (i f) followed by build columns (i F).
        let chunk = DataChunk::from_pretty(
            "i f i F
             1 3.5 1 5.5
             1 3.5 1 2.5
             3 5.0 3 4.0
             3 5.0 3 3.0
             3 5.0 3 4.0
             3 5.0 3 3.0
             4 1.0 4 0
             4 1.0 4 0.5",
        );
        // Expected output: the single condition-passing row plus two
        // NULL-padded probe rows (both probe rows with key 3 match nothing).
        // The probe row with key 4 is withheld because its candidates spill
        // into the next chunk (`has_more_output_rows` below).
        let expect = DataChunk::from_pretty(
            "i f i F
             1 3.5 1 5.5
             3 5.0 . .
             3 5.0 . .",
        );
        let cond = TestFixture::create_cond();
        let mut left_state = LeftNonEquiJoinState {
            probe_column_count: 2,
            // Offset where each probe row's candidates start: four probe rows
            // with two joined candidates each.
            first_output_row_id: vec![0, 2, 4, 6],
            // The last probe row's candidates continue into the next chunk.
            has_more_output_rows: true,
            found_matched: false,
        };
        let mut right_state = RightNonEquiJoinState {
            // Build-side row id of each joined row above, in order.
            build_row_ids: vec![
                RowId::new(0, 0),
                RowId::new(0, 1),
                RowId::new(0, 3),
                RowId::new(0, 4),
                RowId::new(0, 3),
                RowId::new(0, 4),
                RowId::new(0, 5),
                RowId::new(0, 7),
            ],
            build_row_matched: ChunkedData::with_chunk_sizes([14].into_iter()).unwrap(),
        };
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_full_outer_join_non_equi_condition(
                chunk,
                cond.as_ref(),
                &mut left_state,
                &mut right_state,
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        assert_eq!(left_state.first_output_row_id, Vec::<usize>::new());
        // The pending probe row (key 4) has not matched anything yet.
        assert!(!left_state.found_matched);
        assert_eq!(right_state.build_row_ids, Vec::new());
        // So far only build row 0 participated in a passing match.
        assert_eq!(
            right_state.build_row_matched,
            ChunkedData::try_from(vec![{
                let mut v = vec![false; 14];
                v[0] = true;
                v
            }])
            .unwrap()
        );

        // Second joined chunk: finishes probe key 4 and covers keys 5 and 6
        // (`.` denotes NULL in the float column).
        let chunk = DataChunk::from_pretty(
            "i f i F
             4 1.0 4 0.6
             4 1.0 4 2.0
             5 4.0 5 .
             6 7.0 6 .
             6 7.0 6 8.0",
        );
        let expect = DataChunk::from_pretty(
            "i f i F
             4 1.0 4 2.0
             5 4.0 . .
             6 7.0 6 8.0",
        );
        left_state.first_output_row_id = vec![2, 3];
        left_state.has_more_output_rows = false;
        right_state.build_row_ids = vec![
            RowId::new(0, 8),
            RowId::new(0, 9),
            RowId::new(0, 10),
            RowId::new(0, 12),
            RowId::new(0, 13),
        ];
        assert!(compare_data_chunk_with_rowsort(
            &HashJoinExecutor::<Key32>::process_full_outer_join_non_equi_condition(
                chunk,
                cond.as_ref(),
                &mut left_state,
                &mut right_state,
            )
            .await
            .unwrap()
            .compact_vis(),
            &expect
        ));
        assert_eq!(left_state.first_output_row_id, Vec::<usize>::new());
        // The final probe row (key 6) matched, so no NULL padding is pending.
        assert!(left_state.found_matched);
        assert_eq!(right_state.build_row_ids, Vec::new());
        // Matched flags accumulated across both chunks: build rows 0, 9, 13.
        assert_eq!(
            right_state.build_row_matched,
            ChunkedData::try_from(vec![{
                let mut v = vec![false; 14];
                v[0] = true;
                v[9] = true;
                v[13] = true;
                v
            }])
            .unwrap()
        );
    }
4023
4024 #[test]
4025 fn test_find_asof_matched_rows_prefers_closest_le_match() {
4026 let probe_chunk = DataChunk::from_pretty(
4030 "i
4031 5",
4032 );
4033 let build_chunk = DataChunk::from_pretty(
4034 "i
4035 10
4036 20",
4037 );
4038 let build_side = vec![build_chunk];
4039
4040 let mut next_row_id = ChunkedData::with_chunk_sizes([2]).unwrap();
4041 next_row_id[RowId::new(0, 0)] = Some(RowId::new(0, 1));
4042 next_row_id[RowId::new(0, 1)] = None;
4043
4044 let asof_desc = AsOfDesc {
4045 left_idx: 0,
4046 right_idx: 0,
4047 inequality_type: AsOfInequalityType::Le,
4048 };
4049
4050 let matched = HashJoinExecutor::<Key32>::find_asof_matched_rows(
4051 probe_chunk.row_at_unchecked_vis(0),
4052 &build_side,
4053 next_row_id.row_id_iter(Some(RowId::new(0, 0))),
4054 &asof_desc,
4055 );
4056
4057 assert_eq!(matched, Some(RowId::new(0, 0)));
4058 }
4059
4060 #[test]
4061 fn test_find_asof_matched_rows_prefers_closest_ge_match() {
4062 let probe_chunk = DataChunk::from_pretty(
4066 "i
4067 12",
4068 );
4069 let build_chunk = DataChunk::from_pretty(
4070 "i
4071 5
4072 10",
4073 );
4074 let build_side = vec![build_chunk];
4075
4076 let mut next_row_id = ChunkedData::with_chunk_sizes([2]).unwrap();
4077 next_row_id[RowId::new(0, 0)] = Some(RowId::new(0, 1));
4078 next_row_id[RowId::new(0, 1)] = None;
4079
4080 let asof_desc = AsOfDesc {
4081 left_idx: 0,
4082 right_idx: 0,
4083 inequality_type: AsOfInequalityType::Ge,
4084 };
4085
4086 let matched = HashJoinExecutor::<Key32>::find_asof_matched_rows(
4087 probe_chunk.row_at_unchecked_vis(0),
4088 &build_side,
4089 next_row_id.row_id_iter(Some(RowId::new(0, 0))),
4090 &asof_desc,
4091 );
4092
4093 assert_eq!(matched, Some(RowId::new(0, 1)));
4094 }
4095
    /// End-to-end AS OF join through the full executor: equi-join on column 0
    /// with the AS OF inequality on column 1 (`Ge`). The probe row (3, 12)
    /// sees build rows (3, 5) and (3, 10); the closest qualifying build row,
    /// (3, 10), must be the single emitted match.
    #[tokio::test]
    async fn test_batch_hash_join_asof_ge_returns_closest_match() {
        let left_schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Int32),
            ],
        };
        let right_schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Int32),
            ],
        };

        // Probe (left) side: one row, key 3, AS OF value 12.
        let mut left_executor = MockExecutor::new(left_schema);
        left_executor.add(DataChunk::from_pretty(
            "i i
             3 12",
        ));

        // Build (right) side: two rows with the same key, AS OF values 5 and 10.
        let mut right_executor = MockExecutor::new(right_schema);
        right_executor.add(DataChunk::from_pretty(
            "i i
             3 5
             3 10",
        ));

        let join_executor = Box::new(HashJoinExecutor::<Key32>::new(
            JoinType::Inner,
            vec![0, 1, 2, 3], // output_indices: keep all four joined columns
            Box::new(left_executor),
            Box::new(right_executor),
            vec![0], // probe-side equi-key column
            vec![0], // build-side equi-key column
            vec![false], // null_matched: NULL keys do not join
            None, // no additional non-equi condition
            "HashJoinExecutor".to_owned(),
            CHUNK_SIZE,
            // AS OF descriptor: left.1 vs right.1 with `Ge` inequality.
            Some(AsOfDesc {
                left_idx: 1,
                right_idx: 1,
                inequality_type: AsOfInequalityType::Ge,
            }),
            None, // presumably the spill backend — disabled here; confirm against `new`
            BatchSpillMetrics::for_test(),
            ShutdownToken::empty(),
            MemoryContext::new(None, LabelGuardedIntGauge::test_int_gauge::<4>()),
        ));

        let mut stream = join_executor.execute();
        let chunk = stream.next().await.unwrap().unwrap().compact_vis();
        // Exactly one output row: the probe row joined with the closest match.
        let expected = DataChunk::from_pretty(
            "i i i i
             3 12 3 10",
        );

        assert!(compare_data_chunk_with_rowsort(&expected, &chunk));
        // The stream must be exhausted: no second match for an AS OF join.
        assert!(stream.next().await.is_none());
    }
4156
4157 #[tokio::test]
4158 async fn test_shutdown() {
4159 let test_fixture = TestFixture::with_join_type(JoinType::Inner);
4160 test_fixture.do_test_shutdown(false).await;
4161 test_fixture.do_test_shutdown(true).await;
4162
4163 let test_fixture = TestFixture::with_join_type(JoinType::FullOuter);
4164 test_fixture.do_test_shutdown(false).await;
4165 test_fixture.do_test_shutdown(true).await;
4166
4167 let test_fixture = TestFixture::with_join_type(JoinType::LeftAnti);
4168 test_fixture.do_test_shutdown(false).await;
4169 test_fixture.do_test_shutdown(true).await;
4170
4171 let test_fixture = TestFixture::with_join_type(JoinType::LeftOuter);
4172 test_fixture.do_test_shutdown(false).await;
4173 test_fixture.do_test_shutdown(true).await;
4174
4175 let test_fixture = TestFixture::with_join_type(JoinType::LeftSemi);
4176 test_fixture.do_test_shutdown(false).await;
4177 test_fixture.do_test_shutdown(true).await;
4178
4179 let test_fixture = TestFixture::with_join_type(JoinType::RightAnti);
4180 test_fixture.do_test_shutdown(false).await;
4181 test_fixture.do_test_shutdown(true).await;
4182
4183 let test_fixture = TestFixture::with_join_type(JoinType::RightOuter);
4184 test_fixture.do_test_shutdown(false).await;
4185 test_fixture.do_test_shutdown(true).await;
4186
4187 let test_fixture = TestFixture::with_join_type(JoinType::RightSemi);
4188 test_fixture.do_test_shutdown(false).await;
4189 test_fixture.do_test_shutdown(true).await;
4190 }
4191}