// risingwave_frontend/optimizer/plan_node/batch_lookup_join.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::{ColumnId, TableDesc};
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18use risingwave_pb::batch_plan::{DistributedLookupJoinNode, LocalLookupJoinNode};
19use risingwave_pb::plan_common::AsOfJoinDesc;
20use risingwave_sqlparser::ast::AsOf;
21
22use super::batch::prelude::*;
23use super::utils::{Distill, childless_record, to_pb_time_travel_as_of};
24use super::{ExprRewritable, generic};
25use crate::error::Result;
26use crate::expr::{Expr, ExprRewriter, ExprVisitor};
27use crate::optimizer::PlanRef;
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::plan_node::utils::IndicesDisplay;
30use crate::optimizer::plan_node::{
31    EqJoinPredicate, EqJoinPredicateDisplay, LogicalScan, PlanBase, PlanTreeNodeUnary,
32    ToDistributedBatch, ToLocalBatch, TryToBatchPb,
33};
34use crate::optimizer::property::{Distribution, Order, RequiredDist};
35use crate::scheduler::SchedulerResult;
36use crate::utils::ColIndexMappingRewriteExt;
37
38#[derive(Debug, Clone, PartialEq, Eq, Hash)]
39pub struct BatchLookupJoin {
40    pub base: PlanBase<Batch>,
41    core: generic::Join<PlanRef>,
42
43    /// The join condition must be equivalent to `logical.on`, but separated into equal and
44    /// non-equal parts to facilitate execution later
45    eq_join_predicate: EqJoinPredicate,
46
47    /// Table description of the right side table
48    right_table_desc: TableDesc,
49
50    /// Output column ids of the right side table
51    right_output_column_ids: Vec<ColumnId>,
52
53    /// The prefix length of the order key of right side table.
54    lookup_prefix_len: usize,
55
56    /// If `distributed_lookup` is true, it will generate `DistributedLookupJoinNode` for
57    /// `ToBatchPb`. Otherwise, it will generate `LookupJoinNode`.
58    distributed_lookup: bool,
59
60    as_of: Option<AsOf>,
61    // `AsOf` join description
62    asof_desc: Option<AsOfJoinDesc>,
63}
64
65impl BatchLookupJoin {
66    pub fn new(
67        core: generic::Join<PlanRef>,
68        eq_join_predicate: EqJoinPredicate,
69        right_table_desc: TableDesc,
70        right_output_column_ids: Vec<ColumnId>,
71        lookup_prefix_len: usize,
72        distributed_lookup: bool,
73        as_of: Option<AsOf>,
74        asof_desc: Option<AsOfJoinDesc>,
75    ) -> Self {
76        // We cannot create a `BatchLookupJoin` without any eq keys. We require eq keys to do the
77        // lookup.
78        assert!(eq_join_predicate.has_eq());
79        assert!(eq_join_predicate.eq_keys_are_type_aligned());
80        let dist = Self::derive_dist(core.left.distribution(), &core);
81        let base = PlanBase::new_batch_with_core(&core, dist, Order::any());
82        Self {
83            base,
84            core,
85            eq_join_predicate,
86            right_table_desc,
87            right_output_column_ids,
88            lookup_prefix_len,
89            distributed_lookup,
90            as_of,
91            asof_desc,
92        }
93    }
94
95    fn derive_dist(left: &Distribution, core: &generic::Join<PlanRef>) -> Distribution {
96        match left {
97            Distribution::Single => Distribution::Single,
98            Distribution::HashShard(_) | Distribution::UpstreamHashShard(_, _) => {
99                let l2o = core.l2i_col_mapping().composite(&core.i2o_col_mapping());
100                l2o.rewrite_provided_distribution(left)
101            }
102            _ => unreachable!(),
103        }
104    }
105
106    fn eq_join_predicate(&self) -> &EqJoinPredicate {
107        &self.eq_join_predicate
108    }
109
110    pub fn right_table_desc(&self) -> &TableDesc {
111        &self.right_table_desc
112    }
113
114    fn clone_with_distributed_lookup(&self, input: PlanRef, distributed_lookup: bool) -> Self {
115        let mut batch_lookup_join = self.clone_with_input(input);
116        batch_lookup_join.distributed_lookup = distributed_lookup;
117        batch_lookup_join
118    }
119
120    pub fn lookup_prefix_len(&self) -> usize {
121        self.lookup_prefix_len
122    }
123}
124
125impl Distill for BatchLookupJoin {
126    fn distill<'a>(&self) -> XmlNode<'a> {
127        let verbose = self.base.ctx().is_explain_verbose();
128        let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
129        vec.push(("type", Pretty::debug(&self.core.join_type)));
130
131        let concat_schema = self.core.concat_schema();
132        vec.push((
133            "predicate",
134            Pretty::debug(&EqJoinPredicateDisplay {
135                eq_join_predicate: self.eq_join_predicate(),
136                input_schema: &concat_schema,
137            }),
138        ));
139
140        if verbose {
141            let data = IndicesDisplay::from_join(&self.core, &concat_schema);
142            vec.push(("output", data));
143        }
144
145        if let Some(scan) = self.core.right.as_logical_scan() {
146            let scan: &LogicalScan = scan;
147            vec.push(("lookup table", Pretty::display(&scan.table_name())));
148        }
149
150        childless_record("BatchLookupJoin", vec)
151    }
152}
153
154impl PlanTreeNodeUnary for BatchLookupJoin {
155    fn input(&self) -> PlanRef {
156        self.core.left.clone()
157    }
158
159    // Only change left side
160    fn clone_with_input(&self, input: PlanRef) -> Self {
161        let mut core = self.core.clone();
162        core.left = input;
163        Self::new(
164            core,
165            self.eq_join_predicate.clone(),
166            self.right_table_desc.clone(),
167            self.right_output_column_ids.clone(),
168            self.lookup_prefix_len,
169            self.distributed_lookup,
170            self.as_of.clone(),
171            self.asof_desc,
172        )
173    }
174}
175
176impl_plan_tree_node_for_unary! { BatchLookupJoin }
177
178impl ToDistributedBatch for BatchLookupJoin {
179    fn to_distributed(&self) -> Result<PlanRef> {
180        // Align left distribution keys with the right table.
181        let mut exchange_dist_keys = vec![];
182        let left_eq_indexes = self.eq_join_predicate.left_eq_indexes();
183        let right_table_desc = self.right_table_desc();
184        for dist_col_index in &right_table_desc.distribution_key {
185            let dist_col_id = right_table_desc.columns[*dist_col_index].column_id;
186            let output_pos = self
187                .right_output_column_ids
188                .iter()
189                .position(|p| *p == dist_col_id)
190                .unwrap();
191            let dist_in_eq_indexes = self
192                .eq_join_predicate
193                .right_eq_indexes()
194                .iter()
195                .position(|col| *col == output_pos)
196                .unwrap();
197            assert!(dist_in_eq_indexes < self.lookup_prefix_len);
198            exchange_dist_keys.push(left_eq_indexes[dist_in_eq_indexes]);
199        }
200
201        assert!(!exchange_dist_keys.is_empty());
202
203        let input = self.input().to_distributed_with_required(
204            &Order::any(),
205            &RequiredDist::PhysicalDist(Distribution::UpstreamHashShard(
206                exchange_dist_keys,
207                self.right_table_desc.table_id,
208            )),
209        )?;
210
211        Ok(self.clone_with_distributed_lookup(input, true).into())
212    }
213}
214
215impl TryToBatchPb for BatchLookupJoin {
216    fn try_to_batch_prost_body(&self) -> SchedulerResult<NodeBody> {
217        Ok(if self.distributed_lookup {
218            NodeBody::DistributedLookupJoin(DistributedLookupJoinNode {
219                join_type: self.core.join_type as i32,
220                condition: self
221                    .eq_join_predicate
222                    .other_cond()
223                    .as_expr_unless_true()
224                    .map(|x| x.to_expr_proto()),
225                outer_side_key: self
226                    .eq_join_predicate
227                    .left_eq_indexes()
228                    .into_iter()
229                    .map(|a| a as _)
230                    .collect(),
231                inner_side_key: self
232                    .eq_join_predicate
233                    .right_eq_indexes()
234                    .into_iter()
235                    .map(|a| a as _)
236                    .collect(),
237                inner_side_table_desc: Some(self.right_table_desc.try_to_protobuf()?),
238                inner_side_column_ids: self
239                    .right_output_column_ids
240                    .iter()
241                    .map(ColumnId::get_id)
242                    .collect(),
243                output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
244                null_safe: self.eq_join_predicate.null_safes(),
245                lookup_prefix_len: self.lookup_prefix_len as u32,
246                as_of: to_pb_time_travel_as_of(&self.as_of)?,
247                asof_desc: self.asof_desc,
248            })
249        } else {
250            NodeBody::LocalLookupJoin(LocalLookupJoinNode {
251                join_type: self.core.join_type as i32,
252                condition: self
253                    .eq_join_predicate
254                    .other_cond()
255                    .as_expr_unless_true()
256                    .map(|x| x.to_expr_proto()),
257                outer_side_key: self
258                    .eq_join_predicate
259                    .left_eq_indexes()
260                    .into_iter()
261                    .map(|a| a as _)
262                    .collect(),
263                inner_side_key: self
264                    .eq_join_predicate
265                    .right_eq_indexes()
266                    .into_iter()
267                    .map(|a| a as _)
268                    .collect(),
269                inner_side_table_desc: Some(self.right_table_desc.try_to_protobuf()?),
270                inner_side_vnode_mapping: vec![], // To be filled in at local.rs
271                inner_side_column_ids: self
272                    .right_output_column_ids
273                    .iter()
274                    .map(ColumnId::get_id)
275                    .collect(),
276                output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
277                worker_nodes: vec![], // To be filled in at local.rs
278                null_safe: self.eq_join_predicate.null_safes(),
279                lookup_prefix_len: self.lookup_prefix_len as u32,
280                as_of: to_pb_time_travel_as_of(&self.as_of)?,
281                asof_desc: self.asof_desc,
282            })
283        })
284    }
285}
286
287impl ToLocalBatch for BatchLookupJoin {
288    fn to_local(&self) -> Result<PlanRef> {
289        let input = RequiredDist::single()
290            .enforce_if_not_satisfies(self.input().to_local()?, &Order::any())?;
291
292        Ok(self.clone_with_distributed_lookup(input, false).into())
293    }
294}
295
296impl ExprRewritable for BatchLookupJoin {
297    fn has_rewritable_expr(&self) -> bool {
298        true
299    }
300
301    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
302        let base = self.base.clone_with_new_plan_id();
303        let mut core = self.core.clone();
304        core.rewrite_exprs(r);
305        Self {
306            base,
307            core,
308            eq_join_predicate: self.eq_join_predicate.rewrite_exprs(r),
309            ..Self::clone(self)
310        }
311        .into()
312    }
313}
314
315impl ExprVisitable for BatchLookupJoin {
316    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
317        self.core.visit_exprs(v);
318    }
319}