risingwave_frontend/optimizer/plan_node/
stream_simple_agg.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use pretty_xmlish::XmlNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::generic::{self, PlanAggCall};
20use super::stream::prelude::*;
21use super::utils::{Distill, childless_record, plan_node_name};
22use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
23use crate::expr::{ExprRewriter, ExprVisitor};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
26use crate::stream_fragmenter::BuildFragmentGraphState;
27
/// Stream plan node for an aggregation that has no group key ("simple" agg).
///
/// It wraps a `generic::Agg` core together with the stream-level `PlanBase`
/// derived from that core. `StreamSimpleAgg::new` panics unless the input is
/// distributed as `Distribution::Single`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamSimpleAgg {
    /// Stream plan properties, built in `new` via `PlanBase::new_stream_with_core`.
    pub base: PlanBase<Stream>,
    /// The aggregation core; holds the input plan and the aggregate calls.
    core: generic::Agg<PlanRef>,

    /// The index of `count(*)` in `agg_calls`.
    row_count_idx: usize,

    /// Required by the downstream `RowMerge`,
    /// currently only used by the `approx_percentile`'s two phase plan.
    /// Copied verbatim into `SimpleAggNode::must_output_per_barrier` on serialization.
    must_output_per_barrier: bool,
}
40
41impl StreamSimpleAgg {
42    pub fn new(
43        core: generic::Agg<PlanRef>,
44        row_count_idx: usize,
45        must_output_per_barrier: bool,
46    ) -> Self {
47        assert_eq!(core.agg_calls[row_count_idx], PlanAggCall::count_star());
48
49        let input = core.input.clone();
50        let input_dist = input.distribution();
51        let dist = match input_dist {
52            Distribution::Single => Distribution::Single,
53            _ => panic!(),
54        };
55
56        // Empty because watermark column(s) must be in group key and simple agg have no group key.
57        let watermark_columns = WatermarkColumns::new();
58
59        // Simple agg executor might change the append-only behavior of the stream.
60        let base = PlanBase::new_stream_with_core(
61            &core,
62            dist,
63            false,
64            false,
65            watermark_columns,
66            MonotonicityMap::new(),
67        );
68        StreamSimpleAgg {
69            base,
70            core,
71            row_count_idx,
72            must_output_per_barrier,
73        }
74    }
75
76    pub fn agg_calls(&self) -> &[PlanAggCall] {
77        &self.core.agg_calls
78    }
79}
80
81impl Distill for StreamSimpleAgg {
82    fn distill<'a>(&self) -> XmlNode<'a> {
83        let name = plan_node_name!("StreamSimpleAgg",
84            { "append_only", self.input().append_only() },
85        );
86        let mut vec = self.core.fields_pretty();
87        if self.must_output_per_barrier {
88            vec.push(("must_output_per_barrier", "true".into()));
89        }
90        childless_record(name, vec)
91    }
92}
93
94impl PlanTreeNodeUnary for StreamSimpleAgg {
95    fn input(&self) -> PlanRef {
96        self.core.input.clone()
97    }
98
99    fn clone_with_input(&self, input: PlanRef) -> Self {
100        let logical = generic::Agg {
101            input,
102            ..self.core.clone()
103        };
104        Self::new(logical, self.row_count_idx, self.must_output_per_barrier)
105    }
106}
107impl_plan_tree_node_for_unary! { StreamSimpleAgg }
108
109impl StreamNode for StreamSimpleAgg {
110    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
111        use risingwave_pb::stream_plan::*;
112        let (intermediate_state_table, agg_states, distinct_dedup_tables) =
113            self.core.infer_tables(&self.base, None, None);
114
115        PbNodeBody::SimpleAgg(Box::new(SimpleAggNode {
116            agg_calls: self
117                .agg_calls()
118                .iter()
119                .map(PlanAggCall::to_protobuf)
120                .collect(),
121            distribution_key: self
122                .base
123                .distribution()
124                .dist_column_indices()
125                .iter()
126                .map(|idx| *idx as u32)
127                .collect(),
128            is_append_only: self.input().append_only(),
129            agg_call_states: agg_states
130                .into_iter()
131                .map(|s| s.into_prost(state))
132                .collect(),
133            intermediate_state_table: Some(
134                intermediate_state_table
135                    .with_id(state.gen_table_id_wrapped())
136                    .to_internal_table_prost(),
137            ),
138            distinct_dedup_tables: distinct_dedup_tables
139                .into_iter()
140                .sorted_by_key(|(i, _)| *i)
141                .map(|(key_idx, table)| {
142                    (
143                        key_idx as u32,
144                        table
145                            .with_id(state.gen_table_id_wrapped())
146                            .to_internal_table_prost(),
147                    )
148                })
149                .collect(),
150            row_count_index: self.row_count_idx as u32,
151            version: PbAggNodeVersion::LATEST as _,
152            must_output_per_barrier: self.must_output_per_barrier,
153        }))
154    }
155}
156
157impl ExprRewritable for StreamSimpleAgg {
158    fn has_rewritable_expr(&self) -> bool {
159        true
160    }
161
162    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
163        let mut core = self.core.clone();
164        core.rewrite_exprs(r);
165        Self::new(core, self.row_count_idx, self.must_output_per_barrier).into()
166    }
167}
168
169impl ExprVisitable for StreamSimpleAgg {
170    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
171        self.core.visit_exprs(v);
172    }
173}