risingwave_frontend/optimizer/plan_node/
batch_lookup_join.rs1use std::sync::Arc;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_common::catalog::ColumnId;
19use risingwave_pb::batch_plan::plan_node::NodeBody;
20use risingwave_pb::batch_plan::{DistributedLookupJoinNode, LocalLookupJoinNode};
21use risingwave_pb::plan_common::AsOfJoinDesc;
22use risingwave_sqlparser::ast::AsOf;
23
24use super::batch::prelude::*;
25use super::utils::{Distill, childless_record, to_batch_query_epoch};
26use super::{BatchPlanRef as PlanRef, BatchSeqScan, ExprRewritable, generic};
27use crate::TableCatalog;
28use crate::error::Result;
29use crate::expr::{Expr, ExprRewriter, ExprVisitor};
30use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
31use crate::optimizer::plan_node::utils::IndicesDisplay;
32use crate::optimizer::plan_node::{
33 EqJoinPredicate, EqJoinPredicateDisplay, PlanBase, PlanTreeNodeUnary, ToDistributedBatch,
34 ToLocalBatch, TryToBatchPb,
35};
36use crate::optimizer::property::{Distribution, Order, RequiredDist};
37use crate::scheduler::SchedulerResult;
38use crate::utils::ColIndexMappingRewriteExt;
39
/// Batch plan node for a lookup join: the left input is the only child plan, while the right
/// side is described by a table catalog that is serialized into the protobuf node.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchLookupJoin {
    /// Batch plan-node base, built in `new` from `core`, the derived distribution and
    /// `Order::any()`.
    pub base: PlanBase<Batch>,
    /// Generic join core. `core.left` is this node's single input; `core.right` is unwrapped as
    /// a `BatchSeqScan` when the node is rendered for explain output.
    core: generic::Join<PlanRef>,

    /// Join predicate split into equality keys and the remaining condition. `new` asserts that
    /// it has at least one equality key and that the keys are type-aligned.
    eq_join_predicate: EqJoinPredicate,

    /// Catalog of the right-side (lookup) table; serialized as `inner_side_table_desc`.
    right_table: Arc<TableCatalog>,

    /// Column ids of the right-side table that are serialized as `inner_side_column_ids`.
    /// `to_distributed` also searches this list for the table's distribution key columns.
    right_output_column_ids: Vec<ColumnId>,

    /// Serialized as `lookup_prefix_len`. `to_distributed` asserts that every distribution key
    /// sits at an eq-key position below this value.
    /// NOTE(review): presumably the number of leading eq keys used as the lookup prefix — confirm.
    lookup_prefix_len: usize,

    /// When `true`, `try_to_batch_prost_body` emits a `DistributedLookupJoinNode`; otherwise it
    /// emits a `LocalLookupJoinNode`.
    distributed_lookup: bool,

    /// Optional `AS OF` clause; shown in explain output and converted into the query epoch.
    as_of: Option<AsOf>,
    /// Optional as-of join description, copied unchanged into the protobuf node.
    asof_desc: Option<AsOfJoinDesc>,
}
66
67impl BatchLookupJoin {
68 pub fn new(
69 core: generic::Join<PlanRef>,
70 eq_join_predicate: EqJoinPredicate,
71 right_table: Arc<TableCatalog>,
72 right_output_column_ids: Vec<ColumnId>,
73 lookup_prefix_len: usize,
74 distributed_lookup: bool,
75 as_of: Option<AsOf>,
76 asof_desc: Option<AsOfJoinDesc>,
77 ) -> Self {
78 assert!(eq_join_predicate.has_eq());
81 assert!(eq_join_predicate.eq_keys_are_type_aligned());
82 let dist = Self::derive_dist(core.left.distribution(), &core);
83 let base = PlanBase::new_batch_with_core(&core, dist, Order::any());
84 Self {
85 base,
86 core,
87 eq_join_predicate,
88 right_table,
89 right_output_column_ids,
90 lookup_prefix_len,
91 distributed_lookup,
92 as_of,
93 asof_desc,
94 }
95 }
96
97 fn derive_dist(left: &Distribution, core: &generic::Join<PlanRef>) -> Distribution {
98 match left {
99 Distribution::Single => Distribution::Single,
100 Distribution::HashShard(_) | Distribution::UpstreamHashShard(_, _) => {
101 let l2o = core.l2i_col_mapping().composite(&core.i2o_col_mapping());
102 l2o.rewrite_provided_distribution(left)
103 }
104 _ => unreachable!(),
105 }
106 }
107
108 fn eq_join_predicate(&self) -> &EqJoinPredicate {
109 &self.eq_join_predicate
110 }
111
112 pub fn right_table(&self) -> &TableCatalog {
113 &self.right_table
114 }
115
116 fn clone_with_distributed_lookup(&self, input: PlanRef, distributed_lookup: bool) -> Self {
117 let mut batch_lookup_join = self.clone_with_input(input);
118 batch_lookup_join.distributed_lookup = distributed_lookup;
119 batch_lookup_join
120 }
121
122 pub fn lookup_prefix_len(&self) -> usize {
123 self.lookup_prefix_len
124 }
125}
126
127impl Distill for BatchLookupJoin {
128 fn distill<'a>(&self) -> XmlNode<'a> {
129 let verbose = self.base.ctx().is_explain_verbose();
130 let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
131 vec.push(("type", Pretty::debug(&self.core.join_type)));
132
133 let concat_schema = self.core.concat_schema();
134 vec.push((
135 "predicate",
136 Pretty::debug(&EqJoinPredicateDisplay {
137 eq_join_predicate: self.eq_join_predicate(),
138 input_schema: &concat_schema,
139 }),
140 ));
141
142 if verbose {
143 let data = IndicesDisplay::from_join(&self.core, &concat_schema);
144 vec.push(("output", data));
145 }
146
147 let scan: &BatchSeqScan = self.core.right.as_batch_seq_scan().unwrap();
148
149 vec.push(("lookup table", Pretty::display(&scan.core().table_name())));
150
151 if let Some(as_of) = &self.as_of {
152 vec.push(("as_of", Pretty::debug(as_of)));
153 }
154
155 childless_record("BatchLookupJoin", vec)
156 }
157}
158
159impl PlanTreeNodeUnary<Batch> for BatchLookupJoin {
160 fn input(&self) -> PlanRef {
161 self.core.left.clone()
162 }
163
164 fn clone_with_input(&self, input: PlanRef) -> Self {
166 let mut core = self.core.clone();
167 core.left = input;
168 Self::new(
169 core,
170 self.eq_join_predicate.clone(),
171 self.right_table.clone(),
172 self.right_output_column_ids.clone(),
173 self.lookup_prefix_len,
174 self.distributed_lookup,
175 self.as_of.clone(),
176 self.asof_desc,
177 )
178 }
179}
180
// NOTE(review): presumably generates the generic plan-tree-node impl for `BatchLookupJoin` from
// its `PlanTreeNodeUnary<Batch>` impl — confirm against the macro definition.
impl_plan_tree_node_for_unary! { Batch, BatchLookupJoin }
182
183impl ToDistributedBatch for BatchLookupJoin {
184 fn to_distributed(&self) -> Result<PlanRef> {
185 let mut exchange_dist_keys = vec![];
187 let left_eq_indexes = self.eq_join_predicate.left_eq_indexes();
188 let right_table = &self.right_table;
189 for dist_col_index in &right_table.distribution_key {
190 let dist_col_id = right_table.columns[*dist_col_index].column_desc.column_id;
191 let output_pos = self
192 .right_output_column_ids
193 .iter()
194 .position(|p| *p == dist_col_id)
195 .unwrap();
196 let dist_in_eq_indexes = self
197 .eq_join_predicate
198 .right_eq_indexes()
199 .iter()
200 .position(|col| *col == output_pos)
201 .unwrap();
202 assert!(dist_in_eq_indexes < self.lookup_prefix_len);
203 exchange_dist_keys.push(left_eq_indexes[dist_in_eq_indexes]);
204 }
205
206 assert!(!exchange_dist_keys.is_empty());
207
208 let input = self.input().to_distributed_with_required(
209 &Order::any(),
210 &RequiredDist::PhysicalDist(Distribution::UpstreamHashShard(
211 exchange_dist_keys,
212 self.right_table.id,
213 )),
214 )?;
215
216 Ok(self.clone_with_distributed_lookup(input, true).into())
217 }
218}
219
220impl TryToBatchPb for BatchLookupJoin {
221 fn try_to_batch_prost_body(&self) -> SchedulerResult<NodeBody> {
222 Ok(if self.distributed_lookup {
223 NodeBody::DistributedLookupJoin(DistributedLookupJoinNode {
224 join_type: self.core.join_type as i32,
225 condition: self
226 .eq_join_predicate
227 .other_cond()
228 .as_expr_unless_true()
229 .map(|x| x.to_expr_proto()),
230 outer_side_key: self
231 .eq_join_predicate
232 .left_eq_indexes()
233 .into_iter()
234 .map(|a| a as _)
235 .collect(),
236 inner_side_key: self
237 .eq_join_predicate
238 .right_eq_indexes()
239 .into_iter()
240 .map(|a| a as _)
241 .collect(),
242 inner_side_table_desc: Some(self.right_table.table_desc().try_to_protobuf()?),
243 inner_side_column_ids: self
244 .right_output_column_ids
245 .iter()
246 .map(ColumnId::get_id)
247 .collect(),
248 output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
249 null_safe: self.eq_join_predicate.null_safes(),
250 lookup_prefix_len: self.lookup_prefix_len as u32,
251 query_epoch: to_batch_query_epoch(&self.as_of)?,
252 asof_desc: self.asof_desc,
253 })
254 } else {
255 NodeBody::LocalLookupJoin(LocalLookupJoinNode {
256 join_type: self.core.join_type as i32,
257 condition: self
258 .eq_join_predicate
259 .other_cond()
260 .as_expr_unless_true()
261 .map(|x| x.to_expr_proto()),
262 outer_side_key: self
263 .eq_join_predicate
264 .left_eq_indexes()
265 .into_iter()
266 .map(|a| a as _)
267 .collect(),
268 inner_side_key: self
269 .eq_join_predicate
270 .right_eq_indexes()
271 .into_iter()
272 .map(|a| a as _)
273 .collect(),
274 inner_side_table_desc: Some(self.right_table.table_desc().try_to_protobuf()?),
275 inner_side_vnode_mapping: vec![], inner_side_column_ids: self
277 .right_output_column_ids
278 .iter()
279 .map(ColumnId::get_id)
280 .collect(),
281 output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
282 worker_nodes: vec![], null_safe: self.eq_join_predicate.null_safes(),
284 lookup_prefix_len: self.lookup_prefix_len as u32,
285 query_epoch: to_batch_query_epoch(&self.as_of)?,
286 asof_desc: self.asof_desc,
287 })
288 })
289 }
290}
291
292impl ToLocalBatch for BatchLookupJoin {
293 fn to_local(&self) -> Result<PlanRef> {
294 let input = RequiredDist::single()
295 .batch_enforce_if_not_satisfies(self.input().to_local()?, &Order::any())?;
296
297 Ok(self.clone_with_distributed_lookup(input, false).into())
298 }
299}
300
301impl ExprRewritable<Batch> for BatchLookupJoin {
302 fn has_rewritable_expr(&self) -> bool {
303 true
304 }
305
306 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
307 let base = self.base.clone_with_new_plan_id();
308 let mut core = self.core.clone();
309 core.rewrite_exprs(r);
310 Self {
311 base,
312 core,
313 eq_join_predicate: self.eq_join_predicate.rewrite_exprs(r),
314 ..Self::clone(self)
315 }
316 .into()
317 }
318}
319
320impl ExprVisitable for BatchLookupJoin {
321 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
322 self.core.visit_exprs(v);
323 }
324}