risingwave_frontend/optimizer/plan_node/
batch_lookup_join.rs1use std::sync::Arc;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_common::catalog::ColumnId;
19use risingwave_pb::batch_plan::plan_node::NodeBody;
20use risingwave_pb::batch_plan::{DistributedLookupJoinNode, LocalLookupJoinNode};
21use risingwave_pb::plan_common::AsOfJoinDesc;
22use risingwave_sqlparser::ast::AsOf;
23
24use super::batch::prelude::*;
25use super::utils::{Distill, childless_record, to_batch_query_epoch};
26use super::{BatchPlanRef as PlanRef, BatchSeqScan, ExprRewritable, generic};
27use crate::TableCatalog;
28use crate::error::Result;
29use crate::expr::{Expr, ExprRewriter, ExprVisitor};
30use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
31use crate::optimizer::plan_node::utils::IndicesDisplay;
32use crate::optimizer::plan_node::{
33 EqJoinPredicate, EqJoinPredicateDisplay, PlanBase, PlanTreeNodeUnary, ToDistributedBatch,
34 ToLocalBatch, TryToBatchPb,
35};
36use crate::optimizer::property::{Distribution, Order, RequiredDist};
37use crate::scheduler::SchedulerResult;
38use crate::utils::ColIndexMappingRewriteExt;
39
/// Batch plan node for a lookup join between the left (outer) input and the table
/// `right_table`.
///
/// It is serialized either as a `DistributedLookupJoinNode` or as a `LocalLookupJoinNode`,
/// depending on `distributed_lookup`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchLookupJoin {
    /// Common plan-node properties. The distribution is derived from the left input and the
    /// order is `Order::any()` (see `BatchLookupJoin::new`).
    pub base: PlanBase<Batch>,
    /// Generic join description. `on` must hold an equi-join predicate (asserted in `new`);
    /// `left` is the outer input, and `right` is expected to be a `BatchSeqScan` (the EXPLAIN
    /// output unwraps it as one).
    core: generic::Join<PlanRef>,

    /// Catalog of the table looked up on the right side. It supplies the distribution key used
    /// by `to_distributed` and the table descriptor written into the protobuf node.
    right_table: Arc<TableCatalog>,

    /// Column ids of `right_table` produced on the right side. The position of an id in this
    /// list is matched against the right equi-join key indices in `to_distributed`.
    right_output_column_ids: Vec<ColumnId>,

    /// Number of leading equi-join keys used for the lookup. `to_distributed` asserts that
    /// every distribution-key column of `right_table` falls within this prefix.
    lookup_prefix_len: usize,

    /// `true` emits a `DistributedLookupJoinNode` (set by `to_distributed`); `false` emits a
    /// `LocalLookupJoinNode` (set by `to_local`).
    distributed_lookup: bool,

    /// Optional `AS OF` clause. It is converted into the batch query epoch for the protobuf node
    /// and shown in EXPLAIN output.
    as_of: Option<AsOf>,
    /// Optional AS OF join descriptor, copied verbatim into the protobuf node.
    asof_desc: Option<AsOfJoinDesc>,
}
62
63impl BatchLookupJoin {
64 pub fn new(
65 core: generic::Join<PlanRef>,
66 right_table: Arc<TableCatalog>,
67 right_output_column_ids: Vec<ColumnId>,
68 lookup_prefix_len: usize,
69 distributed_lookup: bool,
70 as_of: Option<AsOf>,
71 asof_desc: Option<AsOfJoinDesc>,
72 ) -> Self {
73 let eq_join_predicate = core
76 .on
77 .as_eq_predicate_ref()
78 .expect("BatchLookupJoin requires JoinOn::EqPredicate in core");
79 assert!(eq_join_predicate.has_eq());
80 assert!(eq_join_predicate.eq_keys_are_type_aligned());
81 let dist = Self::derive_dist(core.left.distribution(), &core);
82 let base = PlanBase::new_batch_with_core(&core, dist, Order::any());
83 Self {
84 base,
85 core,
86 right_table,
87 right_output_column_ids,
88 lookup_prefix_len,
89 distributed_lookup,
90 as_of,
91 asof_desc,
92 }
93 }
94
95 fn derive_dist(left: &Distribution, core: &generic::Join<PlanRef>) -> Distribution {
96 match left {
97 Distribution::Single => Distribution::Single,
98 Distribution::HashShard(_) | Distribution::UpstreamHashShard(_, _) => {
99 let l2o = core.l2i_col_mapping().composite(&core.i2o_col_mapping());
100 l2o.rewrite_provided_distribution(left)
101 }
102 _ => unreachable!(),
103 }
104 }
105
106 fn eq_join_predicate(&self) -> &EqJoinPredicate {
107 self.core
108 .on
109 .as_eq_predicate_ref()
110 .expect("BatchLookupJoin should store predicate as EqJoinPredicate")
111 }
112
113 pub fn right_table(&self) -> &TableCatalog {
114 &self.right_table
115 }
116
117 fn clone_with_distributed_lookup(&self, input: PlanRef, distributed_lookup: bool) -> Self {
118 let mut batch_lookup_join = self.clone_with_input(input);
119 batch_lookup_join.distributed_lookup = distributed_lookup;
120 batch_lookup_join
121 }
122
123 pub fn lookup_prefix_len(&self) -> usize {
124 self.lookup_prefix_len
125 }
126}
127
128impl Distill for BatchLookupJoin {
129 fn distill<'a>(&self) -> XmlNode<'a> {
130 let verbose = self.base.ctx().is_explain_verbose();
131 let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
132 vec.push(("type", Pretty::debug(&self.core.join_type)));
133
134 let concat_schema = self.core.concat_schema();
135 vec.push((
136 "predicate",
137 Pretty::debug(&EqJoinPredicateDisplay {
138 eq_join_predicate: self.eq_join_predicate(),
139 input_schema: &concat_schema,
140 }),
141 ));
142
143 if verbose {
144 let data = IndicesDisplay::from_join(&self.core, &concat_schema);
145 vec.push(("output", data));
146 }
147
148 let scan: &BatchSeqScan = self.core.right.as_batch_seq_scan().unwrap();
149
150 vec.push(("lookup table", Pretty::display(&scan.core().table_name())));
151
152 if let Some(as_of) = &self.as_of {
153 vec.push(("as_of", Pretty::debug(as_of)));
154 }
155
156 childless_record("BatchLookupJoin", vec)
157 }
158}
159
160impl PlanTreeNodeUnary<Batch> for BatchLookupJoin {
161 fn input(&self) -> PlanRef {
162 self.core.left.clone()
163 }
164
165 fn clone_with_input(&self, input: PlanRef) -> Self {
167 let mut core = self.core.clone();
168 core.left = input;
169 Self::new(
170 core,
171 self.right_table.clone(),
172 self.right_output_column_ids.clone(),
173 self.lookup_prefix_len,
174 self.distributed_lookup,
175 self.as_of.clone(),
176 self.asof_desc,
177 )
178 }
179}
180
181impl_plan_tree_node_for_unary! { Batch, BatchLookupJoin }
182
183impl ToDistributedBatch for BatchLookupJoin {
184 fn to_distributed(&self) -> Result<PlanRef> {
185 let mut exchange_dist_keys = vec![];
187 let left_eq_indexes = self.eq_join_predicate().left_eq_indexes();
188 let right_table = &self.right_table;
189 for dist_col_index in &right_table.distribution_key {
190 let dist_col_id = right_table.columns[*dist_col_index].column_desc.column_id;
191 let output_pos = self
192 .right_output_column_ids
193 .iter()
194 .position(|p| *p == dist_col_id)
195 .unwrap();
196 let dist_in_eq_indexes = self
197 .eq_join_predicate()
198 .right_eq_indexes()
199 .iter()
200 .position(|col| *col == output_pos)
201 .unwrap();
202 assert!(dist_in_eq_indexes < self.lookup_prefix_len);
203 exchange_dist_keys.push(left_eq_indexes[dist_in_eq_indexes]);
204 }
205
206 assert!(!exchange_dist_keys.is_empty());
207
208 let input = self.input().to_distributed_with_required(
209 &Order::any(),
210 &RequiredDist::PhysicalDist(Distribution::UpstreamHashShard(
211 exchange_dist_keys,
212 self.right_table.id,
213 )),
214 )?;
215
216 Ok(self.clone_with_distributed_lookup(input, true).into())
217 }
218}
219
220impl TryToBatchPb for BatchLookupJoin {
221 fn try_to_batch_prost_body(&self) -> SchedulerResult<NodeBody> {
222 let eq_join_predicate = self.eq_join_predicate();
223 Ok(if self.distributed_lookup {
224 NodeBody::DistributedLookupJoin(DistributedLookupJoinNode {
225 join_type: self.core.join_type as i32,
226 condition: self
227 .eq_join_predicate()
228 .other_cond()
229 .as_expr_unless_true()
230 .map(|x| x.to_expr_proto()),
231 outer_side_key: self
232 .eq_join_predicate()
233 .left_eq_indexes()
234 .into_iter()
235 .map(|a| a as _)
236 .collect(),
237 inner_side_key: self
238 .eq_join_predicate()
239 .right_eq_indexes()
240 .into_iter()
241 .map(|a| a as _)
242 .collect(),
243 inner_side_table_desc: Some(self.right_table.table_desc().try_to_protobuf()?),
244 inner_side_column_ids: self
245 .right_output_column_ids
246 .iter()
247 .map(ColumnId::get_id)
248 .collect(),
249 output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
250 null_safe: eq_join_predicate.null_safes(),
251 lookup_prefix_len: self.lookup_prefix_len as u32,
252 query_epoch: to_batch_query_epoch(&self.as_of)?,
253 asof_desc: self.asof_desc,
254 })
255 } else {
256 NodeBody::LocalLookupJoin(LocalLookupJoinNode {
257 join_type: self.core.join_type as i32,
258 condition: self
259 .eq_join_predicate()
260 .other_cond()
261 .as_expr_unless_true()
262 .map(|x| x.to_expr_proto()),
263 outer_side_key: self
264 .eq_join_predicate()
265 .left_eq_indexes()
266 .into_iter()
267 .map(|a| a as _)
268 .collect(),
269 inner_side_key: self
270 .eq_join_predicate()
271 .right_eq_indexes()
272 .into_iter()
273 .map(|a| a as _)
274 .collect(),
275 inner_side_table_desc: Some(self.right_table.table_desc().try_to_protobuf()?),
276 inner_side_vnode_mapping: vec![], inner_side_column_ids: self
278 .right_output_column_ids
279 .iter()
280 .map(ColumnId::get_id)
281 .collect(),
282 output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(),
283 worker_nodes: vec![], null_safe: eq_join_predicate.null_safes(),
285 lookup_prefix_len: self.lookup_prefix_len as u32,
286 query_epoch: to_batch_query_epoch(&self.as_of)?,
287 asof_desc: self.asof_desc,
288 })
289 })
290 }
291}
292
293impl ToLocalBatch for BatchLookupJoin {
294 fn to_local(&self) -> Result<PlanRef> {
295 let input = RequiredDist::single()
296 .batch_enforce_if_not_satisfies(self.input().to_local()?, &Order::any())?;
297
298 Ok(self.clone_with_distributed_lookup(input, false).into())
299 }
300}
301
302impl ExprRewritable<Batch> for BatchLookupJoin {
303 fn has_rewritable_expr(&self) -> bool {
304 true
305 }
306
307 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
308 let base = self.base.clone_with_new_plan_id();
309 let mut core = self.core.clone();
310 core.rewrite_exprs(r);
311 let mut new = self.clone();
312 new.base = base;
313 new.core = core;
314 new.into()
315 }
316}
317
318impl ExprVisitable for BatchLookupJoin {
319 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
320 self.core.visit_exprs(v);
321 }
322}