risingwave_frontend/optimizer/rule/
apply_over_window_transpose_rule.rs1use risingwave_pb::plan_common::JoinType;
16
17use super::prelude::{PlanRef, *};
18use crate::expr::InputRef;
19use crate::optimizer::plan_node::{LogicalApply, LogicalFilter, LogicalOverWindow};
20use crate::utils::Condition;
21
/// Optimizer rule that rewrites a `LogicalApply` whose right input is a
/// `LogicalOverWindow` into a `LogicalFilter` on top of a `LogicalOverWindow`
/// whose input is a new `LogicalApply` over the window's former input.
///
/// The struct carries no state; the rewrite lives in its `Rule<Logical>` impl.
pub struct ApplyOverWindowTransposeRule {}
44impl Rule<Logical> for ApplyOverWindowTransposeRule {
45 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
46 let apply: &LogicalApply = plan.as_logical_apply()?;
47 let (left, right, on, join_type, correlated_id, correlated_indices, max_one_row) =
48 apply.clone().decompose();
49 assert_eq!(join_type, JoinType::Inner);
50 let over_window: &LogicalOverWindow = right.as_logical_over_window()?;
51 let (window_input, mut window_functions) = over_window.clone().decompose();
52
53 if max_one_row {
54 return None;
55 }
56
57 let apply_left_len = left.schema().len();
58 let apply_left_schema = left.schema().clone();
59
60 let new_apply = LogicalApply::create(
61 left,
62 window_input,
63 JoinType::Inner,
64 Condition::true_cond(),
65 correlated_id,
66 correlated_indices,
67 false,
68 );
69
70 let new_over_window = {
71 window_functions.iter_mut().for_each(|func| {
74 func.args
75 .iter_mut()
76 .for_each(|arg| arg.shift_with_offset(apply_left_len as isize));
77 func.order_by
78 .iter_mut()
79 .for_each(|c| c.column_index += apply_left_len);
80 func.partition_by
81 .iter_mut()
82 .for_each(|x| x.shift_with_offset(apply_left_len as isize));
83 func.partition_by = (0..apply_left_len)
85 .map(|i| InputRef::new(i, apply_left_schema.fields[i].data_type()))
86 .chain(func.partition_by.drain(..))
87 .collect();
88 });
89
90 LogicalOverWindow::new(window_functions, new_apply)
91 };
92
93 let filter = LogicalFilter::create(new_over_window.into(), on);
94 Some(filter)
95 }
96}
97
98impl ApplyOverWindowTransposeRule {
99 pub fn create() -> BoxedRule {
100 Box::new(ApplyOverWindowTransposeRule {})
101 }
102}