risingwave_frontend/optimizer/rule/
over_window_merge_rule.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::prelude::{PlanRef, *};
16use crate::optimizer::plan_node::{LogicalOverWindow, PlanTreeNodeUnary};
17
18/// Merge chaining `LogicalOverWindow`s with same `PARTITION BY` and `ORDER BY`.
19/// Should be applied after `OverWindowSplitRule`.
20pub struct OverWindowMergeRule;
21
22impl OverWindowMergeRule {
23    pub fn create() -> BoxedRule {
24        Box::new(OverWindowMergeRule)
25    }
26}
27
28impl Rule<Logical> for OverWindowMergeRule {
29    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
30        let over_window = plan.as_logical_over_window()?;
31        let mut window_functions_rev = over_window
32            .window_functions()
33            .iter()
34            .rev()
35            .cloned()
36            .collect::<Vec<_>>();
37        let partition_key = over_window.partition_key_indices();
38        let order_key = over_window.order_key();
39
40        let mut curr = plan.clone();
41        let mut curr_input = over_window.input();
42        while let Some(input_over_window) = curr_input.as_logical_over_window() {
43            if input_over_window.partition_key_indices() != partition_key
44                || input_over_window.order_key() != order_key
45            {
46                // cannot merge `OverWindow`s with different partition key or order key
47                break;
48            }
49            window_functions_rev.extend(input_over_window.window_functions().iter().rev().cloned());
50            curr = curr_input.clone();
51            curr_input = input_over_window.input();
52        }
53
54        if curr.as_logical_over_window().unwrap() == over_window {
55            // unchanged
56            return None;
57        }
58
59        let window_functions = window_functions_rev.into_iter().rev().collect::<Vec<_>>();
60        Some(LogicalOverWindow::new(window_functions, curr_input).into())
61    }
62}