risingwave_frontend/optimizer/rule/
over_window_merge_rule.rs1use super::{BoxedRule, Rule};
16use crate::PlanRef;
17use crate::optimizer::plan_node::{LogicalOverWindow, PlanTreeNodeUnary};
18
19pub struct OverWindowMergeRule;
22
23impl OverWindowMergeRule {
24 pub fn create() -> BoxedRule {
25 Box::new(OverWindowMergeRule)
26 }
27}
28
29impl Rule for OverWindowMergeRule {
30 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
31 let over_window = plan.as_logical_over_window()?;
32 let mut window_functions_rev = over_window
33 .window_functions()
34 .iter()
35 .rev()
36 .cloned()
37 .collect::<Vec<_>>();
38 let partition_key = over_window.partition_key_indices();
39 let order_key = over_window.order_key();
40
41 let mut curr = plan.clone();
42 let mut curr_input = over_window.input();
43 while let Some(input_over_window) = curr_input.as_logical_over_window() {
44 if input_over_window.partition_key_indices() != partition_key
45 || input_over_window.order_key() != order_key
46 {
47 break;
49 }
50 window_functions_rev.extend(input_over_window.window_functions().iter().rev().cloned());
51 curr = curr_input.clone();
52 curr_input = input_over_window.input();
53 }
54
55 if curr.as_logical_over_window().unwrap() == over_window {
56 return None;
58 }
59
60 let window_functions = window_functions_rev.into_iter().rev().collect::<Vec<_>>();
61 Some(LogicalOverWindow::new(window_functions, curr_input).into())
62 }
63}