1use pretty_xmlish::{Pretty, XmlNode};
17use risingwave_common::catalog::Schema;
18use risingwave_pb::plan_common::JoinType;
19
20use super::generic::{
21 self, GenericPlanNode, GenericPlanRef, push_down_into_join, push_down_join_condition,
22};
23use super::utils::{Distill, childless_record};
24use super::{
25 BatchPlanRef, ColPrunable, Logical, LogicalJoin, LogicalPlanRef as PlanRef, LogicalProject,
26 PlanBase, PlanTreeNodeBinary, PredicatePushdown, StreamPlanRef, ToBatch, ToStream,
27};
28use crate::error::{ErrorCode, Result, RwError};
29use crate::expr::{CorrelatedId, Expr, ExprImpl, ExprRewriter, ExprVisitor, InputRef};
30use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
31use crate::optimizer::plan_node::{
32 ColumnPruningContext, ExprRewritable, LogicalFilter, PredicatePushdownContext,
33 RewriteStreamContext, ToStreamContext,
34};
35use crate::optimizer::property::FunctionalDependencySet;
36use crate::utils::{ColIndexMapping, Condition, ConditionDisplay};
37
38#[derive(Debug, Clone, PartialEq, Eq, Hash)]
41pub struct LogicalApply {
42 pub base: PlanBase<Logical>,
43 left: PlanRef,
44 right: PlanRef,
45 on: Condition,
46 join_type: JoinType,
47
48 correlated_id: CorrelatedId,
51 correlated_indices: Vec<usize>,
53 max_one_row: bool,
56
57 translated: bool,
60}
61
62impl Distill for LogicalApply {
63 fn distill<'a>(&self) -> XmlNode<'a> {
64 let mut vec = Vec::with_capacity(if self.max_one_row { 4 } else { 3 });
65 vec.push(("type", Pretty::debug(&self.join_type)));
66
67 let concat_schema = self.concat_schema();
68 let cond = Pretty::debug(&ConditionDisplay {
69 condition: &self.on,
70 input_schema: &concat_schema,
71 });
72 vec.push(("on", cond));
73
74 vec.push(("correlated_id", Pretty::debug(&self.correlated_id)));
75 if self.max_one_row {
76 vec.push(("max_one_row", Pretty::debug(&true)));
77 }
78
79 childless_record("LogicalApply", vec)
80 }
81}
82
83impl LogicalApply {
84 pub(crate) fn new(
85 left: PlanRef,
86 right: PlanRef,
87 join_type: JoinType,
88 on: Condition,
89 correlated_id: CorrelatedId,
90 correlated_indices: Vec<usize>,
91 max_one_row: bool,
92 translated: bool,
93 ) -> Self {
94 let ctx = left.ctx();
95 let join_core = generic::Join::with_full_output(left, right, join_type, on);
96 let schema = join_core.schema();
97 let stream_key = join_core.stream_key();
98 let functional_dependency = match &stream_key {
99 Some(stream_key) => FunctionalDependencySet::with_key(schema.len(), stream_key),
100 None => FunctionalDependencySet::new(schema.len()),
101 };
102 let (left, right, on, join_type, _output_indices) = join_core.decompose();
103 let base = PlanBase::new_logical(ctx, schema, stream_key, functional_dependency);
104 LogicalApply {
105 base,
106 left,
107 right,
108 on,
109 join_type,
110 correlated_id,
111 correlated_indices,
112 max_one_row,
113 translated,
114 }
115 }
116
117 pub fn create(
118 left: PlanRef,
119 right: PlanRef,
120 join_type: JoinType,
121 on: Condition,
122 correlated_id: CorrelatedId,
123 correlated_indices: Vec<usize>,
124 max_one_row: bool,
125 ) -> PlanRef {
126 Self::new(
127 left,
128 right,
129 join_type,
130 on,
131 correlated_id,
132 correlated_indices,
133 max_one_row,
134 false,
135 )
136 .into()
137 }
138
139 pub fn join_type(&self) -> JoinType {
141 self.join_type
142 }
143
144 pub fn decompose(
145 self,
146 ) -> (
147 PlanRef,
148 PlanRef,
149 Condition,
150 JoinType,
151 CorrelatedId,
152 Vec<usize>,
153 bool,
154 ) {
155 (
156 self.left,
157 self.right,
158 self.on,
159 self.join_type,
160 self.correlated_id,
161 self.correlated_indices,
162 self.max_one_row,
163 )
164 }
165
166 pub fn correlated_id(&self) -> CorrelatedId {
167 self.correlated_id
168 }
169
170 pub fn correlated_indices(&self) -> Vec<usize> {
171 self.correlated_indices.clone()
172 }
173
174 pub fn on_condition(&self) -> &Condition {
175 &self.on
176 }
177
178 pub fn translated(&self) -> bool {
179 self.translated
180 }
181
182 pub fn max_one_row(&self) -> bool {
183 self.max_one_row
184 }
185
186 pub fn translate_apply(self, domain: PlanRef, eq_predicates: Vec<ExprImpl>) -> PlanRef {
208 let (
209 apply_left,
210 apply_right,
211 on,
212 apply_type,
213 correlated_id,
214 correlated_indices,
215 max_one_row,
216 ) = self.decompose();
217 let apply_left_len = apply_left.schema().len();
218 let correlated_indices_len = correlated_indices.len();
219
220 let new_apply = LogicalApply::new(
221 domain,
222 apply_right,
223 JoinType::Inner,
224 Condition::true_cond(),
225 correlated_id,
226 correlated_indices,
227 max_one_row,
228 true,
229 )
230 .into();
231
232 let on = Self::rewrite_on(on, correlated_indices_len, apply_left_len).and(Condition {
233 conjunctions: eq_predicates,
234 });
235 let new_join = LogicalJoin::new(apply_left, new_apply, apply_type, on);
236
237 if new_join.join_type() == JoinType::LeftSemi {
238 new_join.into()
240 } else {
241 let mut exprs: Vec<ExprImpl> = vec![];
244 new_join
245 .schema()
246 .data_types()
247 .into_iter()
248 .enumerate()
249 .for_each(|(index, data_type)| {
250 if index < apply_left_len || index >= apply_left_len + correlated_indices_len {
251 exprs.push(InputRef::new(index, data_type).into());
252 }
253 });
254 LogicalProject::create(new_join.into(), exprs)
255 }
256 }
257
258 fn rewrite_on(on: Condition, offset: usize, apply_left_len: usize) -> Condition {
259 struct Rewriter {
260 offset: usize,
261 apply_left_len: usize,
262 }
263 impl ExprRewriter for Rewriter {
264 fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
265 let index = input_ref.index();
266 if index >= self.apply_left_len {
267 InputRef::new(index + self.offset, input_ref.return_type()).into()
268 } else {
269 input_ref.into()
270 }
271 }
272 }
273 let mut rewriter = Rewriter {
274 offset,
275 apply_left_len,
276 };
277 on.rewrite_expr(&mut rewriter)
278 }
279
280 fn concat_schema(&self) -> Schema {
281 let mut concat_schema = self.left().schema().fields.clone();
282 concat_schema.extend(self.right().schema().fields.clone());
283 Schema::new(concat_schema)
284 }
285}
286
287impl PlanTreeNodeBinary<Logical> for LogicalApply {
288 fn left(&self) -> PlanRef {
289 self.left.clone()
290 }
291
292 fn right(&self) -> PlanRef {
293 self.right.clone()
294 }
295
296 fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
297 Self::new(
298 left,
299 right,
300 self.join_type,
301 self.on.clone(),
302 self.correlated_id,
303 self.correlated_indices.clone(),
304 self.max_one_row,
305 self.translated,
306 )
307 }
308}
309
// Invokes the project macro for the `Logical` convention and `LogicalApply`.
// NOTE(review): presumably derives the generic plan-tree-node impl from the
// `PlanTreeNodeBinary` impl; the macro body is not visible here — confirm.
impl_plan_tree_node_for_binary! { Logical, LogicalApply }
311
impl ColPrunable for LogicalApply {
    /// Column pruning is not supported on this node.
    ///
    /// # Panics
    ///
    /// Always panics with "LogicalApply should be unnested"; both arguments
    /// are ignored.
    fn prune_col(&self, _required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
        panic!("LogicalApply should be unnested")
    }
}
317
318impl ExprRewritable<Logical> for LogicalApply {
319 fn has_rewritable_expr(&self) -> bool {
320 true
321 }
322
323 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
324 let mut new = self.clone();
325 new.on = new.on.rewrite_expr(r);
326 new.base = new.base.clone_with_new_plan_id();
327 new.into()
328 }
329}
330
impl ExprVisitable for LogicalApply {
    /// Visits the expressions of this node by delegating to the `on`
    /// condition, the only expression-bearing field visited here.
    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
        self.on.visit_expr(v)
    }
}
336
337impl PredicatePushdown for LogicalApply {
338 fn predicate_pushdown(
339 &self,
340 mut predicate: Condition,
341 ctx: &mut PredicatePushdownContext,
342 ) -> PlanRef {
343 let left_col_num = self.left().schema().len();
344 let right_col_num = self.right().schema().len();
345 let join_type = self.join_type();
346
347 let (left_from_filter, right_from_filter, on) =
348 push_down_into_join(&mut predicate, left_col_num, right_col_num, join_type, true);
349
350 let mut new_on = self.on.clone().and(on);
351 let (left_from_on, right_from_on) =
352 push_down_join_condition(&mut new_on, left_col_num, right_col_num, join_type, true);
353
354 let left_predicate = left_from_filter.and(left_from_on);
355 let right_predicate = right_from_filter.and(right_from_on);
356
357 let new_left = self.left().predicate_pushdown(left_predicate, ctx);
358 let new_right = self.right().predicate_pushdown(right_predicate, ctx);
359
360 let new_apply = LogicalApply::create(
361 new_left,
362 new_right,
363 join_type,
364 new_on,
365 self.correlated_id,
366 self.correlated_indices.clone(),
367 self.max_one_row,
368 );
369 LogicalFilter::create(new_apply, predicate)
370 }
371}
372
373impl ToBatch for LogicalApply {
374 fn to_batch(&self) -> Result<BatchPlanRef> {
375 Err(RwError::from(ErrorCode::InternalError(
376 "LogicalApply should be unnested".to_owned(),
377 )))
378 }
379}
380
381impl ToStream for LogicalApply {
382 fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<StreamPlanRef> {
383 Err(RwError::from(ErrorCode::InternalError(
384 "LogicalApply should be unnested".to_owned(),
385 )))
386 }
387
388 fn logical_rewrite_for_stream(
389 &self,
390 _ctx: &mut RewriteStreamContext,
391 ) -> Result<(PlanRef, ColIndexMapping)> {
392 Err(RwError::from(ErrorCode::InternalError(
393 "LogicalApply should be unnested".to_owned(),
394 )))
395 }
396}