risingwave_frontend/optimizer/rule/
apply_expand_transpose_rule.rs1use fixedbitset::FixedBitSet;
16use itertools::Itertools;
17use risingwave_pb::plan_common::JoinType;
18
19use super::{BoxedRule, Rule};
20use crate::optimizer::PlanRef;
21use crate::optimizer::plan_node::generic::GenericPlanRef;
22use crate::optimizer::plan_node::{LogicalApply, LogicalExpand, LogicalFilter, LogicalProject};
23use crate::utils::Condition;
24
25pub struct ApplyExpandTransposeRule {}
49impl Rule for ApplyExpandTransposeRule {
50 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
51 let apply: &LogicalApply = plan.as_logical_apply()?;
52 let (left, right, on, join_type, correlated_id, correlated_indices, max_one_row) =
53 apply.clone().decompose();
54 assert_eq!(join_type, JoinType::Inner);
55 let logical_expand: &LogicalExpand = right.as_logical_expand()?;
56 let (expand_input, mut column_subsets) = logical_expand.clone().decompose();
57
58 let apply_left_len = left.schema().len();
59
60 if max_one_row {
61 return None;
62 }
63
64 let new_apply: PlanRef = LogicalApply::create(
65 left,
66 expand_input,
67 JoinType::Inner,
68 Condition::true_cond(),
69 correlated_id,
70 correlated_indices,
71 false,
72 );
73
74 let new_apply_schema_len = new_apply.schema().len();
75
76 let new_expand = {
77 column_subsets.iter_mut().for_each(|subset| {
80 subset.iter_mut().for_each(|i| *i += apply_left_len);
81 *subset = (0..apply_left_len).chain(subset.drain(..)).collect_vec();
82 });
83 LogicalExpand::new(new_apply, column_subsets)
84 };
85
86 let mut fixed_bit_set = FixedBitSet::with_capacity(new_expand.base.schema().len());
90 fixed_bit_set.toggle_range(..);
91 fixed_bit_set.toggle_range(new_apply_schema_len..(new_apply_schema_len + apply_left_len));
92 let project = LogicalProject::with_out_fields(new_expand.into(), &fixed_bit_set);
93
94 let filter = LogicalFilter::create(project.into(), on);
95 Some(filter)
96 }
97}
98
99impl ApplyExpandTransposeRule {
100 pub fn create() -> BoxedRule {
101 Box::new(ApplyExpandTransposeRule {})
102 }
103}