risingwave_frontend/optimizer/plan_node/
batch_nested_loop_join.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_pb::batch_plan::NestedLoopJoinNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use super::batch::prelude::*;
20use super::utils::{Distill, childless_record};
21use super::{
22 ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, ToBatchPb, ToDistributedBatch, generic,
23};
24use crate::error::Result;
25use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
26use crate::optimizer::plan_node::ToLocalBatch;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::plan_node::utils::IndicesDisplay;
29use crate::optimizer::property::{Distribution, Order, RequiredDist};
30use crate::utils::ConditionDisplay;
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub struct BatchNestedLoopJoin {
36 pub base: PlanBase<Batch>,
37 core: generic::Join<PlanRef>,
38}
39
40impl BatchNestedLoopJoin {
41 pub fn new(core: generic::Join<PlanRef>) -> Self {
42 let dist = Self::derive_dist(core.left.distribution(), core.right.distribution());
43 let base = PlanBase::new_batch_with_core(&core, dist, Order::any());
44 Self { base, core }
45 }
46
47 fn derive_dist(left: &Distribution, right: &Distribution) -> Distribution {
48 match (left, right) {
49 (Distribution::Single, Distribution::Single) => Distribution::Single,
50 (_, _) => unreachable!("{}{}", left, right),
51 }
52 }
53}
54
55impl Distill for BatchNestedLoopJoin {
56 fn distill<'a>(&self) -> XmlNode<'a> {
57 let verbose = self.base.ctx().is_explain_verbose();
58 let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
59 vec.push(("type", Pretty::debug(&self.core.join_type)));
60
61 let concat_schema = self.core.concat_schema();
62 vec.push((
63 "predicate",
64 Pretty::debug(&ConditionDisplay {
65 condition: &self.core.on,
66 input_schema: &concat_schema,
67 }),
68 ));
69
70 if verbose {
71 let data = IndicesDisplay::from_join(&self.core, &concat_schema);
72 vec.push(("output", data));
73 }
74
75 childless_record("BatchNestedLoopJoin", vec)
76 }
77}
78
79impl PlanTreeNodeBinary for BatchNestedLoopJoin {
80 fn left(&self) -> PlanRef {
81 self.core.left.clone()
82 }
83
84 fn right(&self) -> PlanRef {
85 self.core.right.clone()
86 }
87
88 fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
89 let mut core = self.core.clone();
90 core.left = left;
91 core.right = right;
92 Self::new(core)
93 }
94}
95
96impl_plan_tree_node_for_binary! { BatchNestedLoopJoin }
97
98impl ToDistributedBatch for BatchNestedLoopJoin {
99 fn to_distributed(&self) -> Result<PlanRef> {
100 let left = self
101 .left()
102 .to_distributed_with_required(&Order::any(), &RequiredDist::single())?;
103 let right = self
104 .right()
105 .to_distributed_with_required(&Order::any(), &RequiredDist::single())?;
106
107 Ok(self.clone_with_left_right(left, right).into())
108 }
109}
110
111impl ToBatchPb for BatchNestedLoopJoin {
112 fn to_batch_prost_body(&self) -> NodeBody {
113 NodeBody::NestedLoopJoin(NestedLoopJoinNode {
114 join_type: self.core.join_type as i32,
115 join_cond: Some(ExprImpl::from(self.core.on.clone()).to_expr_proto()),
116 output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
117 })
118 }
119}
120
121impl ToLocalBatch for BatchNestedLoopJoin {
122 fn to_local(&self) -> Result<PlanRef> {
123 let left = RequiredDist::single()
124 .enforce_if_not_satisfies(self.left().to_local()?, &Order::any())?;
125
126 let right = RequiredDist::single()
127 .enforce_if_not_satisfies(self.right().to_local()?, &Order::any())?;
128
129 Ok(self.clone_with_left_right(left, right).into())
130 }
131}
132
133impl ExprRewritable for BatchNestedLoopJoin {
134 fn has_rewritable_expr(&self) -> bool {
135 true
136 }
137
138 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
139 let mut core = self.core.clone();
140 core.rewrite_exprs(r);
141 Self::new(core).into()
142 }
143}
144
145impl ExprVisitable for BatchNestedLoopJoin {
146 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
147 self.core.visit_exprs(v);
148 }
149}