risingwave_frontend/optimizer/rule/
agg_call_merge_rule.rs1use super::prelude::{PlanRef, *};
16use crate::optimizer::plan_node::generic::Agg;
17use crate::optimizer::plan_node::{LogicalProject, PlanTreeNodeUnary};
18
/// Stateless optimizer rule type.
///
/// The struct has no fields; all behavior lives in its `Rule<Logical>`
/// implementation, which collapses repeated aggregate calls of a logical
/// aggregation into a single call each.
pub struct AggCallMergeRule {}
21
22impl Rule<Logical> for AggCallMergeRule {
23 fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
24 let agg = plan.as_logical_agg()?;
25
26 let calls = agg.agg_calls();
27 let mut new_calls = Vec::with_capacity(calls.len());
28 let mut out_fields = (0..agg.group_key().len()).collect::<Vec<_>>();
29 out_fields.extend(calls.iter().map(|call| {
30 let pos = new_calls.iter().position(|c| c == call).unwrap_or_else(|| {
31 let pos = new_calls.len();
32 new_calls.push(call.clone());
33 pos
34 });
35 agg.group_key().len() + pos
36 }));
37
38 if calls.len() == new_calls.len() {
39 None
41 } else {
42 let new_agg = Agg::new(new_calls, agg.group_key().clone(), agg.input())
43 .with_enable_two_phase(agg.core().two_phase_agg_enabled())
44 .into();
45 Some(LogicalProject::with_out_col_idx(new_agg, out_fields.into_iter()).into())
46 }
47 }
48}
49
50impl AggCallMergeRule {
51 pub fn create() -> BoxedRule {
52 Box::new(Self {})
53 }
54}