risingwave_frontend/optimizer/rule/
over_window_merge_rule.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use super::{BoxedRule, Rule};
16use crate::PlanRef;
17use crate::optimizer::plan_node::{LogicalOverWindow, PlanTreeNodeUnary};
18
19/// Merge chaining `LogicalOverWindow`s with same `PARTITION BY` and `ORDER BY`.
20/// Should be applied after `OverWindowSplitRule`.
21pub struct OverWindowMergeRule;
22
23impl OverWindowMergeRule {
24    pub fn create() -> BoxedRule {
25        Box::new(OverWindowMergeRule)
26    }
27}
28
29impl Rule for OverWindowMergeRule {
30    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
31        let over_window = plan.as_logical_over_window()?;
32        let mut window_functions_rev = over_window
33            .window_functions()
34            .iter()
35            .rev()
36            .cloned()
37            .collect::<Vec<_>>();
38        let partition_key = over_window.partition_key_indices();
39        let order_key = over_window.order_key();
40
41        let mut curr = plan.clone();
42        let mut curr_input = over_window.input();
43        while let Some(input_over_window) = curr_input.as_logical_over_window() {
44            if input_over_window.partition_key_indices() != partition_key
45                || input_over_window.order_key() != order_key
46            {
47                // cannot merge `OverWindow`s with different partition key or order key
48                break;
49            }
50            window_functions_rev.extend(input_over_window.window_functions().iter().rev().cloned());
51            curr = curr_input.clone();
52            curr_input = input_over_window.input();
53        }
54
55        if curr.as_logical_over_window().unwrap() == over_window {
56            // unchanged
57            return None;
58        }
59
60        let window_functions = window_functions_rev.into_iter().rev().collect::<Vec<_>>();
61        Some(LogicalOverWindow::new(window_functions, curr_input).into())
62    }
63}