1use std::collections::HashMap;
16use std::marker::PhantomData;
17use std::sync::Arc;
18
19use anyhow::Context;
20use itertools::Itertools;
21use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
22use risingwave_common::catalog::{ColumnDesc, Field, Schema};
23use risingwave_common::hash::table_distribution::TableDistribution;
24use risingwave_common::hash::{
25 ExpandedWorkerSlotMapping, HashKey, HashKeyDispatcher, VirtualNode, VnodeCountCompat,
26 WorkerSlotId,
27};
28use risingwave_common::memory::MemoryContext;
29use risingwave_common::types::{DataType, Datum};
30use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
31use risingwave_common::util::iter_util::ZipEqFast;
32use risingwave_common::util::scan_range::ScanRange;
33use risingwave_common::util::tracing::TracingContext;
34use risingwave_expr::expr::{BoxedExpression, build_from_prost};
35use risingwave_pb::batch_plan::exchange_info::DistributionMode;
36use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan;
37use risingwave_pb::batch_plan::plan_node::NodeBody;
38use risingwave_pb::batch_plan::{
39 ExchangeInfo, ExchangeNode, LocalExecutePlan, PbExchangeSource, PbTaskId, PlanFragment,
40 PlanNode, RowSeqScanNode, TaskOutputId,
41};
42use risingwave_pb::common::{BatchQueryEpoch, WorkerNode};
43use risingwave_pb::plan_common::StorageTableDesc;
44
45use super::AsOfDesc;
46use crate::error::Result;
47use crate::executor::{
48 AsOf, BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, DummyExecutor, Executor,
49 ExecutorBuilder, JoinType, LookupJoinBase, unix_timestamp_sec_to_epoch,
50};
51use crate::task::{BatchTaskContext, ShutdownToken, TaskId};
52
53struct InnerSideExecutorBuilder {
55 table_desc: StorageTableDesc,
56 table_distribution: TableDistribution,
57 vnode_mapping: ExpandedWorkerSlotMapping,
58 outer_side_key_types: Vec<DataType>,
59 inner_side_schema: Schema,
60 inner_side_column_ids: Vec<i32>,
61 inner_side_key_types: Vec<DataType>,
62 lookup_prefix_len: usize,
63 context: Arc<dyn BatchTaskContext>,
64 task_id: TaskId,
65 epoch: BatchQueryEpoch,
66 worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode>,
67 worker_slot_to_scan_range_mapping: HashMap<WorkerSlotId, Vec<(ScanRange, VirtualNode)>>,
68 #[expect(dead_code)]
69 chunk_size: usize,
70 shutdown_rx: ShutdownToken,
71 next_stage_id: usize,
72 as_of: Option<AsOf>,
73}
74
75pub trait LookupExecutorBuilder: Send + 'static {
77 fn reset(&mut self);
78
79 fn add_scan_range(
80 &mut self,
81 key_datums: Vec<Datum>,
82 ) -> impl Future<Output = Result<()>> + Send + '_;
83
84 fn build_executor(&mut self) -> impl Future<Output = Result<BoxedExecutor>> + Send + '_;
85}
86
87impl InnerSideExecutorBuilder {
88 fn get_virtual_node(&self, scan_range: &ScanRange) -> Result<VirtualNode> {
90 let virtual_node = scan_range
91 .try_compute_vnode(&self.table_distribution)
92 .context("Could not compute vnode for lookup join")?;
93 Ok(virtual_node)
94 }
95
96 fn create_row_seq_scan_node(&self, id: &WorkerSlotId) -> Result<NodeBody> {
99 let list = self.worker_slot_to_scan_range_mapping.get(id).unwrap();
100 let mut scan_ranges = vec![];
101 let mut vnode_bitmap = BitmapBuilder::zeroed(self.vnode_mapping.len());
102
103 list.iter().for_each(|(scan_range, vnode)| {
104 scan_ranges.push(scan_range.to_protobuf());
105 vnode_bitmap.set(vnode.to_index(), true);
106 });
107
108 let row_seq_scan_node = NodeBody::RowSeqScan(RowSeqScanNode {
109 table_desc: Some(self.table_desc.clone()),
110 column_ids: self.inner_side_column_ids.clone(),
111 scan_ranges,
112 ordered: false,
113 vnode_bitmap: Some(vnode_bitmap.finish().to_protobuf()),
114 limit: None,
115 as_of: self.as_of.as_ref().map(Into::into),
116 });
117
118 Ok(row_seq_scan_node)
119 }
120
121 fn build_prost_exchange_source(&self, id: &WorkerSlotId) -> Result<PbExchangeSource> {
123 let worker = self
124 .worker_slot_mapping
125 .get(id)
126 .context("No worker node found for the given worker slot id.")?;
127
128 let local_execute_plan = LocalExecutePlan {
129 plan: Some(PlanFragment {
130 root: Some(PlanNode {
131 children: vec![],
132 identity: "SeqScan".to_owned(),
133 node_body: Some(self.create_row_seq_scan_node(id)?),
134 }),
135 exchange_info: Some(ExchangeInfo {
136 mode: DistributionMode::Single as i32,
137 ..Default::default()
138 }),
139 }),
140 epoch: Some(self.epoch),
141 tracing_context: TracingContext::from_current_span().to_protobuf(),
142 };
143
144 let prost_exchange_source = PbExchangeSource {
145 task_output_id: Some(TaskOutputId {
146 task_id: Some(PbTaskId {
147 query_id: self.task_id.query_id.clone(),
152 stage_id: self.task_id.stage_id + 10000 + self.next_stage_id as u32,
153 task_id: (*id).into(),
154 }),
155 output_id: 0,
156 }),
157 host: Some(worker.host.as_ref().unwrap().clone()),
158 local_execute_plan: Some(Plan(local_execute_plan)),
159 };
160
161 Ok(prost_exchange_source)
162 }
163}
164
165impl LookupExecutorBuilder for InnerSideExecutorBuilder {
166 fn reset(&mut self) {
167 self.worker_slot_to_scan_range_mapping = HashMap::new();
168 }
169
170 async fn add_scan_range(&mut self, key_datums: Vec<Datum>) -> Result<()> {
173 let mut scan_range = ScanRange::full_table_scan();
174
175 for ((datum, outer_type), inner_type) in key_datums
176 .into_iter()
177 .zip_eq_fast(
178 self.outer_side_key_types
179 .iter()
180 .take(self.lookup_prefix_len),
181 )
182 .zip_eq_fast(
183 self.inner_side_key_types
184 .iter()
185 .take(self.lookup_prefix_len),
186 )
187 {
188 let datum = if inner_type == outer_type {
189 datum
190 } else {
191 bail!("Join key types are not aligned: LHS: {outer_type:?}, RHS: {inner_type:?}");
192 };
193
194 scan_range.eq_conds.push(datum);
195 }
196
197 let vnode = self.get_virtual_node(&scan_range)?;
198 let worker_slot_id = self.vnode_mapping[vnode.to_index()];
199
200 let list = self
201 .worker_slot_to_scan_range_mapping
202 .entry(worker_slot_id)
203 .or_default();
204 list.push((scan_range, vnode));
205
206 Ok(())
207 }
208
209 async fn build_executor(&mut self) -> Result<BoxedExecutor> {
212 self.next_stage_id += 1;
213 let mut sources = vec![];
214 for id in self.worker_slot_to_scan_range_mapping.keys() {
215 sources.push(self.build_prost_exchange_source(id)?);
216 }
217
218 if sources.is_empty() {
219 return Ok(Box::new(DummyExecutor {
220 schema: Schema::default(),
221 }));
222 }
223
224 let exchange_node = NodeBody::Exchange(ExchangeNode {
225 sources,
226 sequential: true,
227 input_schema: self.inner_side_schema.to_prost(),
228 });
229
230 let plan_node = PlanNode {
231 children: vec![],
232 identity: "LocalLookupJoinExchangeExecutor".to_owned(),
233 node_body: Some(exchange_node),
234 };
235
236 let task_id = self.task_id.clone();
237
238 let executor_builder = ExecutorBuilder::new(
239 &plan_node,
240 &task_id,
241 self.context.clone(),
242 self.epoch,
243 self.shutdown_rx.clone(),
244 );
245
246 executor_builder.build().await
247 }
248}
249
250pub struct LocalLookupJoinExecutor<K, B: LookupExecutorBuilder> {
262 base: LookupJoinBase<K, B>,
263}
264
265impl<K: HashKey, B: LookupExecutorBuilder> Executor for LocalLookupJoinExecutor<K, B> {
266 fn schema(&self) -> &Schema {
267 &self.base.schema
268 }
269
270 fn identity(&self) -> &str {
271 &self.base.identity
272 }
273
274 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
275 Box::new(self.base).do_execute()
276 }
277}
278
279impl<K, B: LookupExecutorBuilder> LocalLookupJoinExecutor<K, B> {
280 pub fn new(base: LookupJoinBase<K, B>) -> Self {
281 Self { base }
282 }
283}
284
/// Stateless marker type whose [`BoxedExecutorBuilder`] implementation builds a
/// `LocalLookupJoinExecutor` from a `LocalLookupJoin` plan node.
pub struct LocalLookupJoinExecutorBuilder {}
286
287impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
288 async fn new_boxed_executor(
289 source: &ExecutorBuilder<'_>,
290 inputs: Vec<BoxedExecutor>,
291 ) -> Result<BoxedExecutor> {
292 let [outer_side_input]: [_; 1] = inputs.try_into().unwrap();
293
294 let lookup_join_node = try_match_expand!(
295 source.plan_node().get_node_body().unwrap(),
296 NodeBody::LocalLookupJoin
297 )?;
298 let as_of = lookup_join_node
300 .as_of
301 .as_ref()
302 .map(AsOf::try_from)
303 .transpose()?;
304 let query_epoch = as_of
305 .as_ref()
306 .map(|a| {
307 let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0;
308 tracing::debug!(epoch, "time travel");
309 risingwave_pb::common::BatchQueryEpoch {
310 epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel(
311 epoch,
312 )),
313 }
314 })
315 .unwrap_or_else(|| source.epoch());
316
317 let join_type = JoinType::from_prost(lookup_join_node.get_join_type()?);
318 let condition = match lookup_join_node.get_condition() {
319 Ok(cond_prost) => Some(build_from_prost(cond_prost)?),
320 Err(_) => None,
321 };
322
323 let output_indices: Vec<usize> = lookup_join_node
324 .get_output_indices()
325 .iter()
326 .map(|&x| x as usize)
327 .collect();
328
329 let outer_side_data_types = outer_side_input.schema().data_types();
330
331 let table_desc = lookup_join_node.get_inner_side_table_desc()?;
332 let inner_side_column_ids = lookup_join_node.get_inner_side_column_ids().to_vec();
333
334 let inner_side_schema = Schema {
335 fields: inner_side_column_ids
336 .iter()
337 .map(|&id| {
338 let column = table_desc
339 .columns
340 .iter()
341 .find(|c| c.column_id == id)
342 .unwrap();
343 Field::from(&ColumnDesc::from(column))
344 })
345 .collect_vec(),
346 };
347
348 let fields = if join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti {
349 outer_side_input.schema().fields.clone()
350 } else {
351 [
352 outer_side_input.schema().fields.clone(),
353 inner_side_schema.fields.clone(),
354 ]
355 .concat()
356 };
357
358 let original_schema = Schema { fields };
359 let actual_schema = output_indices
360 .iter()
361 .map(|&idx| original_schema[idx].clone())
362 .collect();
363
364 let mut outer_side_key_idxs = vec![];
365 for outer_side_key in lookup_join_node.get_outer_side_key() {
366 outer_side_key_idxs.push(*outer_side_key as usize)
367 }
368
369 let outer_side_key_types: Vec<DataType> = outer_side_key_idxs
370 .iter()
371 .map(|&i| outer_side_data_types[i].clone())
372 .collect_vec();
373
374 let lookup_prefix_len: usize = lookup_join_node.get_lookup_prefix_len() as usize;
375
376 let mut inner_side_key_idxs = vec![];
377 for inner_side_key in lookup_join_node.get_inner_side_key() {
378 inner_side_key_idxs.push(*inner_side_key as usize)
379 }
380
381 let inner_side_key_types = inner_side_key_idxs
382 .iter()
383 .map(|&i| inner_side_schema.fields[i].data_type.clone())
384 .collect_vec();
385
386 let null_safe = lookup_join_node.get_null_safe().to_vec();
387
388 let vnode_mapping = lookup_join_node
389 .get_inner_side_vnode_mapping()
390 .iter()
391 .copied()
392 .map(WorkerSlotId::from)
393 .collect_vec();
394
395 assert!(!vnode_mapping.is_empty());
396
397 let chunk_size = source.context().get_config().developer.chunk_size;
398
399 let asof_desc = lookup_join_node
400 .asof_desc
401 .map(|desc| AsOfDesc::from_protobuf(&desc))
402 .transpose()?;
403
404 let worker_nodes = lookup_join_node.get_worker_nodes();
405 let worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode> = worker_nodes
406 .iter()
407 .flat_map(|worker| {
408 (0..(worker.compute_node_parallelism()))
409 .map(|i| (WorkerSlotId::new(worker.id, i), worker.clone()))
410 })
411 .collect();
412
413 let vnodes = Some(Bitmap::ones(table_desc.vnode_count()).into());
414
415 let inner_side_builder = InnerSideExecutorBuilder {
416 table_desc: table_desc.clone(),
417 table_distribution: TableDistribution::new_from_storage_table_desc(vnodes, table_desc),
418 vnode_mapping,
419 outer_side_key_types,
420 inner_side_schema,
421 inner_side_column_ids,
422 inner_side_key_types: inner_side_key_types.clone(),
423 lookup_prefix_len,
424 context: source.context().clone(),
425 task_id: source.task_id.clone(),
426 epoch: query_epoch,
427 worker_slot_to_scan_range_mapping: HashMap::new(),
428 chunk_size,
429 shutdown_rx: source.shutdown_rx().clone(),
430 next_stage_id: 0,
431 worker_slot_mapping,
432 as_of,
433 };
434
435 let identity = source.plan_node().get_identity().clone();
436 Ok(LocalLookupJoinExecutorArgs {
437 join_type,
438 condition,
439 outer_side_input,
440 outer_side_data_types,
441 outer_side_key_idxs,
442 inner_side_builder,
443 inner_side_key_types,
444 inner_side_key_idxs,
445 null_safe,
446 lookup_prefix_len,
447 chunk_builder: DataChunkBuilder::new(original_schema.data_types(), chunk_size),
448 schema: actual_schema,
449 output_indices,
450 chunk_size,
451 asof_desc,
452 identity: identity.clone(),
453 shutdown_rx: source.shutdown_rx().clone(),
454 mem_ctx: source.context().create_executor_mem_context(&identity),
455 }
456 .dispatch())
457 }
458}
459
460struct LocalLookupJoinExecutorArgs<B: LookupExecutorBuilder> {
461 join_type: JoinType,
462 condition: Option<BoxedExpression>,
463 outer_side_input: BoxedExecutor,
464 outer_side_data_types: Vec<DataType>,
465 outer_side_key_idxs: Vec<usize>,
466 inner_side_builder: B,
467 inner_side_key_types: Vec<DataType>,
468 inner_side_key_idxs: Vec<usize>,
469 null_safe: Vec<bool>,
470 lookup_prefix_len: usize,
471 chunk_builder: DataChunkBuilder,
472 schema: Schema,
473 output_indices: Vec<usize>,
474 chunk_size: usize,
475 asof_desc: Option<AsOfDesc>,
476 identity: String,
477 shutdown_rx: ShutdownToken,
478 mem_ctx: MemoryContext,
479}
480
481impl<B: LookupExecutorBuilder> HashKeyDispatcher for LocalLookupJoinExecutorArgs<B> {
482 type Output = BoxedExecutor;
483
484 fn dispatch_impl<K: HashKey>(self) -> Self::Output {
485 Box::new(LocalLookupJoinExecutor::<K, B>::new(LookupJoinBase {
486 join_type: self.join_type,
487 condition: self.condition,
488 outer_side_input: self.outer_side_input,
489 outer_side_data_types: self.outer_side_data_types,
490 outer_side_key_idxs: self.outer_side_key_idxs,
491 inner_side_builder: self.inner_side_builder,
492 inner_side_key_types: self.inner_side_key_types,
493 inner_side_key_idxs: self.inner_side_key_idxs,
494 null_safe: self.null_safe,
495 lookup_prefix_len: self.lookup_prefix_len,
496 chunk_builder: self.chunk_builder,
497 schema: self.schema,
498 output_indices: self.output_indices,
499 chunk_size: self.chunk_size,
500 asof_desc: self.asof_desc,
501 identity: self.identity,
502 shutdown_rx: self.shutdown_rx,
503 mem_ctx: self.mem_ctx,
504 _phantom: PhantomData,
505 }))
506 }
507
508 fn data_types(&self) -> &[DataType] {
509 &self.inner_side_key_types
510 }
511}
512
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use risingwave_common::array::{DataChunk, DataChunkTestExt};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::hash::HashKeyDispatcher;
    use risingwave_common::memory::MemoryContext;
    use risingwave_common::types::DataType;
    use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
    use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
    use risingwave_expr::expr::{BoxedExpression, build_from_pretty};

    use crate::executor::join::JoinType;
    use crate::executor::test_utils::{
        FakeInnerSideExecutorBuilder, MockExecutor, diff_executor_output,
    };
    use crate::executor::{BoxedExecutor, SortExecutor};
    use crate::local_lookup_join::LocalLookupJoinExecutorArgs;
    use crate::monitor::BatchSpillMetrics;
    use crate::task::ShutdownToken;

    const CHUNK_SIZE: usize = 1024;

    /// Join condition shared by the `*_with_condition` tests: `5 < $3`.
    fn float_condition() -> BoxedExpression {
        build_from_pretty("(less_than:boolean 5:float4 $3:float4)")
    }

    /// Outer side input: two chunks of `(int32, float32)` rows, the last row all NULL.
    fn create_outer_side_input() -> BoxedExecutor {
        let mut executor = MockExecutor::new(Schema {
            fields: vec![
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Float32),
            ],
        });

        let first_chunk = DataChunk::from_pretty(
            "i f
             1 6.1
             2 8.4
             3 3.9",
        );
        executor.add(first_chunk);

        let second_chunk = DataChunk::from_pretty(
            "i f
             2 5.5
             5 4.1
             5 9.1
             . .",
        );
        executor.add(second_chunk);

        Box::new(executor)
    }

    /// Builds a lookup join over the mock outer input and a fake inner side builder,
    /// joining on column 0 of both sides and outputting every column.
    fn create_lookup_join_executor(
        join_type: JoinType,
        condition: Option<BoxedExpression>,
        null_safe: bool,
    ) -> BoxedExecutor {
        let outer_side_input = create_outer_side_input();
        let outer_fields = outer_side_input.schema().fields.clone();

        // The inner side has the same layout as the outer side.
        let fields = if matches!(join_type, JoinType::LeftSemi | JoinType::LeftAnti) {
            outer_fields.clone()
        } else {
            [outer_fields.clone(), outer_fields.clone()].concat()
        };
        let original_schema = Schema { fields };

        let inner_side_schema = Schema {
            fields: outer_fields,
        };

        let inner_side_data_types = inner_side_schema.data_types();
        let outer_side_data_types = outer_side_input.schema().data_types();

        let args = LocalLookupJoinExecutorArgs {
            join_type,
            condition,
            outer_side_input,
            outer_side_data_types,
            outer_side_key_idxs: vec![0],
            inner_side_builder: FakeInnerSideExecutorBuilder::new(inner_side_schema),
            inner_side_key_types: vec![inner_side_data_types[0].clone()],
            inner_side_key_idxs: vec![0],
            null_safe: vec![null_safe],
            lookup_prefix_len: 1,
            chunk_builder: DataChunkBuilder::new(original_schema.data_types(), CHUNK_SIZE),
            schema: original_schema.clone(),
            output_indices: (0..original_schema.len()).collect(),
            chunk_size: CHUNK_SIZE,
            asof_desc: None,
            identity: "TestLookupJoinExecutor".to_owned(),
            shutdown_rx: ShutdownToken::empty(),
            mem_ctx: MemoryContext::none(),
        };

        args.dispatch()
    }

    /// Sorts `child` by columns 0 and 1 ascending so that results compare
    /// deterministically.
    fn create_order_by_executor(child: BoxedExecutor) -> BoxedExecutor {
        let column_orders = [0, 1]
            .into_iter()
            .map(|column_index| ColumnOrder {
                column_index,
                order_type: OrderType::ascending(),
            })
            .collect::<Vec<_>>();

        Box::new(SortExecutor::new(
            child,
            Arc::new(column_orders),
            "SortExecutor".into(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ))
    }

    /// Runs the join, sorts its output and compares it with `expected`.
    async fn do_test(
        join_type: JoinType,
        condition: Option<BoxedExpression>,
        null_safe: bool,
        expected: DataChunk,
    ) {
        let join = create_lookup_join_executor(join_type, condition, null_safe);
        let sorted = create_order_by_executor(join);

        let mut expected_mock_exec = MockExecutor::new(sorted.schema().clone());
        expected_mock_exec.add(expected);

        diff_executor_output(sorted, Box::new(expected_mock_exec)).await;
    }

    #[tokio::test]
    async fn test_inner_join() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 5.5 2 4.4
             2 8.4 2 5.5
             2 8.4 2 4.4
             5 4.1 5 2.3
             5 4.1 5 3.7
             5 9.1 5 2.3
             5 9.1 5 3.7",
        );

        do_test(JoinType::Inner, None, false, expected).await;
    }

    #[tokio::test]
    async fn test_null_safe_inner_join() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 5.5 2 4.4
             2 8.4 2 5.5
             2 8.4 2 4.4
             5 4.1 5 2.3
             5 4.1 5 3.7
             5 9.1 5 2.3
             5 9.1 5 3.7
             .  .  .  .",
        );

        do_test(JoinType::Inner, None, true, expected).await;
    }

    #[tokio::test]
    async fn test_left_outer_join() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 5.5 2 4.4
             2 8.4 2 5.5
             2 8.4 2 4.4
             3 3.9 . .
             5 4.1 5 2.3
             5 4.1 5 3.7
             5 9.1 5 2.3
             5 9.1 5 3.7
             . .   . .",
        );

        do_test(JoinType::LeftOuter, None, false, expected).await;
    }

    #[tokio::test]
    async fn test_left_semi_join() {
        let expected = DataChunk::from_pretty(
            "i f
             1 6.1
             2 5.5
             2 8.4
             5 4.1
             5 9.1",
        );

        do_test(JoinType::LeftSemi, None, false, expected).await;
    }

    #[tokio::test]
    async fn test_left_anti_join() {
        let expected = DataChunk::from_pretty(
            "i f
             3 3.9
             . .",
        );

        do_test(JoinType::LeftAnti, None, false, expected).await;
    }

    #[tokio::test]
    async fn test_inner_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 8.4 2 5.5",
        );

        do_test(JoinType::Inner, Some(float_condition()), false, expected).await;
    }

    #[tokio::test]
    async fn test_left_outer_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 8.4 2 5.5
             3 3.9 . .
             5 4.1 . .
             5 9.1 . .
             . .   . .",
        );

        do_test(JoinType::LeftOuter, Some(float_condition()), false, expected).await;
    }

    #[tokio::test]
    async fn test_left_semi_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f
             1 6.1
             2 5.5
             2 8.4",
        );

        do_test(JoinType::LeftSemi, Some(float_condition()), false, expected).await;
    }

    #[tokio::test]
    async fn test_left_anti_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f
             3 3.9
             5 4.1
             5 9.1
             . .",
        );

        do_test(JoinType::LeftAnti, Some(float_condition()), false, expected).await;
    }
}