risingwave_frontend/optimizer/plan_node/
batch_lookup_join.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_common::catalog::ColumnId;
19use risingwave_pb::batch_plan::plan_node::NodeBody;
20use risingwave_pb::batch_plan::{DistributedLookupJoinNode, LocalLookupJoinNode};
21use risingwave_pb::plan_common::AsOfJoinDesc;
22use risingwave_sqlparser::ast::AsOf;
23
24use super::batch::prelude::*;
25use super::utils::{Distill, childless_record, to_batch_query_epoch};
26use super::{BatchPlanRef as PlanRef, BatchSeqScan, ExprRewritable, generic};
27use crate::TableCatalog;
28use crate::error::Result;
29use crate::expr::{Expr, ExprRewriter, ExprVisitor};
30use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
31use crate::optimizer::plan_node::utils::IndicesDisplay;
32use crate::optimizer::plan_node::{
33    EqJoinPredicate, EqJoinPredicateDisplay, PlanBase, PlanTreeNodeUnary, ToDistributedBatch,
34    ToLocalBatch, TryToBatchPb,
35};
36use crate::optimizer::property::{Distribution, Order, RequiredDist};
37use crate::scheduler::SchedulerResult;
38use crate::utils::ColIndexMappingRewriteExt;
39
40#[derive(Debug, Clone, PartialEq, Eq, Hash)]
41pub struct BatchLookupJoin {
42    pub base: PlanBase<Batch>,
43    core: generic::Join<PlanRef>,
44
45    /// Table description of the right side table
46    right_table: Arc<TableCatalog>,
47
48    /// Output column ids of the right side table
49    right_output_column_ids: Vec<ColumnId>,
50
51    /// The prefix length of the order key of right side table.
52    lookup_prefix_len: usize,
53
54    /// If `distributed_lookup` is true, it will generate `DistributedLookupJoinNode` for
55    /// `ToBatchPb`. Otherwise, it will generate `LookupJoinNode`.
56    distributed_lookup: bool,
57
58    as_of: Option<AsOf>,
59    // `AsOf` join description
60    asof_desc: Option<AsOfJoinDesc>,
61}
62
63impl BatchLookupJoin {
64    pub fn new(
65        core: generic::Join<PlanRef>,
66        right_table: Arc<TableCatalog>,
67        right_output_column_ids: Vec<ColumnId>,
68        lookup_prefix_len: usize,
69        distributed_lookup: bool,
70        as_of: Option<AsOf>,
71        asof_desc: Option<AsOfJoinDesc>,
72    ) -> Self {
73        // We cannot create a `BatchLookupJoin` without any eq keys. We require eq keys to do the
74        // lookup.
75        let eq_join_predicate = core
76            .on
77            .as_eq_predicate_ref()
78            .expect("BatchLookupJoin requires JoinOn::EqPredicate in core");
79        assert!(eq_join_predicate.has_eq());
80        assert!(eq_join_predicate.eq_keys_are_type_aligned());
81        let dist = Self::derive_dist(core.left.distribution(), &core);
82        let base = PlanBase::new_batch_with_core(&core, dist, Order::any());
83        Self {
84            base,
85            core,
86            right_table,
87            right_output_column_ids,
88            lookup_prefix_len,
89            distributed_lookup,
90            as_of,
91            asof_desc,
92        }
93    }
94
95    fn derive_dist(left: &Distribution, core: &generic::Join<PlanRef>) -> Distribution {
96        match left {
97            Distribution::Single => Distribution::Single,
98            Distribution::HashShard(_) | Distribution::UpstreamHashShard(_, _) => {
99                let l2o = core.l2i_col_mapping().composite(&core.i2o_col_mapping());
100                l2o.rewrite_provided_distribution(left)
101            }
102            _ => unreachable!(),
103        }
104    }
105
106    fn eq_join_predicate(&self) -> &EqJoinPredicate {
107        self.core
108            .on
109            .as_eq_predicate_ref()
110            .expect("BatchLookupJoin should store predicate as EqJoinPredicate")
111    }
112
113    pub fn right_table(&self) -> &TableCatalog {
114        &self.right_table
115    }
116
117    fn clone_with_distributed_lookup(&self, input: PlanRef, distributed_lookup: bool) -> Self {
118        let mut batch_lookup_join = self.clone_with_input(input);
119        batch_lookup_join.distributed_lookup = distributed_lookup;
120        batch_lookup_join
121    }
122
123    pub fn lookup_prefix_len(&self) -> usize {
124        self.lookup_prefix_len
125    }
126}
127
128impl Distill for BatchLookupJoin {
129    fn distill<'a>(&self) -> XmlNode<'a> {
130        let verbose = self.base.ctx().is_explain_verbose();
131        let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
132        vec.push(("type", Pretty::debug(&self.core.join_type)));
133
134        let concat_schema = self.core.concat_schema();
135        vec.push((
136            "predicate",
137            Pretty::debug(&EqJoinPredicateDisplay {
138                eq_join_predicate: self.eq_join_predicate(),
139                input_schema: &concat_schema,
140            }),
141        ));
142
143        if verbose {
144            let data = IndicesDisplay::from_join(&self.core, &concat_schema);
145            vec.push(("output", data));
146        }
147
148        let scan: &BatchSeqScan = self.core.right.as_batch_seq_scan().unwrap();
149
150        vec.push(("lookup table", Pretty::display(&scan.core().table_name())));
151
152        if let Some(as_of) = &self.as_of {
153            vec.push(("as_of", Pretty::debug(as_of)));
154        }
155
156        childless_record("BatchLookupJoin", vec)
157    }
158}
159
160impl PlanTreeNodeUnary<Batch> for BatchLookupJoin {
161    fn input(&self) -> PlanRef {
162        self.core.left.clone()
163    }
164
165    // Only change left side
166    fn clone_with_input(&self, input: PlanRef) -> Self {
167        let mut core = self.core.clone();
168        core.left = input;
169        Self::new(
170            core,
171            self.right_table.clone(),
172            self.right_output_column_ids.clone(),
173            self.lookup_prefix_len,
174            self.distributed_lookup,
175            self.as_of.clone(),
176            self.asof_desc,
177        )
178    }
179}
180
181impl_plan_tree_node_for_unary! { Batch, BatchLookupJoin }
182
183impl ToDistributedBatch for BatchLookupJoin {
184    fn to_distributed(&self) -> Result<PlanRef> {
185        // Align left distribution keys with the right table.
186        let mut exchange_dist_keys = vec![];
187        let left_eq_indexes = self.eq_join_predicate().left_eq_indexes();
188        let right_table = &self.right_table;
189        for dist_col_index in &right_table.distribution_key {
190            let dist_col_id = right_table.columns[*dist_col_index].column_desc.column_id;
191            let output_pos = self
192                .right_output_column_ids
193                .iter()
194                .position(|p| *p == dist_col_id)
195                .unwrap();
196            let dist_in_eq_indexes = self
197                .eq_join_predicate()
198                .right_eq_indexes()
199                .iter()
200                .position(|col| *col == output_pos)
201                .unwrap();
202            assert!(dist_in_eq_indexes < self.lookup_prefix_len);
203            exchange_dist_keys.push(left_eq_indexes[dist_in_eq_indexes]);
204        }
205
206        assert!(!exchange_dist_keys.is_empty());
207
208        let input = self.input().to_distributed_with_required(
209            &Order::any(),
210            &RequiredDist::PhysicalDist(Distribution::UpstreamHashShard(
211                exchange_dist_keys,
212                self.right_table.id,
213            )),
214        )?;
215
216        Ok(self.clone_with_distributed_lookup(input, true).into())
217    }
218}
219
220impl TryToBatchPb for BatchLookupJoin {
221    fn try_to_batch_prost_body(&self) -> SchedulerResult<NodeBody> {
222        let eq_join_predicate = self.eq_join_predicate();
223        Ok(if self.distributed_lookup {
224            NodeBody::DistributedLookupJoin(DistributedLookupJoinNode {
225                join_type: self.core.join_type as i32,
226                condition: self
227                    .eq_join_predicate()
228                    .other_cond()
229                    .as_expr_unless_true()
230                    .map(|x| x.to_expr_proto()),
231                outer_side_key: self
232                    .eq_join_predicate()
233                    .left_eq_indexes()
234                    .into_iter()
235                    .map(|a| a as _)
236                    .collect(),
237                inner_side_key: self
238                    .eq_join_predicate()
239                    .right_eq_indexes()
240                    .into_iter()
241                    .map(|a| a as _)
242                    .collect(),
243                inner_side_table_desc: Some(self.right_table.table_desc().try_to_protobuf()?),
244                inner_side_column_ids: self
245                    .right_output_column_ids
246                    .iter()
247                    .map(ColumnId::get_id)
248                    .collect(),
249                output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
250                null_safe: eq_join_predicate.null_safes(),
251                lookup_prefix_len: self.lookup_prefix_len as u32,
252                query_epoch: to_batch_query_epoch(&self.as_of)?,
253                asof_desc: self.asof_desc,
254            })
255        } else {
256            NodeBody::LocalLookupJoin(LocalLookupJoinNode {
257                join_type: self.core.join_type as i32,
258                condition: self
259                    .eq_join_predicate()
260                    .other_cond()
261                    .as_expr_unless_true()
262                    .map(|x| x.to_expr_proto()),
263                outer_side_key: self
264                    .eq_join_predicate()
265                    .left_eq_indexes()
266                    .into_iter()
267                    .map(|a| a as _)
268                    .collect(),
269                inner_side_key: self
270                    .eq_join_predicate()
271                    .right_eq_indexes()
272                    .into_iter()
273                    .map(|a| a as _)
274                    .collect(),
275                inner_side_table_desc: Some(self.right_table.table_desc().try_to_protobuf()?),
276                inner_side_vnode_mapping: vec![], // To be filled in at local.rs
277                inner_side_column_ids: self
278                    .right_output_column_ids
279                    .iter()
280                    .map(ColumnId::get_id)
281                    .collect(),
282                output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
283                worker_nodes: vec![], // To be filled in at local.rs
284                null_safe: eq_join_predicate.null_safes(),
285                lookup_prefix_len: self.lookup_prefix_len as u32,
286                query_epoch: to_batch_query_epoch(&self.as_of)?,
287                asof_desc: self.asof_desc,
288            })
289        })
290    }
291}
292
293impl ToLocalBatch for BatchLookupJoin {
294    fn to_local(&self) -> Result<PlanRef> {
295        let input = RequiredDist::single()
296            .batch_enforce_if_not_satisfies(self.input().to_local()?, &Order::any())?;
297
298        Ok(self.clone_with_distributed_lookup(input, false).into())
299    }
300}
301
302impl ExprRewritable<Batch> for BatchLookupJoin {
303    fn has_rewritable_expr(&self) -> bool {
304        true
305    }
306
307    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
308        let base = self.base.clone_with_new_plan_id();
309        let mut core = self.core.clone();
310        core.rewrite_exprs(r);
311        let mut new = self.clone();
312        new.base = base;
313        new.core = core;
314        new.into()
315    }
316}
317
318impl ExprVisitable for BatchLookupJoin {
319    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
320        self.core.visit_exprs(v);
321    }
322}