risingwave_frontend/optimizer/plan_node/
batch_lookup_join.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_common::catalog::ColumnId;
19use risingwave_pb::batch_plan::plan_node::NodeBody;
20use risingwave_pb::batch_plan::{DistributedLookupJoinNode, LocalLookupJoinNode};
21use risingwave_pb::plan_common::AsOfJoinDesc;
22use risingwave_sqlparser::ast::AsOf;
23
24use super::batch::prelude::*;
25use super::utils::{Distill, childless_record, to_batch_query_epoch};
26use super::{BatchPlanRef as PlanRef, BatchSeqScan, ExprRewritable, generic};
27use crate::TableCatalog;
28use crate::error::Result;
29use crate::expr::{Expr, ExprRewriter, ExprVisitor};
30use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
31use crate::optimizer::plan_node::utils::IndicesDisplay;
32use crate::optimizer::plan_node::{
33    EqJoinPredicate, EqJoinPredicateDisplay, PlanBase, PlanTreeNodeUnary, ToDistributedBatch,
34    ToLocalBatch, TryToBatchPb,
35};
36use crate::optimizer::property::{Distribution, Order, RequiredDist};
37use crate::scheduler::SchedulerResult;
38use crate::utils::ColIndexMappingRewriteExt;
39
40#[derive(Debug, Clone, PartialEq, Eq, Hash)]
41pub struct BatchLookupJoin {
42    pub base: PlanBase<Batch>,
43    core: generic::Join<PlanRef>,
44
45    /// The join condition must be equivalent to `logical.on`, but separated into equal and
46    /// non-equal parts to facilitate execution later
47    eq_join_predicate: EqJoinPredicate,
48
49    /// Table description of the right side table
50    right_table: Arc<TableCatalog>,
51
52    /// Output column ids of the right side table
53    right_output_column_ids: Vec<ColumnId>,
54
55    /// The prefix length of the order key of right side table.
56    lookup_prefix_len: usize,
57
58    /// If `distributed_lookup` is true, it will generate `DistributedLookupJoinNode` for
59    /// `ToBatchPb`. Otherwise, it will generate `LookupJoinNode`.
60    distributed_lookup: bool,
61
62    as_of: Option<AsOf>,
63    // `AsOf` join description
64    asof_desc: Option<AsOfJoinDesc>,
65}
66
67impl BatchLookupJoin {
68    pub fn new(
69        core: generic::Join<PlanRef>,
70        eq_join_predicate: EqJoinPredicate,
71        right_table: Arc<TableCatalog>,
72        right_output_column_ids: Vec<ColumnId>,
73        lookup_prefix_len: usize,
74        distributed_lookup: bool,
75        as_of: Option<AsOf>,
76        asof_desc: Option<AsOfJoinDesc>,
77    ) -> Self {
78        // We cannot create a `BatchLookupJoin` without any eq keys. We require eq keys to do the
79        // lookup.
80        assert!(eq_join_predicate.has_eq());
81        assert!(eq_join_predicate.eq_keys_are_type_aligned());
82        let dist = Self::derive_dist(core.left.distribution(), &core);
83        let base = PlanBase::new_batch_with_core(&core, dist, Order::any());
84        Self {
85            base,
86            core,
87            eq_join_predicate,
88            right_table,
89            right_output_column_ids,
90            lookup_prefix_len,
91            distributed_lookup,
92            as_of,
93            asof_desc,
94        }
95    }
96
97    fn derive_dist(left: &Distribution, core: &generic::Join<PlanRef>) -> Distribution {
98        match left {
99            Distribution::Single => Distribution::Single,
100            Distribution::HashShard(_) | Distribution::UpstreamHashShard(_, _) => {
101                let l2o = core.l2i_col_mapping().composite(&core.i2o_col_mapping());
102                l2o.rewrite_provided_distribution(left)
103            }
104            _ => unreachable!(),
105        }
106    }
107
108    fn eq_join_predicate(&self) -> &EqJoinPredicate {
109        &self.eq_join_predicate
110    }
111
112    pub fn right_table(&self) -> &TableCatalog {
113        &self.right_table
114    }
115
116    fn clone_with_distributed_lookup(&self, input: PlanRef, distributed_lookup: bool) -> Self {
117        let mut batch_lookup_join = self.clone_with_input(input);
118        batch_lookup_join.distributed_lookup = distributed_lookup;
119        batch_lookup_join
120    }
121
122    pub fn lookup_prefix_len(&self) -> usize {
123        self.lookup_prefix_len
124    }
125}
126
127impl Distill for BatchLookupJoin {
128    fn distill<'a>(&self) -> XmlNode<'a> {
129        let verbose = self.base.ctx().is_explain_verbose();
130        let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
131        vec.push(("type", Pretty::debug(&self.core.join_type)));
132
133        let concat_schema = self.core.concat_schema();
134        vec.push((
135            "predicate",
136            Pretty::debug(&EqJoinPredicateDisplay {
137                eq_join_predicate: self.eq_join_predicate(),
138                input_schema: &concat_schema,
139            }),
140        ));
141
142        if verbose {
143            let data = IndicesDisplay::from_join(&self.core, &concat_schema);
144            vec.push(("output", data));
145        }
146
147        let scan: &BatchSeqScan = self.core.right.as_batch_seq_scan().unwrap();
148
149        vec.push(("lookup table", Pretty::display(&scan.core().table_name())));
150
151        if let Some(as_of) = &self.as_of {
152            vec.push(("as_of", Pretty::debug(as_of)));
153        }
154
155        childless_record("BatchLookupJoin", vec)
156    }
157}
158
159impl PlanTreeNodeUnary<Batch> for BatchLookupJoin {
160    fn input(&self) -> PlanRef {
161        self.core.left.clone()
162    }
163
164    // Only change left side
165    fn clone_with_input(&self, input: PlanRef) -> Self {
166        let mut core = self.core.clone();
167        core.left = input;
168        Self::new(
169            core,
170            self.eq_join_predicate.clone(),
171            self.right_table.clone(),
172            self.right_output_column_ids.clone(),
173            self.lookup_prefix_len,
174            self.distributed_lookup,
175            self.as_of.clone(),
176            self.asof_desc,
177        )
178    }
179}
180
181impl_plan_tree_node_for_unary! { Batch, BatchLookupJoin }
182
183impl ToDistributedBatch for BatchLookupJoin {
184    fn to_distributed(&self) -> Result<PlanRef> {
185        // Align left distribution keys with the right table.
186        let mut exchange_dist_keys = vec![];
187        let left_eq_indexes = self.eq_join_predicate.left_eq_indexes();
188        let right_table = &self.right_table;
189        for dist_col_index in &right_table.distribution_key {
190            let dist_col_id = right_table.columns[*dist_col_index].column_desc.column_id;
191            let output_pos = self
192                .right_output_column_ids
193                .iter()
194                .position(|p| *p == dist_col_id)
195                .unwrap();
196            let dist_in_eq_indexes = self
197                .eq_join_predicate
198                .right_eq_indexes()
199                .iter()
200                .position(|col| *col == output_pos)
201                .unwrap();
202            assert!(dist_in_eq_indexes < self.lookup_prefix_len);
203            exchange_dist_keys.push(left_eq_indexes[dist_in_eq_indexes]);
204        }
205
206        assert!(!exchange_dist_keys.is_empty());
207
208        let input = self.input().to_distributed_with_required(
209            &Order::any(),
210            &RequiredDist::PhysicalDist(Distribution::UpstreamHashShard(
211                exchange_dist_keys,
212                self.right_table.id,
213            )),
214        )?;
215
216        Ok(self.clone_with_distributed_lookup(input, true).into())
217    }
218}
219
220impl TryToBatchPb for BatchLookupJoin {
221    fn try_to_batch_prost_body(&self) -> SchedulerResult<NodeBody> {
222        Ok(if self.distributed_lookup {
223            NodeBody::DistributedLookupJoin(DistributedLookupJoinNode {
224                join_type: self.core.join_type as i32,
225                condition: self
226                    .eq_join_predicate
227                    .other_cond()
228                    .as_expr_unless_true()
229                    .map(|x| x.to_expr_proto()),
230                outer_side_key: self
231                    .eq_join_predicate
232                    .left_eq_indexes()
233                    .into_iter()
234                    .map(|a| a as _)
235                    .collect(),
236                inner_side_key: self
237                    .eq_join_predicate
238                    .right_eq_indexes()
239                    .into_iter()
240                    .map(|a| a as _)
241                    .collect(),
242                inner_side_table_desc: Some(self.right_table.table_desc().try_to_protobuf()?),
243                inner_side_column_ids: self
244                    .right_output_column_ids
245                    .iter()
246                    .map(ColumnId::get_id)
247                    .collect(),
248                output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
249                null_safe: self.eq_join_predicate.null_safes(),
250                lookup_prefix_len: self.lookup_prefix_len as u32,
251                query_epoch: to_batch_query_epoch(&self.as_of)?,
252                asof_desc: self.asof_desc,
253            })
254        } else {
255            NodeBody::LocalLookupJoin(LocalLookupJoinNode {
256                join_type: self.core.join_type as i32,
257                condition: self
258                    .eq_join_predicate
259                    .other_cond()
260                    .as_expr_unless_true()
261                    .map(|x| x.to_expr_proto()),
262                outer_side_key: self
263                    .eq_join_predicate
264                    .left_eq_indexes()
265                    .into_iter()
266                    .map(|a| a as _)
267                    .collect(),
268                inner_side_key: self
269                    .eq_join_predicate
270                    .right_eq_indexes()
271                    .into_iter()
272                    .map(|a| a as _)
273                    .collect(),
274                inner_side_table_desc: Some(self.right_table.table_desc().try_to_protobuf()?),
275                inner_side_vnode_mapping: vec![], // To be filled in at local.rs
276                inner_side_column_ids: self
277                    .right_output_column_ids
278                    .iter()
279                    .map(ColumnId::get_id)
280                    .collect(),
281                output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
282                worker_nodes: vec![], // To be filled in at local.rs
283                null_safe: self.eq_join_predicate.null_safes(),
284                lookup_prefix_len: self.lookup_prefix_len as u32,
285                query_epoch: to_batch_query_epoch(&self.as_of)?,
286                asof_desc: self.asof_desc,
287            })
288        })
289    }
290}
291
292impl ToLocalBatch for BatchLookupJoin {
293    fn to_local(&self) -> Result<PlanRef> {
294        let input = RequiredDist::single()
295            .batch_enforce_if_not_satisfies(self.input().to_local()?, &Order::any())?;
296
297        Ok(self.clone_with_distributed_lookup(input, false).into())
298    }
299}
300
301impl ExprRewritable<Batch> for BatchLookupJoin {
302    fn has_rewritable_expr(&self) -> bool {
303        true
304    }
305
306    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
307        let base = self.base.clone_with_new_plan_id();
308        let mut core = self.core.clone();
309        core.rewrite_exprs(r);
310        Self {
311            base,
312            core,
313            eq_join_predicate: self.eq_join_predicate.rewrite_exprs(r),
314            ..Self::clone(self)
315        }
316        .into()
317    }
318}
319
320impl ExprVisitable for BatchLookupJoin {
321    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
322        self.core.visit_exprs(v);
323    }
324}