risingwave_frontend/optimizer/rule/
apply_project_set_transpose_rule.rsuse itertools::Itertools;
use risingwave_pb::plan_common::JoinType;
use super::{ApplyOffsetRewriter, BoxedRule, Rule};
use crate::expr::{ExprImpl, ExprRewriter, InputRef};
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalApply, LogicalProject, LogicalProjectSet};
use crate::optimizer::PlanRef;
pub struct ApplyProjectSetTransposeRule {}
impl Rule for ApplyProjectSetTransposeRule {
fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
let apply: &LogicalApply = plan.as_logical_apply()?;
let (left, right, on, join_type, correlated_id, correlated_indices, max_one_row) =
apply.clone().decompose();
let project_set: &LogicalProjectSet = right.as_logical_project_set()?;
let left_schema_len = left.schema().len();
assert_eq!(join_type, JoinType::Inner);
let mut exprs: Vec<ExprImpl> = left
.schema()
.data_types()
.into_iter()
.enumerate()
.map(|(index, data_type)| InputRef::new(index, data_type).into())
.collect();
let (proj_exprs, proj_input) = project_set.clone().decompose();
let mut rewriter =
ApplyOffsetRewriter::new(left.schema().len(), &correlated_indices, correlated_id);
let new_proj_exprs: Vec<ExprImpl> = proj_exprs
.into_iter()
.map(|expr| rewriter.rewrite_expr(expr))
.collect_vec();
exprs.extend(new_proj_exprs.clone());
let mut rewriter =
ApplyOnCondRewriterForProjectSet::new(left.schema().len(), new_proj_exprs);
let new_on = on.rewrite_expr(&mut rewriter);
if rewriter.refer_table_function {
return None;
}
let new_apply = LogicalApply::create(
left,
proj_input,
join_type,
new_on,
correlated_id,
correlated_indices,
max_one_row,
);
let new_project_set = LogicalProjectSet::create(new_apply, exprs);
let out_col_idxs = (1..=left_schema_len)
.chain(vec![0])
.chain((left_schema_len + 1)..new_project_set.schema().len());
let reorder_project = LogicalProject::with_out_col_idx(new_project_set, out_col_idxs);
Some(reorder_project.into())
}
}
impl ApplyProjectSetTransposeRule {
pub fn create() -> BoxedRule {
Box::new(ApplyProjectSetTransposeRule {})
}
}
pub struct ApplyOnCondRewriterForProjectSet {
pub left_input_len: usize,
pub mapping: Vec<ExprImpl>,
pub refer_table_function: bool,
}
impl ApplyOnCondRewriterForProjectSet {
pub fn new(left_input_len: usize, mapping: Vec<ExprImpl>) -> Self {
Self {
left_input_len,
mapping,
refer_table_function: false,
}
}
}
impl ExprRewriter for ApplyOnCondRewriterForProjectSet {
fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
if input_ref.index >= self.left_input_len {
let expr = self.mapping[input_ref.index() - self.left_input_len - 1].clone();
if matches!(expr, ExprImpl::TableFunction(_)) {
self.refer_table_function = true;
}
expr
} else {
input_ref.into()
}
}
}