risingwave_frontend/optimizer/rule/
agg_call_merge_rule.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::{BoxedRule, Rule};
use crate::optimizer::plan_node::generic::Agg;
use crate::optimizer::plan_node::{LogicalProject, PlanTreeNodeUnary};
use crate::PlanRef;

/// Merges duplicated aggregate function calls in `LogicalAgg`, and project them back to the desired schema.
pub struct AggCallMergeRule {}

impl Rule for AggCallMergeRule {
    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
        let agg = plan.as_logical_agg()?;

        let calls = agg.agg_calls();
        let mut new_calls = Vec::with_capacity(calls.len());
        let mut out_fields = (0..agg.group_key().len()).collect::<Vec<_>>();
        out_fields.extend(calls.iter().map(|call| {
            let pos = new_calls.iter().position(|c| c == call).unwrap_or_else(|| {
                let pos = new_calls.len();
                new_calls.push(call.clone());
                pos
            });
            agg.group_key().len() + pos
        }));

        if calls.len() == new_calls.len() {
            // no change
            None
        } else {
            let new_agg = Agg::new(new_calls, agg.group_key().clone(), agg.input())
                .with_enable_two_phase(agg.core().two_phase_agg_enabled())
                .into();
            Some(LogicalProject::with_out_col_idx(new_agg, out_fields.into_iter()).into())
        }
    }
}

impl AggCallMergeRule {
    pub fn create() -> BoxedRule {
        Box::new(Self {})
    }
}