1use std::collections::HashMap;
16use std::marker::PhantomData;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use itertools::Itertools;
21use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
22use risingwave_common::catalog::{ColumnDesc, Field, Schema};
23use risingwave_common::hash::table_distribution::TableDistribution;
24use risingwave_common::hash::{
25 ExpandedWorkerSlotMapping, HashKey, HashKeyDispatcher, VirtualNode, VnodeCountCompat,
26 WorkerSlotId,
27};
28use risingwave_common::memory::MemoryContext;
29use risingwave_common::types::{DataType, Datum};
30use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
31use risingwave_common::util::iter_util::ZipEqFast;
32use risingwave_common::util::scan_range::ScanRange;
33use risingwave_common::util::tracing::TracingContext;
34use risingwave_expr::expr::{BoxedExpression, build_from_prost};
35use risingwave_pb::batch_plan::exchange_info::DistributionMode;
36use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan;
37use risingwave_pb::batch_plan::plan_node::NodeBody;
38use risingwave_pb::batch_plan::{
39 ExchangeInfo, ExchangeNode, LocalExecutePlan, PbExchangeSource, PbTaskId, PlanFragment,
40 PlanNode, RowSeqScanNode, TaskOutputId,
41};
42use risingwave_pb::common::{BatchQueryEpoch, WorkerNode};
43use risingwave_pb::plan_common::StorageTableDesc;
44
45use super::AsOfDesc;
46use crate::error::Result;
47use crate::executor::{
48 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, DummyExecutor, Executor,
49 ExecutorBuilder, JoinType, LookupJoinBase,
50};
51use crate::task::{BatchTaskContext, ShutdownToken, TaskId};
52
53struct InnerSideExecutorBuilder {
55 table_desc: StorageTableDesc,
56 table_distribution: TableDistribution,
57 vnode_mapping: ExpandedWorkerSlotMapping,
58 outer_side_key_types: Vec<DataType>,
59 inner_side_schema: Schema,
60 inner_side_column_ids: Vec<i32>,
61 inner_side_key_types: Vec<DataType>,
62 lookup_prefix_len: usize,
63 context: Arc<dyn BatchTaskContext>,
64 task_id: TaskId,
65 query_epoch: BatchQueryEpoch,
66 worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode>,
67 worker_slot_to_scan_range_mapping: HashMap<WorkerSlotId, Vec<(ScanRange, VirtualNode)>>,
68 #[expect(dead_code)]
69 chunk_size: usize,
70 shutdown_rx: ShutdownToken,
71 next_stage_id: usize,
72}
73
74pub trait LookupExecutorBuilder: Send + 'static {
76 fn reset(&mut self);
77
78 fn add_scan_range(
79 &mut self,
80 key_datums: Vec<Datum>,
81 ) -> impl Future<Output = Result<()>> + Send + '_;
82
83 fn build_executor(&mut self) -> impl Future<Output = Result<BoxedExecutor>> + Send + '_;
84}
85
86impl InnerSideExecutorBuilder {
87 fn get_virtual_node(&self, scan_range: &ScanRange) -> Result<VirtualNode> {
89 let virtual_node = scan_range
90 .try_compute_vnode(&self.table_distribution)
91 .context("Could not compute vnode for lookup join")?;
92 Ok(virtual_node)
93 }
94
95 fn create_row_seq_scan_node(&self, id: &WorkerSlotId) -> Result<NodeBody> {
98 let list = self.worker_slot_to_scan_range_mapping.get(id).unwrap();
99 let mut scan_ranges = vec![];
100 let mut vnode_bitmap = BitmapBuilder::zeroed(self.vnode_mapping.len());
101
102 list.iter().for_each(|(scan_range, vnode)| {
103 scan_ranges.push(scan_range.to_protobuf());
104 vnode_bitmap.set(vnode.to_index(), true);
105 });
106
107 let row_seq_scan_node = NodeBody::RowSeqScan(RowSeqScanNode {
108 table_desc: Some(self.table_desc.clone()),
109 column_ids: self.inner_side_column_ids.clone(),
110 scan_ranges,
111 ordered: false,
112 vnode_bitmap: Some(vnode_bitmap.finish().to_protobuf()),
113 limit: None,
114 query_epoch: Some(self.query_epoch),
115 });
116
117 Ok(row_seq_scan_node)
118 }
119
120 fn build_prost_exchange_source(&self, id: &WorkerSlotId) -> Result<PbExchangeSource> {
122 let worker = self
123 .worker_slot_mapping
124 .get(id)
125 .context("No worker node found for the given worker slot id.")?;
126
127 let local_execute_plan = LocalExecutePlan {
128 plan: Some(PlanFragment {
129 root: Some(PlanNode {
130 children: vec![],
131 identity: "SeqScan".to_owned(),
132 node_body: Some(self.create_row_seq_scan_node(id)?),
133 }),
134 exchange_info: Some(ExchangeInfo {
135 mode: DistributionMode::Single as i32,
136 ..Default::default()
137 }),
138 }),
139 tracing_context: TracingContext::from_current_span().to_protobuf(),
140 };
141
142 let prost_exchange_source = PbExchangeSource {
143 task_output_id: Some(TaskOutputId {
144 task_id: Some(PbTaskId {
145 query_id: self.task_id.query_id.clone(),
150 stage_id: self.task_id.stage_id + 10000 + self.next_stage_id as u32,
151 task_id: (*id).into(),
152 }),
153 output_id: 0,
154 }),
155 host: Some(worker.host.as_ref().unwrap().clone()),
156 local_execute_plan: Some(Plan(local_execute_plan)),
157 };
158
159 Ok(prost_exchange_source)
160 }
161}
162
163impl LookupExecutorBuilder for InnerSideExecutorBuilder {
164 fn reset(&mut self) {
165 self.worker_slot_to_scan_range_mapping = HashMap::new();
166 }
167
168 async fn add_scan_range(&mut self, key_datums: Vec<Datum>) -> Result<()> {
171 let mut scan_range = ScanRange::full_table_scan();
172
173 for ((datum, outer_type), inner_type) in key_datums
174 .into_iter()
175 .zip_eq_fast(
176 self.outer_side_key_types
177 .iter()
178 .take(self.lookup_prefix_len),
179 )
180 .zip_eq_fast(
181 self.inner_side_key_types
182 .iter()
183 .take(self.lookup_prefix_len),
184 )
185 {
186 let datum = if inner_type == outer_type {
187 datum
188 } else {
189 bail!("Join key types are not aligned: LHS: {outer_type:?}, RHS: {inner_type:?}");
190 };
191
192 scan_range.eq_conds.push(datum);
193 }
194
195 let vnode = self.get_virtual_node(&scan_range)?;
196 let worker_slot_id = self.vnode_mapping[vnode.to_index()];
197
198 let list = self
199 .worker_slot_to_scan_range_mapping
200 .entry(worker_slot_id)
201 .or_default();
202 list.push((scan_range, vnode));
203
204 Ok(())
205 }
206
207 async fn build_executor(&mut self) -> Result<BoxedExecutor> {
210 self.next_stage_id += 1;
211 let mut sources = vec![];
212 for id in self.worker_slot_to_scan_range_mapping.keys() {
213 sources.push(self.build_prost_exchange_source(id)?);
214 }
215
216 if sources.is_empty() {
217 return Ok(Box::new(DummyExecutor {
218 schema: Schema::default(),
219 }));
220 }
221
222 let exchange_node = NodeBody::Exchange(ExchangeNode {
223 sources,
224 sequential: true,
225 input_schema: self.inner_side_schema.to_prost(),
226 });
227
228 let plan_node = PlanNode {
229 children: vec![],
230 identity: "LocalLookupJoinExchangeExecutor".to_owned(),
231 node_body: Some(exchange_node),
232 };
233
234 let task_id = self.task_id.clone();
235
236 let executor_builder = ExecutorBuilder::new(
237 &plan_node,
238 &task_id,
239 self.context.clone(),
240 self.shutdown_rx.clone(),
241 );
242
243 executor_builder.build().await
244 }
245}
246
247pub struct LocalLookupJoinExecutor<K, B: LookupExecutorBuilder> {
259 base: LookupJoinBase<K, B>,
260}
261
262impl<K: HashKey, B: LookupExecutorBuilder> Executor for LocalLookupJoinExecutor<K, B> {
263 fn schema(&self) -> &Schema {
264 &self.base.schema
265 }
266
267 fn identity(&self) -> &str {
268 &self.base.identity
269 }
270
271 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
272 Box::new(self.base).do_execute()
273 }
274}
275
276impl<K, B: LookupExecutorBuilder> LocalLookupJoinExecutor<K, B> {
277 pub fn new(base: LookupJoinBase<K, B>) -> Self {
278 Self { base }
279 }
280}
281
282pub struct LocalLookupJoinExecutorBuilder {}
283
284impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
285 async fn new_boxed_executor(
286 source: &ExecutorBuilder<'_>,
287 inputs: Vec<BoxedExecutor>,
288 ) -> Result<BoxedExecutor> {
289 let [outer_side_input]: [_; 1] = inputs.try_into().unwrap();
290
291 let lookup_join_node = try_match_expand!(
292 source.plan_node().get_node_body().unwrap(),
293 NodeBody::LocalLookupJoin
294 )?;
295
296 let join_type = JoinType::from_prost(lookup_join_node.get_join_type()?);
297 let condition = match lookup_join_node.get_condition() {
298 Ok(cond_prost) => Some(build_from_prost(cond_prost)?),
299 Err(_) => None,
300 };
301
302 let output_indices: Vec<usize> = lookup_join_node
303 .get_output_indices()
304 .iter()
305 .map(|&x| x as usize)
306 .collect();
307
308 let outer_side_data_types = outer_side_input.schema().data_types();
309
310 let table_desc = lookup_join_node.get_inner_side_table_desc()?;
311 let inner_side_column_ids = lookup_join_node.get_inner_side_column_ids().clone();
312
313 let inner_side_schema = Schema {
314 fields: inner_side_column_ids
315 .iter()
316 .map(|&id| {
317 let column = table_desc
318 .columns
319 .iter()
320 .find(|c| c.column_id == id)
321 .unwrap();
322 Field::from(&ColumnDesc::from(column))
323 })
324 .collect_vec(),
325 };
326
327 let fields = if join_type == JoinType::LeftSemi || join_type == JoinType::LeftAnti {
328 outer_side_input.schema().fields.clone()
329 } else {
330 [
331 outer_side_input.schema().fields.clone(),
332 inner_side_schema.fields.clone(),
333 ]
334 .concat()
335 };
336
337 let original_schema = Schema { fields };
338 let actual_schema = output_indices
339 .iter()
340 .map(|&idx| original_schema[idx].clone())
341 .collect();
342
343 let mut outer_side_key_idxs = vec![];
344 for outer_side_key in lookup_join_node.get_outer_side_key() {
345 outer_side_key_idxs.push(*outer_side_key as usize)
346 }
347
348 let outer_side_key_types: Vec<DataType> = outer_side_key_idxs
349 .iter()
350 .map(|&i| outer_side_data_types[i].clone())
351 .collect_vec();
352
353 let lookup_prefix_len: usize = lookup_join_node.get_lookup_prefix_len() as usize;
354
355 let mut inner_side_key_idxs = vec![];
356 for inner_side_key in lookup_join_node.get_inner_side_key() {
357 inner_side_key_idxs.push(*inner_side_key as usize)
358 }
359
360 let inner_side_key_types = inner_side_key_idxs
361 .iter()
362 .map(|&i| inner_side_schema.fields[i].data_type.clone())
363 .collect_vec();
364
365 let null_safe = lookup_join_node.get_null_safe().clone();
366
367 let vnode_mapping = lookup_join_node
368 .get_inner_side_vnode_mapping()
369 .iter()
370 .copied()
371 .map(WorkerSlotId::from)
372 .collect_vec();
373
374 assert!(!vnode_mapping.is_empty());
375
376 let chunk_size = source.context().get_config().developer.chunk_size;
377
378 let asof_desc = lookup_join_node
379 .asof_desc
380 .map(|desc| AsOfDesc::from_protobuf(&desc))
381 .transpose()?;
382
383 let worker_nodes = lookup_join_node.get_worker_nodes();
384 let worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode> = worker_nodes
385 .iter()
386 .flat_map(|worker| {
387 (0..(worker.compute_node_parallelism()))
388 .map(|i| (WorkerSlotId::new(worker.id, i), worker.clone()))
389 })
390 .collect();
391
392 let vnodes = Some(Bitmap::ones(table_desc.vnode_count()).into());
393
394 let inner_side_builder = InnerSideExecutorBuilder {
395 table_desc: table_desc.clone(),
396 table_distribution: TableDistribution::new_from_storage_table_desc(vnodes, table_desc),
397 vnode_mapping,
398 outer_side_key_types,
399 inner_side_schema,
400 inner_side_column_ids,
401 inner_side_key_types: inner_side_key_types.clone(),
402 lookup_prefix_len,
403 context: source.context().clone(),
404 task_id: source.task_id.clone(),
405 query_epoch: lookup_join_node
406 .query_epoch
407 .ok_or_else(|| anyhow!("query_epoch not set in local lookup join"))?,
408 worker_slot_to_scan_range_mapping: HashMap::new(),
409 chunk_size,
410 shutdown_rx: source.shutdown_rx().clone(),
411 next_stage_id: 0,
412 worker_slot_mapping,
413 };
414
415 let identity = source.plan_node().get_identity().clone();
416 Ok(LocalLookupJoinExecutorArgs {
417 join_type,
418 condition,
419 outer_side_input,
420 outer_side_data_types,
421 outer_side_key_idxs,
422 inner_side_builder,
423 inner_side_key_types,
424 inner_side_key_idxs,
425 null_safe,
426 lookup_prefix_len,
427 chunk_builder: DataChunkBuilder::new(original_schema.data_types(), chunk_size),
428 schema: actual_schema,
429 output_indices,
430 chunk_size,
431 asof_desc,
432 identity: identity.clone(),
433 shutdown_rx: source.shutdown_rx().clone(),
434 mem_ctx: source.context().create_executor_mem_context(&identity),
435 }
436 .dispatch())
437 }
438}
439
440struct LocalLookupJoinExecutorArgs<B: LookupExecutorBuilder> {
441 join_type: JoinType,
442 condition: Option<BoxedExpression>,
443 outer_side_input: BoxedExecutor,
444 outer_side_data_types: Vec<DataType>,
445 outer_side_key_idxs: Vec<usize>,
446 inner_side_builder: B,
447 inner_side_key_types: Vec<DataType>,
448 inner_side_key_idxs: Vec<usize>,
449 null_safe: Vec<bool>,
450 lookup_prefix_len: usize,
451 chunk_builder: DataChunkBuilder,
452 schema: Schema,
453 output_indices: Vec<usize>,
454 chunk_size: usize,
455 asof_desc: Option<AsOfDesc>,
456 identity: String,
457 shutdown_rx: ShutdownToken,
458 mem_ctx: MemoryContext,
459}
460
461impl<B: LookupExecutorBuilder> HashKeyDispatcher for LocalLookupJoinExecutorArgs<B> {
462 type Output = BoxedExecutor;
463
464 fn dispatch_impl<K: HashKey>(self) -> Self::Output {
465 Box::new(LocalLookupJoinExecutor::<K, B>::new(LookupJoinBase {
466 join_type: self.join_type,
467 condition: self.condition,
468 outer_side_input: self.outer_side_input,
469 outer_side_data_types: self.outer_side_data_types,
470 outer_side_key_idxs: self.outer_side_key_idxs,
471 inner_side_builder: self.inner_side_builder,
472 inner_side_key_types: self.inner_side_key_types,
473 inner_side_key_idxs: self.inner_side_key_idxs,
474 null_safe: self.null_safe,
475 lookup_prefix_len: self.lookup_prefix_len,
476 chunk_builder: self.chunk_builder,
477 schema: self.schema,
478 output_indices: self.output_indices,
479 chunk_size: self.chunk_size,
480 asof_desc: self.asof_desc,
481 identity: self.identity,
482 shutdown_rx: self.shutdown_rx,
483 mem_ctx: self.mem_ctx,
484 _phantom: PhantomData,
485 }))
486 }
487
488 fn data_types(&self) -> &[DataType] {
489 &self.inner_side_key_types
490 }
491}
492
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use risingwave_common::array::{DataChunk, DataChunkTestExt};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::hash::HashKeyDispatcher;
    use risingwave_common::memory::MemoryContext;
    use risingwave_common::types::DataType;
    use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
    use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
    use risingwave_expr::expr::{BoxedExpression, build_from_pretty};

    use crate::executor::join::JoinType;
    use crate::executor::test_utils::{
        FakeInnerSideExecutorBuilder, MockExecutor, diff_executor_output,
    };
    use crate::executor::{BoxedExecutor, SortExecutor};
    use crate::local_lookup_join::LocalLookupJoinExecutorArgs;
    use crate::monitor::BatchSpillMetrics;
    use crate::task::ShutdownToken;

    const CHUNK_SIZE: usize = 1024;

    /// Pretty-printed extra join condition shared by the `*_with_condition` tests.
    const CONDITION: &str = "(less_than:boolean 5:float4 $3:float4)";

    /// Outer-side input: two chunks of `(int32, float32)` rows, the last row being all NULL.
    fn create_outer_side_input() -> BoxedExecutor {
        let fields = vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Float32),
        ];
        let mut executor = MockExecutor::new(Schema { fields });

        executor.add(DataChunk::from_pretty(
            "i f
             1 6.1
             2 8.4
             3 3.9",
        ));

        executor.add(DataChunk::from_pretty(
            "i f
             2 5.5
             5 4.1
             5 9.1
             . .",
        ));

        Box::new(executor)
    }

    /// Builds a lookup join over the mock outer side and a fake inner side with the same
    /// schema, joining on the first column of each side.
    fn create_lookup_join_executor(
        join_type: JoinType,
        condition: Option<BoxedExpression>,
        null_safe: bool,
    ) -> BoxedExecutor {
        let outer_side_input = create_outer_side_input();
        let outer_fields = outer_side_input.schema().fields.clone();

        let inner_side_schema = Schema {
            fields: outer_fields.clone(),
        };

        // Semi and anti joins only output the outer columns.
        let mut fields = outer_fields;
        if !matches!(join_type, JoinType::LeftSemi | JoinType::LeftAnti) {
            fields.extend(inner_side_schema.fields.iter().cloned());
        }
        let original_schema = Schema { fields };

        let inner_side_key_type = inner_side_schema.data_types()[0].clone();
        let outer_side_data_types = outer_side_input.schema().data_types();

        let args = LocalLookupJoinExecutorArgs {
            join_type,
            condition,
            outer_side_input,
            outer_side_data_types,
            outer_side_key_idxs: vec![0],
            inner_side_builder: FakeInnerSideExecutorBuilder::new(inner_side_schema),
            inner_side_key_types: vec![inner_side_key_type],
            inner_side_key_idxs: vec![0],
            null_safe: vec![null_safe],
            lookup_prefix_len: 1,
            chunk_builder: DataChunkBuilder::new(original_schema.data_types(), CHUNK_SIZE),
            schema: original_schema.clone(),
            output_indices: (0..original_schema.len()).collect(),
            chunk_size: CHUNK_SIZE,
            asof_desc: None,
            identity: "TestLookupJoinExecutor".to_owned(),
            shutdown_rx: ShutdownToken::empty(),
            mem_ctx: MemoryContext::none(),
        };

        args.dispatch()
    }

    /// Sorts `child` by its first two columns, ascending, so the join output can be compared
    /// against a fixed expectation.
    fn create_order_by_executor(child: BoxedExecutor) -> BoxedExecutor {
        let column_orders: Vec<ColumnOrder> = (0..2)
            .map(|column_index| ColumnOrder {
                column_index,
                order_type: OrderType::ascending(),
            })
            .collect();

        Box::new(SortExecutor::new(
            child,
            Arc::new(column_orders),
            "SortExecutor".into(),
            CHUNK_SIZE,
            MemoryContext::none(),
            None,
            BatchSpillMetrics::for_test(),
        ))
    }

    /// Runs the join, sorts its output and compares it with `expected`.
    async fn do_test(
        join_type: JoinType,
        condition: Option<BoxedExpression>,
        null_safe: bool,
        expected: DataChunk,
    ) {
        let join = create_lookup_join_executor(join_type, condition, null_safe);
        let sorted = create_order_by_executor(join);

        let mut expected_exec = MockExecutor::new(sorted.schema().clone());
        expected_exec.add(expected);

        diff_executor_output(sorted, Box::new(expected_exec)).await;
    }

    #[tokio::test]
    async fn test_inner_join() {
        let expected = DataChunk::from_pretty(
            "i f i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 5.5 2 4.4
             2 8.4 2 5.5
             2 8.4 2 4.4
             5 4.1 5 2.3
             5 4.1 5 3.7
             5 9.1 5 2.3
             5 9.1 5 3.7",
        );

        do_test(JoinType::Inner, None, false, expected).await;
    }

    #[tokio::test]
    async fn test_null_safe_inner_join() {
        let expected = DataChunk::from_pretty(
            "i f i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 5.5 2 4.4
             2 8.4 2 5.5
             2 8.4 2 4.4
             5 4.1 5 2.3
             5 4.1 5 3.7
             5 9.1 5 2.3
             5 9.1 5 3.7
             . . . .",
        );

        do_test(JoinType::Inner, None, true, expected).await;
    }

    #[tokio::test]
    async fn test_left_outer_join() {
        let expected = DataChunk::from_pretty(
            "i f i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 5.5 2 4.4
             2 8.4 2 5.5
             2 8.4 2 4.4
             3 3.9 . .
             5 4.1 5 2.3
             5 4.1 5 3.7
             5 9.1 5 2.3
             5 9.1 5 3.7
             . . . .",
        );

        do_test(JoinType::LeftOuter, None, false, expected).await;
    }

    #[tokio::test]
    async fn test_left_semi_join() {
        let expected = DataChunk::from_pretty(
            "i f
             1 6.1
             2 5.5
             2 8.4
             5 4.1
             5 9.1",
        );

        do_test(JoinType::LeftSemi, None, false, expected).await;
    }

    #[tokio::test]
    async fn test_left_anti_join() {
        let expected = DataChunk::from_pretty(
            "i f
             3 3.9
             . .",
        );

        do_test(JoinType::LeftAnti, None, false, expected).await;
    }

    #[tokio::test]
    async fn test_inner_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 8.4 2 5.5",
        );

        do_test(
            JoinType::Inner,
            Some(build_from_pretty(CONDITION)),
            false,
            expected,
        )
        .await;
    }

    #[tokio::test]
    async fn test_left_outer_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f i f
             1 6.1 1 9.2
             2 5.5 2 5.5
             2 8.4 2 5.5
             3 3.9 . .
             5 4.1 . .
             5 9.1 . .
             . . . .",
        );

        do_test(
            JoinType::LeftOuter,
            Some(build_from_pretty(CONDITION)),
            false,
            expected,
        )
        .await;
    }

    #[tokio::test]
    async fn test_left_semi_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f
             1 6.1
             2 5.5
             2 8.4",
        );

        do_test(
            JoinType::LeftSemi,
            Some(build_from_pretty(CONDITION)),
            false,
            expected,
        )
        .await;
    }

    #[tokio::test]
    async fn test_left_anti_join_with_condition() {
        let expected = DataChunk::from_pretty(
            "i f
             3 3.9
             5 4.1
             5 9.1
             . .",
        );

        do_test(
            JoinType::LeftAnti,
            Some(build_from_pretty(CONDITION)),
            false,
            expected,
        )
        .await;
    }
}