1use std::collections::HashMap;
16use std::marker::PhantomData;
17use std::sync::Arc;
18
19use anyhow::Context;
20use itertools::Itertools;
21use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
22use risingwave_common::catalog::{ColumnDesc, Field, Schema};
23use risingwave_common::hash::table_distribution::TableDistribution;
24use risingwave_common::hash::{
25 ExpandedWorkerSlotMapping, HashKey, HashKeyDispatcher, VirtualNode, VnodeCountCompat,
26 WorkerSlotId,
27};
28use risingwave_common::memory::MemoryContext;
29use risingwave_common::types::{DataType, Datum};
30use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
31use risingwave_common::util::iter_util::ZipEqFast;
32use risingwave_common::util::scan_range::ScanRange;
33use risingwave_common::util::tracing::TracingContext;
34use risingwave_expr::expr::{BoxedExpression, build_from_prost};
35use risingwave_pb::batch_plan::exchange_info::DistributionMode;
36use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan;
37use risingwave_pb::batch_plan::plan_node::NodeBody;
38use risingwave_pb::batch_plan::{
39 ExchangeInfo, ExchangeNode, LocalExecutePlan, PbExchangeSource, PbTaskId, PlanFragment,
40 PlanNode, RowSeqScanNode, TaskOutputId,
41};
42use risingwave_pb::common::{BatchQueryEpoch, WorkerNode};
43use risingwave_pb::plan_common::StorageTableDesc;
44
45use super::AsOfDesc;
46use crate::error::Result;
47use crate::executor::{
48 AsOf, BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, DummyExecutor, Executor,
49 ExecutorBuilder, JoinType, LookupJoinBase, unix_timestamp_sec_to_epoch,
50};
51use crate::task::{BatchTaskContext, ShutdownToken, TaskId};
52
/// Builds the inner-side executor of a `LocalLookupJoinExecutor`.
///
/// Lookup keys are turned into [`ScanRange`]s, grouped by the worker slot that owns each key's
/// virtual node, and finally turned into one exchange source per worker slot, each of which runs
/// a `RowSeqScan` over the inner-side table.
struct InnerSideExecutorBuilder {
    /// Descriptor of the inner-side table; cloned into every generated `RowSeqScanNode`.
    table_desc: StorageTableDesc,
    /// Distribution of the inner-side table, used to compute the vnode of a scan range.
    table_distribution: TableDistribution,
    /// Indexed by vnode; gives the worker slot serving that vnode.
    vnode_mapping: ExpandedWorkerSlotMapping,
    /// Data types of the outer-side join keys.
    outer_side_key_types: Vec<DataType>,
    /// Schema of the inner-side columns that are read (input schema of the exchange node).
    inner_side_schema: Schema,
    /// Ids of the inner-side table columns to read.
    inner_side_column_ids: Vec<i32>,
    /// Data types of the inner-side join keys; must equal the outer-side ones pairwise.
    inner_side_key_types: Vec<DataType>,
    /// Number of leading join keys turned into equality conditions of a scan range.
    lookup_prefix_len: usize,
    /// Task context handed to the `ExecutorBuilder` of the exchange executor.
    context: Arc<dyn BatchTaskContext>,
    /// Id of the task running the lookup join; exchange task ids are derived from it.
    task_id: TaskId,
    /// Epoch put into the generated local execute plans and the exchange executor builder.
    epoch: BatchQueryEpoch,
    /// Worker node hosting each worker slot.
    worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode>,
    /// Scan ranges (with their vnodes) accumulated since the last `reset`, grouped by worker slot.
    worker_slot_to_scan_range_mapping: HashMap<WorkerSlotId, Vec<(ScanRange, VirtualNode)>>,
    #[expect(dead_code)]
    chunk_size: usize,
    /// Shutdown token handed to the `ExecutorBuilder` of the exchange executor.
    shutdown_rx: ShutdownToken,
    /// Incremented on every `build_executor` call and mixed into the exchange stage id.
    next_stage_id: usize,
    /// Optional time-travel specification forwarded to every generated `RowSeqScanNode`.
    as_of: Option<AsOf>,
}
74
75#[async_trait::async_trait]
77pub trait LookupExecutorBuilder: Send {
78 fn reset(&mut self);
79
80 async fn add_scan_range(&mut self, key_datums: Vec<Datum>) -> Result<()>;
81
82 async fn build_executor(&mut self) -> Result<BoxedExecutor>;
83}
84
85pub type BoxedLookupExecutorBuilder = Box<dyn LookupExecutorBuilder>;
86
87impl InnerSideExecutorBuilder {
88 fn get_virtual_node(&self, scan_range: &ScanRange) -> Result<VirtualNode> {
90 let virtual_node = scan_range
91 .try_compute_vnode(&self.table_distribution)
92 .context("Could not compute vnode for lookup join")?;
93 Ok(virtual_node)
94 }
95
96 fn create_row_seq_scan_node(&self, id: &WorkerSlotId) -> Result<NodeBody> {
99 let list = self.worker_slot_to_scan_range_mapping.get(id).unwrap();
100 let mut scan_ranges = vec![];
101 let mut vnode_bitmap = BitmapBuilder::zeroed(self.vnode_mapping.len());
102
103 list.iter().for_each(|(scan_range, vnode)| {
104 scan_ranges.push(scan_range.to_protobuf());
105 vnode_bitmap.set(vnode.to_index(), true);
106 });
107
108 let row_seq_scan_node = NodeBody::RowSeqScan(RowSeqScanNode {
109 table_desc: Some(self.table_desc.clone()),
110 column_ids: self.inner_side_column_ids.clone(),
111 scan_ranges,
112 ordered: false,
113 vnode_bitmap: Some(vnode_bitmap.finish().to_protobuf()),
114 limit: None,
115 as_of: self.as_of.as_ref().map(Into::into),
116 });
117
118 Ok(row_seq_scan_node)
119 }
120
121 fn build_prost_exchange_source(&self, id: &WorkerSlotId) -> Result<PbExchangeSource> {
123 let worker = self
124 .worker_slot_mapping
125 .get(id)
126 .context("No worker node found for the given worker slot id.")?;
127
128 let local_execute_plan = LocalExecutePlan {
129 plan: Some(PlanFragment {
130 root: Some(PlanNode {
131 children: vec![],
132 identity: "SeqScan".to_owned(),
133 node_body: Some(self.create_row_seq_scan_node(id)?),
134 }),
135 exchange_info: Some(ExchangeInfo {
136 mode: DistributionMode::Single as i32,
137 ..Default::default()
138 }),
139 }),
140 epoch: Some(self.epoch),
141 tracing_context: TracingContext::from_current_span().to_protobuf(),
142 };
143
144 let prost_exchange_source = PbExchangeSource {
145 task_output_id: Some(TaskOutputId {
146 task_id: Some(PbTaskId {
147 query_id: self.task_id.query_id.clone(),
152 stage_id: self.task_id.stage_id + 10000 + self.next_stage_id as u32,
153 task_id: (*id).into(),
154 }),
155 output_id: 0,
156 }),
157 host: Some(worker.host.as_ref().unwrap().clone()),
158 local_execute_plan: Some(Plan(local_execute_plan)),
159 };
160
161 Ok(prost_exchange_source)
162 }
163}
164
165#[async_trait::async_trait]
166impl LookupExecutorBuilder for InnerSideExecutorBuilder {
167 fn reset(&mut self) {
168 self.worker_slot_to_scan_range_mapping = HashMap::new();
169 }
170
171 async fn add_scan_range(&mut self, key_datums: Vec<Datum>) -> Result<()> {
174 let mut scan_range = ScanRange::full_table_scan();
175
176 for ((datum, outer_type), inner_type) in key_datums
177 .into_iter()
178 .zip_eq_fast(
179 self.outer_side_key_types
180 .iter()
181 .take(self.lookup_prefix_len),
182 )
183 .zip_eq_fast(
184 self.inner_side_key_types
185 .iter()
186 .take(self.lookup_prefix_len),
187 )
188 {
189 let datum = if inner_type == outer_type {
190 datum
191 } else {
192 bail!("Join key types are not aligned: LHS: {outer_type:?}, RHS: {inner_type:?}");
193 };
194
195 scan_range.eq_conds.push(datum);
196 }
197
198 let vnode = self.get_virtual_node(&scan_range)?;
199 let worker_slot_id = self.vnode_mapping[vnode.to_index()];
200
201 let list = self
202 .worker_slot_to_scan_range_mapping
203 .entry(worker_slot_id)
204 .or_default();
205 list.push((scan_range, vnode));
206
207 Ok(())
208 }
209
210 async fn build_executor(&mut self) -> Result<BoxedExecutor> {
213 self.next_stage_id += 1;
214 let mut sources = vec![];
215 for id in self.worker_slot_to_scan_range_mapping.keys() {
216 sources.push(self.build_prost_exchange_source(id)?);
217 }
218
219 if sources.is_empty() {
220 return Ok(Box::new(DummyExecutor {
221 schema: Schema::default(),
222 }));
223 }
224
225 let exchange_node = NodeBody::Exchange(ExchangeNode {
226 sources,
227 sequential: true,
228 input_schema: self.inner_side_schema.to_prost(),
229 });
230
231 let plan_node = PlanNode {
232 children: vec![],
233 identity: "LocalLookupJoinExchangeExecutor".to_owned(),
234 node_body: Some(exchange_node),
235 };
236
237 let task_id = self.task_id.clone();
238
239 let executor_builder = ExecutorBuilder::new(
240 &plan_node,
241 &task_id,
242 self.context.clone(),
243 self.epoch,
244 self.shutdown_rx.clone(),
245 );
246
247 executor_builder.build().await
248 }
249}
250
/// Batch lookup-join executor.
///
/// It is a thin adapter: all join work is delegated to the wrapped [`LookupJoinBase`], which
/// `execute` hands off to `do_execute`.
pub struct LocalLookupJoinExecutor<K> {
    base: LookupJoinBase<K>,
    _phantom: PhantomData<K>,
}
266
267impl<K: HashKey> Executor for LocalLookupJoinExecutor<K> {
268 fn schema(&self) -> &Schema {
269 &self.base.schema
270 }
271
272 fn identity(&self) -> &str {
273 &self.base.identity
274 }
275
276 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
277 Box::new(self.base).do_execute()
278 }
279}
280
281impl<K> LocalLookupJoinExecutor<K> {
282 pub fn new(base: LookupJoinBase<K>) -> Self {
283 Self {
284 base,
285 _phantom: PhantomData,
286 }
287 }
288}
289
/// Builds a [`LocalLookupJoinExecutor`] from a `LocalLookupJoin` plan node.
pub struct LocalLookupJoinExecutorBuilder {}
291
292impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
293 async fn new_boxed_executor(
294 source: &ExecutorBuilder<'_>,
295 inputs: Vec<BoxedExecutor>,
296 ) -> Result<BoxedExecutor> {
297 let [outer_side_input]: [_; 1] = inputs.try_into().unwrap();
298
299 let lookup_join_node = try_match_expand!(
300 source.plan_node().get_node_body().unwrap(),
301 NodeBody::LocalLookupJoin
302 )?;
303 let as_of = lookup_join_node
305 .as_of
306 .as_ref()
307 .map(AsOf::try_from)
308 .transpose()?;
309 let query_epoch = as_of
310 .as_ref()
311 .map(|a| {
312 let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0;
313 tracing::debug!(epoch, "time travel");
314 risingwave_pb::common::BatchQueryEpoch {
315 epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel(
316 epoch,
317 )),
318 }
319 })
320 .unwrap_or_else(|| source.epoch());
321
322 let join_type = JoinType::from_prost(lookup_join_node.get_join_type()?);
323 let condition = match lookup_join_node.get_condition() {
324 Ok(cond_prost) => Some(build_from_prost(cond_prost)?),
325 Err(_) => None,
326 };
327
328 let output_indices: Vec<usize> = lookup_join_node
329 .get_output_indices()
330 .iter()
331 .map(|&x| x as usize)
332 .collect();
333
334 let outer_side_data_types = outer_side_input.schema().data_types();
335
336 let table_desc = lookup_join_node.get_inner_side_table_desc()?;
337 let inner_side_column_ids = lookup_join_node.get_inner_side_column_ids().to_vec();
338
339 let inner_side_schema = Schema {
340 fields: inner_side_column_ids
341 .iter()
342 .map(|&id| {
343 let column = table_desc
344 .columns
345 .iter()
346 .find(|c| c.column_id == id)
347 .unwrap();
348 Field::from(&ColumnDesc::from(column))
349 })
350 .collect_vec(),
351 };
352
353 let fields = if join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti {
354 outer_side_input.schema().fields.clone()
355 } else {
356 [
357 outer_side_input.schema().fields.clone(),
358 inner_side_schema.fields.clone(),
359 ]
360 .concat()
361 };
362
363 let original_schema = Schema { fields };
364 let actual_schema = output_indices
365 .iter()
366 .map(|&idx| original_schema[idx].clone())
367 .collect();
368
369 let mut outer_side_key_idxs = vec![];
370 for outer_side_key in lookup_join_node.get_outer_side_key() {
371 outer_side_key_idxs.push(*outer_side_key as usize)
372 }
373
374 let outer_side_key_types: Vec<DataType> = outer_side_key_idxs
375 .iter()
376 .map(|&i| outer_side_data_types[i].clone())
377 .collect_vec();
378
379 let lookup_prefix_len: usize = lookup_join_node.get_lookup_prefix_len() as usize;
380
381 let mut inner_side_key_idxs = vec![];
382 for inner_side_key in lookup_join_node.get_inner_side_key() {
383 inner_side_key_idxs.push(*inner_side_key as usize)
384 }
385
386 let inner_side_key_types = inner_side_key_idxs
387 .iter()
388 .map(|&i| inner_side_schema.fields[i].data_type.clone())
389 .collect_vec();
390
391 let null_safe = lookup_join_node.get_null_safe().to_vec();
392
393 let vnode_mapping = lookup_join_node
394 .get_inner_side_vnode_mapping()
395 .iter()
396 .copied()
397 .map(WorkerSlotId::from)
398 .collect_vec();
399
400 assert!(!vnode_mapping.is_empty());
401
402 let chunk_size = source.context().get_config().developer.chunk_size;
403
404 let asof_desc = lookup_join_node
405 .asof_desc
406 .map(|desc| AsOfDesc::from_protobuf(&desc))
407 .transpose()?;
408
409 let worker_nodes = lookup_join_node.get_worker_nodes();
410 let worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode> = worker_nodes
411 .iter()
412 .flat_map(|worker| {
413 (0..(worker.compute_node_parallelism()))
414 .map(|i| (WorkerSlotId::new(worker.id, i), worker.clone()))
415 })
416 .collect();
417
418 let vnodes = Some(Bitmap::ones(table_desc.vnode_count()).into());
419
420 let inner_side_builder = InnerSideExecutorBuilder {
421 table_desc: table_desc.clone(),
422 table_distribution: TableDistribution::new_from_storage_table_desc(vnodes, table_desc),
423 vnode_mapping,
424 outer_side_key_types,
425 inner_side_schema,
426 inner_side_column_ids,
427 inner_side_key_types: inner_side_key_types.clone(),
428 lookup_prefix_len,
429 context: source.context().clone(),
430 task_id: source.task_id.clone(),
431 epoch: query_epoch,
432 worker_slot_to_scan_range_mapping: HashMap::new(),
433 chunk_size,
434 shutdown_rx: source.shutdown_rx().clone(),
435 next_stage_id: 0,
436 worker_slot_mapping,
437 as_of,
438 };
439
440 let identity = source.plan_node().get_identity().clone();
441 Ok(LocalLookupJoinExecutorArgs {
442 join_type,
443 condition,
444 outer_side_input,
445 outer_side_data_types,
446 outer_side_key_idxs,
447 inner_side_builder: Box::new(inner_side_builder),
448 inner_side_key_types,
449 inner_side_key_idxs,
450 null_safe,
451 lookup_prefix_len,
452 chunk_builder: DataChunkBuilder::new(original_schema.data_types(), chunk_size),
453 schema: actual_schema,
454 output_indices,
455 chunk_size,
456 asof_desc,
457 identity: identity.clone(),
458 shutdown_rx: source.shutdown_rx().clone(),
459 mem_ctx: source.context().create_executor_mem_context(&identity),
460 }
461 .dispatch())
462 }
463}
464
/// Everything needed to build a [`LocalLookupJoinExecutor`].
///
/// The fields are collected before the hash key type `K` is chosen. Choosing `K` happens in
/// [`HashKeyDispatcher::dispatch_impl`].
struct LocalLookupJoinExecutorArgs {
    join_type: JoinType,
    /// Optional extra (non-equi) join condition.
    condition: Option<BoxedExpression>,
    /// Executor producing the outer-side rows.
    outer_side_input: BoxedExecutor,
    outer_side_data_types: Vec<DataType>,
    /// Indices of the join-key columns within the outer side.
    outer_side_key_idxs: Vec<usize>,
    /// Builds the inner-side executor from lookup keys.
    inner_side_builder: Box<dyn LookupExecutorBuilder>,
    inner_side_key_types: Vec<DataType>,
    /// Indices of the join-key columns within the inner-side schema.
    inner_side_key_idxs: Vec<usize>,
    /// Per join key, whether NULL keys match each other.
    null_safe: Vec<bool>,
    lookup_prefix_len: usize,
    /// Chunk builder over the data types of the un-projected join output.
    chunk_builder: DataChunkBuilder,
    /// Output schema after projecting with `output_indices`.
    schema: Schema,
    /// Projection from the un-projected join output to the final output columns.
    output_indices: Vec<usize>,
    chunk_size: usize,
    asof_desc: Option<AsOfDesc>,
    identity: String,
    shutdown_rx: ShutdownToken,
    mem_ctx: MemoryContext,
}
485
486impl HashKeyDispatcher for LocalLookupJoinExecutorArgs {
487 type Output = BoxedExecutor;
488
489 fn dispatch_impl<K: HashKey>(self) -> Self::Output {
490 Box::new(LocalLookupJoinExecutor::<K>::new(LookupJoinBase::<K> {
491 join_type: self.join_type,
492 condition: self.condition,
493 outer_side_input: self.outer_side_input,
494 outer_side_data_types: self.outer_side_data_types,
495 outer_side_key_idxs: self.outer_side_key_idxs,
496 inner_side_builder: self.inner_side_builder,
497 inner_side_key_types: self.inner_side_key_types,
498 inner_side_key_idxs: self.inner_side_key_idxs,
499 null_safe: self.null_safe,
500 lookup_prefix_len: self.lookup_prefix_len,
501 chunk_builder: self.chunk_builder,
502 schema: self.schema,
503 output_indices: self.output_indices,
504 chunk_size: self.chunk_size,
505 asof_desc: self.asof_desc,
506 identity: self.identity,
507 shutdown_rx: self.shutdown_rx,
508 mem_ctx: self.mem_ctx,
509 _phantom: PhantomData,
510 }))
511 }
512
513 fn data_types(&self) -> &[DataType] {
514 &self.inner_side_key_types
515 }
516}
517
// Unit tests for `LocalLookupJoinExecutor`. They are built through `LocalLookupJoinExecutorArgs`
// with `FakeInnerSideExecutorBuilder` standing in for real exchange sources.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use risingwave_common::array::{DataChunk, DataChunkTestExt};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::hash::HashKeyDispatcher;
    use risingwave_common::memory::MemoryContext;
    use risingwave_common::types::DataType;
    use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
    use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
    use risingwave_expr::expr::{BoxedExpression, build_from_pretty};

    use super::LocalLookupJoinExecutorArgs;
    use crate::executor::join::JoinType;
    use crate::executor::test_utils::{
        FakeInnerSideExecutorBuilder, MockExecutor, diff_executor_output,
    };
    use crate::executor::{BoxedExecutor, SortExecutor};
    use crate::monitor::BatchSpillMetrics;
    use crate::task::ShutdownToken;

    const CHUNK_SIZE: usize = 1024;

    /// Outer side: an (int, float) mock executor with two chunks.
    /// The second chunk ends with an all-NULL row.
    fn create_outer_side_input() -> BoxedExecutor {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Float32),
            ],
        };
        let mut executor = MockExecutor::new(schema);

        executor.add(DataChunk::from_pretty(
            "i f
             1 6.1
             2 8.4
             3 3.9",
        ));

        executor.add(DataChunk::from_pretty(
            "i f
             2 5.5
             5 4.1
             5 9.1
             . .",
        ));

        Box::new(executor)
    }

    /// Builds a lookup join on column 0 with a lookup prefix of one key.
    ///
    /// The inner side has the same schema as the outer side. Its rows come from
    /// `FakeInnerSideExecutorBuilder` (in `test_utils`, not shown here). NOTE(review): judging
    /// from the expected outputs below, it yields key 1 -> 9.2, key 2 -> 5.5 and 4.4,
    /// key 5 -> 2.3 and 3.7, plus a NULL-key row; confirm against `test_utils`.
    /// No output projection is applied.
    fn create_lookup_join_executor(
        join_type: JoinType,
        condition: Option<BoxedExpression>,
        null_safe: bool,
    ) -> BoxedExecutor {
        let outer_side_input = create_outer_side_input();

        // Semi/anti joins output only outer columns; the others output outer ++ inner.
        let fields = if join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti {
            outer_side_input.schema().fields.clone()
        } else {
            [
                outer_side_input.schema().fields.clone(),
                outer_side_input.schema().fields.clone(),
            ]
            .concat()
        };
        let original_schema = Schema { fields };

        let inner_side_schema = Schema {
            fields: outer_side_input.schema().fields.clone(),
        };

        let inner_side_data_types = inner_side_schema.data_types();
        let outer_side_data_types = outer_side_input.schema().data_types();

        LocalLookupJoinExecutorArgs {
            join_type,
            condition,
            outer_side_input,
            outer_side_data_types,
            outer_side_key_idxs: vec![0],
            inner_side_builder: Box::new(FakeInnerSideExecutorBuilder::new(inner_side_schema)),
            inner_side_key_types: vec![inner_side_data_types[0].clone()],
            inner_side_key_idxs: vec![0],
            null_safe: vec![null_safe],
            lookup_prefix_len: 1,
            chunk_builder: DataChunkBuilder::new(original_schema.data_types(), CHUNK_SIZE),
            schema: original_schema.clone(),
            output_indices: (0..original_schema.len()).collect(),
            chunk_size: CHUNK_SIZE,
            asof_desc: None,
            identity: "TestLookupJoinExecutor".to_owned(),
            shutdown_rx: ShutdownToken::empty(),
            mem_ctx: MemoryContext::none(),
        }
        .dispatch()
    }

    /// Sorts `child` by columns 0 and 1 ascending, making the compared output order
    /// deterministic.
    fn create_order_by_executor(child: BoxedExecutor) -> BoxedExecutor {
        let column_orders = vec![
            ColumnOrder {
                column_index: 0,
                order_type: OrderType::ascending(),
            },
            ColumnOrder {
                column_index: 1,
                order_type: OrderType::ascending(),
            },
        ];

        Box::new(SortExecutor::new(
            child,
            Arc::new(column_orders),
            "SortExecutor".into(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ))
    }

    /// Runs the join, sorts its output and compares it with `expected` via `diff_executor_output`.
    async fn do_test(
        join_type: JoinType,
        condition: Option<BoxedExpression>,
        null_safe: bool,
        expected: DataChunk,
    ) {
        let lookup_join_executor = create_lookup_join_executor(join_type, condition, null_safe);
        let order_by_executor = create_order_by_executor(lookup_join_executor);
        let mut expected_mock_exec = MockExecutor::new(order_by_executor.schema().clone());
        expected_mock_exec.add(expected);
        diff_executor_output(order_by_executor, Box::new(expected_mock_exec)).await;
    }

    #[tokio::test]
    async fn test_inner_join() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 5.5 2 4.4
             2 8.4 2 5.5
             2 8.4 2 4.4
             5 4.1 5 2.3
             5 4.1 5 3.7
             5 9.1 5 2.3
             5 9.1 5 3.7",
        );

        do_test(JoinType::Inner, None, false, expected).await;
    }

    // With null_safe = true the NULL outer key matches a NULL inner key (last expected row).
    #[tokio::test]
    async fn test_null_safe_inner_join() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 5.5 2 4.4
             2 8.4 2 5.5
             2 8.4 2 4.4
             5 4.1 5 2.3
             5 4.1 5 3.7
             5 9.1 5 2.3
             5 9.1 5 3.7
             .  .  .  .",
        );

        do_test(JoinType::Inner, None, true, expected).await;
    }

    #[tokio::test]
    async fn test_left_outer_join() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 5.5 2 4.4
             2 8.4 2 5.5
             2 8.4 2 4.4
             3 3.9 . .
             5 4.1 5 2.3
             5 4.1 5 3.7
             5 9.1 5 2.3
             5 9.1 5 3.7
             . .   . .",
        );

        do_test(JoinType::LeftOuter, None, false, expected).await;
    }

    #[tokio::test]
    async fn test_left_semi_join() {
        let expected = DataChunk::from_pretty(
            "i f
             1 6.1
             2 5.5
             2 8.4
             5 4.1
             5 9.1",
        );

        do_test(JoinType::LeftSemi, None, false, expected).await;
    }

    #[tokio::test]
    async fn test_left_anti_join() {
        let expected = DataChunk::from_pretty(
            "i f
             3 3.9
             . .",
        );

        do_test(JoinType::LeftAnti, None, false, expected).await;
    }

    // The condition `5 < $3` keeps only matches whose inner-side float (column 3) exceeds 5.
    #[tokio::test]
    async fn test_inner_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 8.4 2 5.5",
        );
        let condition = build_from_pretty("(less_than:boolean 5:float4 $3:float4)");

        do_test(JoinType::Inner, Some(condition), false, expected).await;
    }

    #[tokio::test]
    async fn test_left_outer_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f   i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 8.4 2 5.5
             3 3.9 . .
             5 4.1 . .
             5 9.1 . .
             . .   . .",
        );
        let condition = build_from_pretty("(less_than:boolean 5:float4 $3:float4)");

        do_test(JoinType::LeftOuter, Some(condition), false, expected).await;
    }

    #[tokio::test]
    async fn test_left_semi_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f
             1 6.1
             2 5.5
             2 8.4",
        );
        let condition = build_from_pretty("(less_than:boolean 5:float4 $3:float4)");

        do_test(JoinType::LeftSemi, Some(condition), false, expected).await;
    }

    #[tokio::test]
    async fn test_left_anti_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f
             3 3.9
             5 4.1
             5 9.1
             . .",
        );
        let condition = build_from_pretty("(less_than:boolean 5:float4 $3:float4)");

        do_test(JoinType::LeftAnti, Some(condition), false, expected).await;
    }
}