risingwave_frontend/optimizer/plan_node/
batch_nested_loop_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_pb::batch_plan::NestedLoopJoinNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use super::batch::prelude::*;
20use super::utils::{Distill, childless_record};
21use super::{
22    ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, ToBatchPb, ToDistributedBatch, generic,
23};
24use crate::error::Result;
25use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
26use crate::optimizer::plan_node::ToLocalBatch;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::plan_node::utils::IndicesDisplay;
29use crate::optimizer::property::{Distribution, Order, RequiredDist};
30use crate::utils::ConditionDisplay;
31
32/// `BatchNestedLoopJoin` implements [`super::LogicalJoin`] by checking the join condition
33/// against all pairs of rows from inner & outer side within 2 layers of loops.
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub struct BatchNestedLoopJoin {
36    pub base: PlanBase<Batch>,
37    core: generic::Join<PlanRef>,
38}
39
40impl BatchNestedLoopJoin {
41    pub fn new(core: generic::Join<PlanRef>) -> Self {
42        let dist = Self::derive_dist(core.left.distribution(), core.right.distribution());
43        let base = PlanBase::new_batch_with_core(&core, dist, Order::any());
44        Self { base, core }
45    }
46
47    fn derive_dist(left: &Distribution, right: &Distribution) -> Distribution {
48        match (left, right) {
49            (Distribution::Single, Distribution::Single) => Distribution::Single,
50            (_, _) => unreachable!("{}{}", left, right),
51        }
52    }
53}
54
55impl Distill for BatchNestedLoopJoin {
56    fn distill<'a>(&self) -> XmlNode<'a> {
57        let verbose = self.base.ctx().is_explain_verbose();
58        let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
59        vec.push(("type", Pretty::debug(&self.core.join_type)));
60
61        let concat_schema = self.core.concat_schema();
62        vec.push((
63            "predicate",
64            Pretty::debug(&ConditionDisplay {
65                condition: &self.core.on,
66                input_schema: &concat_schema,
67            }),
68        ));
69
70        if verbose {
71            let data = IndicesDisplay::from_join(&self.core, &concat_schema);
72            vec.push(("output", data));
73        }
74
75        childless_record("BatchNestedLoopJoin", vec)
76    }
77}
78
79impl PlanTreeNodeBinary for BatchNestedLoopJoin {
80    fn left(&self) -> PlanRef {
81        self.core.left.clone()
82    }
83
84    fn right(&self) -> PlanRef {
85        self.core.right.clone()
86    }
87
88    fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
89        let mut core = self.core.clone();
90        core.left = left;
91        core.right = right;
92        Self::new(core)
93    }
94}
95
96impl_plan_tree_node_for_binary! { BatchNestedLoopJoin }
97
98impl ToDistributedBatch for BatchNestedLoopJoin {
99    fn to_distributed(&self) -> Result<PlanRef> {
100        let left = self
101            .left()
102            .to_distributed_with_required(&Order::any(), &RequiredDist::single())?;
103        let right = self
104            .right()
105            .to_distributed_with_required(&Order::any(), &RequiredDist::single())?;
106
107        Ok(self.clone_with_left_right(left, right).into())
108    }
109}
110
111impl ToBatchPb for BatchNestedLoopJoin {
112    fn to_batch_prost_body(&self) -> NodeBody {
113        NodeBody::NestedLoopJoin(NestedLoopJoinNode {
114            join_type: self.core.join_type as i32,
115            join_cond: Some(ExprImpl::from(self.core.on.clone()).to_expr_proto()),
116            output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
117        })
118    }
119}
120
121impl ToLocalBatch for BatchNestedLoopJoin {
122    fn to_local(&self) -> Result<PlanRef> {
123        let left = RequiredDist::single()
124            .enforce_if_not_satisfies(self.left().to_local()?, &Order::any())?;
125
126        let right = RequiredDist::single()
127            .enforce_if_not_satisfies(self.right().to_local()?, &Order::any())?;
128
129        Ok(self.clone_with_left_right(left, right).into())
130    }
131}
132
133impl ExprRewritable for BatchNestedLoopJoin {
134    fn has_rewritable_expr(&self) -> bool {
135        true
136    }
137
138    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
139        let mut core = self.core.clone();
140        core.rewrite_exprs(r);
141        Self::new(core).into()
142    }
143}
144
145impl ExprVisitable for BatchNestedLoopJoin {
146    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
147        self.core.visit_exprs(v);
148    }
149}