risingwave_frontend/optimizer/rule/
apply_project_set_transpose_rule.rs1use itertools::Itertools;
16use risingwave_pb::plan_common::JoinType;
17
18use super::{ApplyOffsetRewriter, BoxedRule, Rule};
19use crate::expr::{ExprImpl, ExprRewriter, InputRef};
20use crate::optimizer::PlanRef;
21use crate::optimizer::plan_node::generic::GenericPlanRef;
22use crate::optimizer::plan_node::{LogicalApply, LogicalProject, LogicalProjectSet};
23
/// Optimizer rule that moves a `LogicalApply` underneath the `LogicalProjectSet`
/// found on its right-hand side.
///
/// The rule carries no state; obtain a boxed instance through `create`.
pub struct ApplyProjectSetTransposeRule {}
48impl Rule for ApplyProjectSetTransposeRule {
49 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
50 let apply: &LogicalApply = plan.as_logical_apply()?;
51 let (left, right, on, join_type, correlated_id, correlated_indices, max_one_row) =
52 apply.clone().decompose();
53 let project_set: &LogicalProjectSet = right.as_logical_project_set()?;
54 let left_schema_len = left.schema().len();
55 assert_eq!(join_type, JoinType::Inner);
56
57 let mut exprs: Vec<ExprImpl> = left
60 .schema()
61 .data_types()
62 .into_iter()
63 .enumerate()
64 .map(|(index, data_type)| InputRef::new(index, data_type).into())
65 .collect();
66
67 let (proj_exprs, proj_input) = project_set.clone().decompose();
68
69 let mut rewriter =
71 ApplyOffsetRewriter::new(left.schema().len(), &correlated_indices, correlated_id);
72
73 let new_proj_exprs: Vec<ExprImpl> = proj_exprs
74 .into_iter()
75 .map(|expr| rewriter.rewrite_expr(expr))
76 .collect_vec();
77
78 exprs.extend(new_proj_exprs.clone());
79
80 let mut rewriter =
81 ApplyOnCondRewriterForProjectSet::new(left.schema().len(), new_proj_exprs);
82 let new_on = on.rewrite_expr(&mut rewriter);
83
84 if rewriter.refer_table_function {
85 return None;
88 }
89
90 let new_apply = LogicalApply::create(
91 left,
92 proj_input,
93 join_type,
94 new_on,
95 correlated_id,
96 correlated_indices,
97 max_one_row,
98 );
99
100 let new_project_set = LogicalProjectSet::create(new_apply, exprs);
101
102 let out_col_idxs = (1..=left_schema_len)
105 .chain(vec![0])
106 .chain((left_schema_len + 1)..new_project_set.schema().len());
107 let reorder_project = LogicalProject::with_out_col_idx(new_project_set, out_col_idxs);
108
109 Some(reorder_project.into())
110 }
111}
112
113impl ApplyProjectSetTransposeRule {
114 pub fn create() -> BoxedRule {
115 Box::new(ApplyProjectSetTransposeRule {})
116 }
117}
118
119pub struct ApplyOnCondRewriterForProjectSet {
120 pub left_input_len: usize,
121 pub mapping: Vec<ExprImpl>,
122 pub refer_table_function: bool,
123}
124
125impl ApplyOnCondRewriterForProjectSet {
126 pub fn new(left_input_len: usize, mapping: Vec<ExprImpl>) -> Self {
127 Self {
128 left_input_len,
129 mapping,
130 refer_table_function: false,
131 }
132 }
133}
134
135impl ExprRewriter for ApplyOnCondRewriterForProjectSet {
136 fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
137 if input_ref.index >= self.left_input_len {
138 let expr = self.mapping[input_ref.index() - self.left_input_len - 1].clone();
140 if matches!(expr, ExprImpl::TableFunction(_)) {
141 self.refer_table_function = true;
142 }
143 expr
144 } else {
145 input_ref.into()
146 }
147 }
148}