risingwave_frontend/optimizer/plan_node/
batch_nested_loop_join.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_pb::batch_plan::NestedLoopJoinNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use super::batch::prelude::*;
20use super::utils::{Distill, childless_record};
21use super::{
22 BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeBinary, ToBatchPb,
23 ToDistributedBatch, generic,
24};
25use crate::error::Result;
26use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
27use crate::optimizer::plan_node::ToLocalBatch;
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::plan_node::utils::IndicesDisplay;
30use crate::optimizer::property::{Distribution, Order, RequiredDist};
31use crate::utils::ConditionDisplay;
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct BatchNestedLoopJoin {
37 pub base: PlanBase<Batch>,
38 core: generic::Join<PlanRef>,
39}
40
41impl BatchNestedLoopJoin {
42 pub fn new(core: generic::Join<PlanRef>) -> Self {
43 let dist = Self::derive_dist(core.left.distribution(), core.right.distribution());
44 let base = PlanBase::new_batch_with_core(&core, dist, Order::any());
45 Self { base, core }
46 }
47
48 fn derive_dist(left: &Distribution, right: &Distribution) -> Distribution {
49 match (left, right) {
50 (Distribution::Single, Distribution::Single) => Distribution::Single,
51 (_, _) => unreachable!("{}{}", left, right),
52 }
53 }
54}
55
56impl Distill for BatchNestedLoopJoin {
57 fn distill<'a>(&self) -> XmlNode<'a> {
58 let verbose = self.base.ctx().is_explain_verbose();
59 let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
60 vec.push(("type", Pretty::debug(&self.core.join_type)));
61
62 let concat_schema = self.core.concat_schema();
63 let on = self.core.on.as_condition();
64 vec.push((
65 "predicate",
66 Pretty::debug(&ConditionDisplay {
67 condition: &on,
68 input_schema: &concat_schema,
69 }),
70 ));
71
72 if verbose {
73 let data = IndicesDisplay::from_join(&self.core, &concat_schema);
74 vec.push(("output", data));
75 }
76
77 childless_record("BatchNestedLoopJoin", vec)
78 }
79}
80
81impl PlanTreeNodeBinary<Batch> for BatchNestedLoopJoin {
82 fn left(&self) -> PlanRef {
83 self.core.left.clone()
84 }
85
86 fn right(&self) -> PlanRef {
87 self.core.right.clone()
88 }
89
90 fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
91 let mut core = self.core.clone();
92 core.left = left;
93 core.right = right;
94 Self::new(core)
95 }
96}
97
98impl_plan_tree_node_for_binary! { Batch, BatchNestedLoopJoin }
99
100impl ToDistributedBatch for BatchNestedLoopJoin {
101 fn to_distributed(&self) -> Result<PlanRef> {
102 let left = self
103 .left()
104 .to_distributed_with_required(&Order::any(), &RequiredDist::single())?;
105 let right = self
106 .right()
107 .to_distributed_with_required(&Order::any(), &RequiredDist::single())?;
108
109 Ok(self.clone_with_left_right(left, right).into())
110 }
111}
112
113impl ToBatchPb for BatchNestedLoopJoin {
114 fn to_batch_prost_body(&self) -> NodeBody {
115 NodeBody::NestedLoopJoin(NestedLoopJoinNode {
116 join_type: self.core.join_type as i32,
117 join_cond: Some(ExprImpl::from(self.core.on.as_condition()).to_expr_proto()),
118 output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
119 })
120 }
121}
122
123impl ToLocalBatch for BatchNestedLoopJoin {
124 fn to_local(&self) -> Result<PlanRef> {
125 let left = RequiredDist::single()
126 .batch_enforce_if_not_satisfies(self.left().to_local()?, &Order::any())?;
127
128 let right = RequiredDist::single()
129 .batch_enforce_if_not_satisfies(self.right().to_local()?, &Order::any())?;
130
131 Ok(self.clone_with_left_right(left, right).into())
132 }
133}
134
135impl ExprRewritable<Batch> for BatchNestedLoopJoin {
136 fn has_rewritable_expr(&self) -> bool {
137 true
138 }
139
140 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
141 let mut core = self.core.clone();
142 core.rewrite_exprs(r);
143 Self::new(core).into()
144 }
145}
146
147impl ExprVisitable for BatchNestedLoopJoin {
148 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
149 self.core.visit_exprs(v);
150 }
151}