risingwave_frontend/optimizer/rule/
over_window_merge_rule.rs1use super::prelude::{PlanRef, *};
16use crate::optimizer::plan_node::{LogicalOverWindow, PlanTreeNodeUnary};
17
18pub struct OverWindowMergeRule;
21
22impl OverWindowMergeRule {
23 pub fn create() -> BoxedRule {
24 Box::new(OverWindowMergeRule)
25 }
26}
27
28impl Rule<Logical> for OverWindowMergeRule {
29 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
30 let over_window = plan.as_logical_over_window()?;
31 let mut window_functions_rev = over_window
32 .window_functions()
33 .iter()
34 .rev()
35 .cloned()
36 .collect::<Vec<_>>();
37 let partition_key = over_window.partition_key_indices();
38 let order_key = over_window.order_key();
39
40 let mut curr = plan.clone();
41 let mut curr_input = over_window.input();
42 while let Some(input_over_window) = curr_input.as_logical_over_window() {
43 if input_over_window.partition_key_indices() != partition_key
44 || input_over_window.order_key() != order_key
45 {
46 break;
48 }
49 window_functions_rev.extend(input_over_window.window_functions().iter().rev().cloned());
50 curr = curr_input.clone();
51 curr_input = input_over_window.input();
52 }
53
54 if curr.as_logical_over_window().unwrap() == over_window {
55 return None;
57 }
58
59 let window_functions = window_functions_rev.into_iter().rev().collect::<Vec<_>>();
60 Some(LogicalOverWindow::new(window_functions, curr_input).into())
61 }
62}