risingwave_frontend/optimizer/rule/
apply_expand_transpose_rule.rs1use fixedbitset::FixedBitSet;
16use itertools::Itertools;
17use risingwave_pb::plan_common::JoinType;
18
19use super::prelude::{PlanRef, *};
20use crate::optimizer::plan_node::generic::GenericPlanRef;
21use crate::optimizer::plan_node::{LogicalApply, LogicalExpand, LogicalFilter, LogicalProject};
22use crate::utils::Condition;
23
24pub struct ApplyExpandTransposeRule {}
48impl Rule<Logical> for ApplyExpandTransposeRule {
49 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
50 let apply: &LogicalApply = plan.as_logical_apply()?;
51 let (left, right, on, join_type, correlated_id, correlated_indices, max_one_row) =
52 apply.clone().decompose();
53 assert_eq!(join_type, JoinType::Inner);
54 let logical_expand: &LogicalExpand = right.as_logical_expand()?;
55 let (expand_input, mut column_subsets) = logical_expand.clone().decompose();
56
57 let apply_left_len = left.schema().len();
58
59 if max_one_row {
60 return None;
61 }
62
63 let new_apply: PlanRef = LogicalApply::create(
64 left,
65 expand_input,
66 JoinType::Inner,
67 Condition::true_cond(),
68 correlated_id,
69 correlated_indices,
70 false,
71 );
72
73 let new_apply_schema_len = new_apply.schema().len();
74
75 let new_expand = {
76 column_subsets.iter_mut().for_each(|subset| {
79 subset.iter_mut().for_each(|i| *i += apply_left_len);
80 *subset = (0..apply_left_len).chain(subset.drain(..)).collect_vec();
81 });
82 LogicalExpand::new(new_apply, column_subsets)
83 };
84
85 let mut fixed_bit_set = FixedBitSet::with_capacity(new_expand.base.schema().len());
89 fixed_bit_set.toggle_range(..);
90 fixed_bit_set.toggle_range(new_apply_schema_len..(new_apply_schema_len + apply_left_len));
91 let project = LogicalProject::with_out_fields(new_expand.into(), &fixed_bit_set);
92
93 let filter = LogicalFilter::create(project.into(), on);
94 Some(filter)
95 }
96}
97
98impl ApplyExpandTransposeRule {
99 pub fn create() -> BoxedRule {
100 Box::new(ApplyExpandTransposeRule {})
101 }
102}