risingwave_frontend/optimizer/plan_node/
stream_stateless_simple_agg.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_pb::stream_plan::stream_node::PbNodeBody;
16
17use super::generic::{self, PlanAggCall};
18use super::stream::prelude::*;
19use super::utils::impl_distill_by_unit;
20use super::{
21    ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef,
22    StreamPlanRef,
23};
24use crate::expr::{ExprRewriter, ExprVisitor};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::property::{MonotonicityMap, RequiredDist, WatermarkColumns};
27use crate::stream_fragmenter::BuildFragmentGraphState;
28
29/// Streaming stateless simple agg.
30///
31/// Should only be used for stateless agg, including `sum`, `count` and *append-only* `min`/`max`.
32///
33/// The output of `StreamStatelessSimpleAgg` doesn't have pk columns, so the result can only be used
34/// by `StreamSimpleAgg`.
35#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct StreamStatelessSimpleAgg {
37    pub base: PlanBase<Stream>,
38    core: generic::Agg<PlanRef>,
39}
40
41impl StreamStatelessSimpleAgg {
42    pub fn new(core: generic::Agg<StreamPlanRef>) -> Result<Self> {
43        let input = core.input.clone();
44        reject_upsert_input!(input);
45        let input_dist = input.distribution();
46        debug_assert!(input_dist.satisfies(&RequiredDist::AnyShard));
47
48        let mut watermark_columns = WatermarkColumns::new();
49        // Watermark column(s) must be in group key.
50        for (idx, input_idx) in core.group_key.indices().enumerate() {
51            if let Some(wtmk_group) = input.watermark_columns().get_group(input_idx) {
52                watermark_columns.insert(idx, wtmk_group);
53            }
54        }
55
56        let base = PlanBase::new_stream_with_core(
57            &core,
58            input_dist.clone(),
59            // Stateless simple agg outputs one `Insert` row per epoch to the global phase.
60            StreamKind::AppendOnly,
61            input.emit_on_window_close(),
62            watermark_columns,
63            MonotonicityMap::new(),
64        );
65
66        Ok(StreamStatelessSimpleAgg { base, core })
67    }
68
69    pub fn agg_calls(&self) -> &[PlanAggCall] {
70        &self.core.agg_calls
71    }
72}
73impl_distill_by_unit!(StreamStatelessSimpleAgg, core, "StreamStatelessSimpleAgg");
74
75impl PlanTreeNodeUnary<Stream> for StreamStatelessSimpleAgg {
76    fn input(&self) -> PlanRef {
77        self.core.input.clone()
78    }
79
80    fn clone_with_input(&self, input: PlanRef) -> Self {
81        let mut core = self.core.clone();
82        core.input = input;
83        Self::new(core).unwrap()
84    }
85}
86impl_plan_tree_node_for_unary! { Stream, StreamStatelessSimpleAgg }
87
88impl StreamNode for StreamStatelessSimpleAgg {
89    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
90        use risingwave_pb::stream_plan::*;
91        PbNodeBody::StatelessSimpleAgg(Box::new(SimpleAggNode {
92            agg_calls: self
93                .agg_calls()
94                .iter()
95                .map(PlanAggCall::to_protobuf)
96                .collect(),
97            row_count_index: u32::MAX, // this is not used
98            agg_call_states: vec![],
99            intermediate_state_table: None,
100            is_append_only: self.input().append_only(),
101            distinct_dedup_tables: Default::default(),
102            version: AggNodeVersion::Issue13465 as _,
103            must_output_per_barrier: false, // this is not used
104        }))
105    }
106}
107
108impl ExprRewritable<Stream> for StreamStatelessSimpleAgg {
109    fn has_rewritable_expr(&self) -> bool {
110        true
111    }
112
113    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
114        let mut core = self.core.clone();
115        core.rewrite_exprs(r);
116        Self::new(core).unwrap().into()
117    }
118}
119
120impl ExprVisitable for StreamStatelessSimpleAgg {
121    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
122        self.core.visit_exprs(v);
123    }
124}