risingwave_frontend/optimizer/plan_node/
batch_lookup_join.rs1use pretty_xmlish::{Pretty, XmlNode};
16use risingwave_common::catalog::{ColumnId, TableDesc};
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18use risingwave_pb::batch_plan::{DistributedLookupJoinNode, LocalLookupJoinNode};
19use risingwave_pb::plan_common::AsOfJoinDesc;
20use risingwave_sqlparser::ast::AsOf;
21
22use super::batch::prelude::*;
23use super::utils::{Distill, childless_record, to_pb_time_travel_as_of};
24use super::{ExprRewritable, generic};
25use crate::error::Result;
26use crate::expr::{Expr, ExprRewriter, ExprVisitor};
27use crate::optimizer::PlanRef;
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::plan_node::utils::IndicesDisplay;
30use crate::optimizer::plan_node::{
31 EqJoinPredicate, EqJoinPredicateDisplay, LogicalScan, PlanBase, PlanTreeNodeUnary,
32 ToDistributedBatch, ToLocalBatch, TryToBatchPb,
33};
34use crate::optimizer::property::{Distribution, Order, RequiredDist};
35use crate::scheduler::SchedulerResult;
36use crate::utils::ColIndexMappingRewriteExt;
37
/// Batch plan node for a lookup join.
///
/// The node has a single plan input (`core.left`); the other side is described by
/// `right_table_desc`. It is serialized as either a `LocalLookupJoinNode` or a
/// `DistributedLookupJoinNode`, depending on `distributed_lookup`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchLookupJoin {
    /// Common batch plan-node state, built by `PlanBase::new_batch_with_core` in `new`.
    pub base: PlanBase<Batch>,
    /// Generic join description; `core.left` is the only input of this node.
    core: generic::Join<PlanRef>,

    /// Equi-join predicate. Supplies the outer/inner key indexes, the null-safe flags and
    /// the remaining non-equi condition when the node is serialized.
    eq_join_predicate: EqJoinPredicate,

    /// Descriptor of the table being looked up; serialized as `inner_side_table_desc`.
    right_table_desc: TableDesc,

    /// Column ids of the lookup table, serialized as `inner_side_column_ids`.
    right_output_column_ids: Vec<ColumnId>,

    /// Serialized as `lookup_prefix_len`. `to_distributed` asserts that every distribution
    /// key column of the lookup table sits at an equi-key position below this value.
    lookup_prefix_len: usize,

    /// Selects the serialized form: `true` emits `DistributedLookupJoinNode`, `false` emits
    /// `LocalLookupJoinNode`. Set by `to_distributed` / `to_local`.
    distributed_lookup: bool,

    /// Optional `AS OF` clause; converted with `to_pb_time_travel_as_of` on serialization.
    as_of: Option<AsOf>,
    /// Copied unchanged into the serialized node.
    /// NOTE(review): presumably describes an as-of join condition — confirm against the proto.
    asof_desc: Option<AsOfJoinDesc>,
}
64
65impl BatchLookupJoin {
66 pub fn new(
67 core: generic::Join<PlanRef>,
68 eq_join_predicate: EqJoinPredicate,
69 right_table_desc: TableDesc,
70 right_output_column_ids: Vec<ColumnId>,
71 lookup_prefix_len: usize,
72 distributed_lookup: bool,
73 as_of: Option<AsOf>,
74 asof_desc: Option<AsOfJoinDesc>,
75 ) -> Self {
76 assert!(eq_join_predicate.has_eq());
79 assert!(eq_join_predicate.eq_keys_are_type_aligned());
80 let dist = Self::derive_dist(core.left.distribution(), &core);
81 let base = PlanBase::new_batch_with_core(&core, dist, Order::any());
82 Self {
83 base,
84 core,
85 eq_join_predicate,
86 right_table_desc,
87 right_output_column_ids,
88 lookup_prefix_len,
89 distributed_lookup,
90 as_of,
91 asof_desc,
92 }
93 }
94
95 fn derive_dist(left: &Distribution, core: &generic::Join<PlanRef>) -> Distribution {
96 match left {
97 Distribution::Single => Distribution::Single,
98 Distribution::HashShard(_) | Distribution::UpstreamHashShard(_, _) => {
99 let l2o = core.l2i_col_mapping().composite(&core.i2o_col_mapping());
100 l2o.rewrite_provided_distribution(left)
101 }
102 _ => unreachable!(),
103 }
104 }
105
106 fn eq_join_predicate(&self) -> &EqJoinPredicate {
107 &self.eq_join_predicate
108 }
109
110 pub fn right_table_desc(&self) -> &TableDesc {
111 &self.right_table_desc
112 }
113
114 fn clone_with_distributed_lookup(&self, input: PlanRef, distributed_lookup: bool) -> Self {
115 let mut batch_lookup_join = self.clone_with_input(input);
116 batch_lookup_join.distributed_lookup = distributed_lookup;
117 batch_lookup_join
118 }
119
120 pub fn lookup_prefix_len(&self) -> usize {
121 self.lookup_prefix_len
122 }
123}
124
125impl Distill for BatchLookupJoin {
126 fn distill<'a>(&self) -> XmlNode<'a> {
127 let verbose = self.base.ctx().is_explain_verbose();
128 let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
129 vec.push(("type", Pretty::debug(&self.core.join_type)));
130
131 let concat_schema = self.core.concat_schema();
132 vec.push((
133 "predicate",
134 Pretty::debug(&EqJoinPredicateDisplay {
135 eq_join_predicate: self.eq_join_predicate(),
136 input_schema: &concat_schema,
137 }),
138 ));
139
140 if verbose {
141 let data = IndicesDisplay::from_join(&self.core, &concat_schema);
142 vec.push(("output", data));
143 }
144
145 if let Some(scan) = self.core.right.as_logical_scan() {
146 let scan: &LogicalScan = scan;
147 vec.push(("lookup table", Pretty::display(&scan.table_name())));
148 }
149
150 childless_record("BatchLookupJoin", vec)
151 }
152}
153
154impl PlanTreeNodeUnary for BatchLookupJoin {
155 fn input(&self) -> PlanRef {
156 self.core.left.clone()
157 }
158
159 fn clone_with_input(&self, input: PlanRef) -> Self {
161 let mut core = self.core.clone();
162 core.left = input;
163 Self::new(
164 core,
165 self.eq_join_predicate.clone(),
166 self.right_table_desc.clone(),
167 self.right_output_column_ids.clone(),
168 self.lookup_prefix_len,
169 self.distributed_lookup,
170 self.as_of.clone(),
171 self.asof_desc,
172 )
173 }
174}
175
// Expands the project macro for single-input plan nodes on `BatchLookupJoin`.
// NOTE(review): the macro is defined elsewhere; presumably it derives the generic
// plan-tree-node impl from `PlanTreeNodeUnary` — confirm at its definition.
impl_plan_tree_node_for_unary! { BatchLookupJoin }
177
178impl ToDistributedBatch for BatchLookupJoin {
179 fn to_distributed(&self) -> Result<PlanRef> {
180 let mut exchange_dist_keys = vec![];
182 let left_eq_indexes = self.eq_join_predicate.left_eq_indexes();
183 let right_table_desc = self.right_table_desc();
184 for dist_col_index in &right_table_desc.distribution_key {
185 let dist_col_id = right_table_desc.columns[*dist_col_index].column_id;
186 let output_pos = self
187 .right_output_column_ids
188 .iter()
189 .position(|p| *p == dist_col_id)
190 .unwrap();
191 let dist_in_eq_indexes = self
192 .eq_join_predicate
193 .right_eq_indexes()
194 .iter()
195 .position(|col| *col == output_pos)
196 .unwrap();
197 assert!(dist_in_eq_indexes < self.lookup_prefix_len);
198 exchange_dist_keys.push(left_eq_indexes[dist_in_eq_indexes]);
199 }
200
201 assert!(!exchange_dist_keys.is_empty());
202
203 let input = self.input().to_distributed_with_required(
204 &Order::any(),
205 &RequiredDist::PhysicalDist(Distribution::UpstreamHashShard(
206 exchange_dist_keys,
207 self.right_table_desc.table_id,
208 )),
209 )?;
210
211 Ok(self.clone_with_distributed_lookup(input, true).into())
212 }
213}
214
215impl TryToBatchPb for BatchLookupJoin {
216 fn try_to_batch_prost_body(&self) -> SchedulerResult<NodeBody> {
217 Ok(if self.distributed_lookup {
218 NodeBody::DistributedLookupJoin(DistributedLookupJoinNode {
219 join_type: self.core.join_type as i32,
220 condition: self
221 .eq_join_predicate
222 .other_cond()
223 .as_expr_unless_true()
224 .map(|x| x.to_expr_proto()),
225 outer_side_key: self
226 .eq_join_predicate
227 .left_eq_indexes()
228 .into_iter()
229 .map(|a| a as _)
230 .collect(),
231 inner_side_key: self
232 .eq_join_predicate
233 .right_eq_indexes()
234 .into_iter()
235 .map(|a| a as _)
236 .collect(),
237 inner_side_table_desc: Some(self.right_table_desc.try_to_protobuf()?),
238 inner_side_column_ids: self
239 .right_output_column_ids
240 .iter()
241 .map(ColumnId::get_id)
242 .collect(),
243 output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
244 null_safe: self.eq_join_predicate.null_safes(),
245 lookup_prefix_len: self.lookup_prefix_len as u32,
246 as_of: to_pb_time_travel_as_of(&self.as_of)?,
247 asof_desc: self.asof_desc,
248 })
249 } else {
250 NodeBody::LocalLookupJoin(LocalLookupJoinNode {
251 join_type: self.core.join_type as i32,
252 condition: self
253 .eq_join_predicate
254 .other_cond()
255 .as_expr_unless_true()
256 .map(|x| x.to_expr_proto()),
257 outer_side_key: self
258 .eq_join_predicate
259 .left_eq_indexes()
260 .into_iter()
261 .map(|a| a as _)
262 .collect(),
263 inner_side_key: self
264 .eq_join_predicate
265 .right_eq_indexes()
266 .into_iter()
267 .map(|a| a as _)
268 .collect(),
269 inner_side_table_desc: Some(self.right_table_desc.try_to_protobuf()?),
270 inner_side_vnode_mapping: vec![], inner_side_column_ids: self
272 .right_output_column_ids
273 .iter()
274 .map(ColumnId::get_id)
275 .collect(),
276 output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
277 worker_nodes: vec![], null_safe: self.eq_join_predicate.null_safes(),
279 lookup_prefix_len: self.lookup_prefix_len as u32,
280 as_of: to_pb_time_travel_as_of(&self.as_of)?,
281 asof_desc: self.asof_desc,
282 })
283 })
284 }
285}
286
287impl ToLocalBatch for BatchLookupJoin {
288 fn to_local(&self) -> Result<PlanRef> {
289 let input = RequiredDist::single()
290 .enforce_if_not_satisfies(self.input().to_local()?, &Order::any())?;
291
292 Ok(self.clone_with_distributed_lookup(input, false).into())
293 }
294}
295
296impl ExprRewritable for BatchLookupJoin {
297 fn has_rewritable_expr(&self) -> bool {
298 true
299 }
300
301 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
302 let base = self.base.clone_with_new_plan_id();
303 let mut core = self.core.clone();
304 core.rewrite_exprs(r);
305 Self {
306 base,
307 core,
308 eq_join_predicate: self.eq_join_predicate.rewrite_exprs(r),
309 ..Self::clone(self)
310 }
311 .into()
312 }
313}
314
315impl ExprVisitable for BatchLookupJoin {
316 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
317 self.core.visit_exprs(v);
318 }
319}