risingwave_frontend/optimizer/plan_node/
stream_stateless_simple_agg.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_pb::stream_plan::stream_node::PbNodeBody;
17
18use super::generic::{self, PlanAggCall};
19use super::stream::prelude::*;
20use super::utils::impl_distill_by_unit;
21use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
22use crate::expr::{ExprRewriter, ExprVisitor};
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::property::{MonotonicityMap, RequiredDist, WatermarkColumns};
25use crate::stream_fragmenter::BuildFragmentGraphState;
26
27/// Streaming stateless simple agg.
28///
29/// Should only be used for stateless agg, including `sum`, `count` and *append-only* `min`/`max`.
30///
31/// The output of `StreamStatelessSimpleAgg` doesn't have pk columns, so the result can only be used
32/// by `StreamSimpleAgg`.
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct StreamStatelessSimpleAgg {
35    pub base: PlanBase<Stream>,
36    core: generic::Agg<PlanRef>,
37}
38
39impl StreamStatelessSimpleAgg {
40    pub fn new(core: generic::Agg<PlanRef>) -> Self {
41        let input = core.input.clone();
42        let input_dist = input.distribution();
43        debug_assert!(input_dist.satisfies(&RequiredDist::AnyShard));
44
45        let mut watermark_columns = WatermarkColumns::new();
46        // Watermark column(s) must be in group key.
47        for (idx, input_idx) in core.group_key.indices().enumerate() {
48            if let Some(wtmk_group) = input.watermark_columns().get_group(input_idx) {
49                watermark_columns.insert(idx, wtmk_group);
50            }
51        }
52
53        let base = PlanBase::new_stream_with_core(
54            &core,
55            input_dist.clone(),
56            input.append_only(),
57            input.emit_on_window_close(),
58            watermark_columns,
59            MonotonicityMap::new(),
60        );
61        StreamStatelessSimpleAgg { base, core }
62    }
63
64    pub fn agg_calls(&self) -> &[PlanAggCall] {
65        &self.core.agg_calls
66    }
67}
68impl_distill_by_unit!(StreamStatelessSimpleAgg, core, "StreamStatelessSimpleAgg");
69
70impl PlanTreeNodeUnary for StreamStatelessSimpleAgg {
71    fn input(&self) -> PlanRef {
72        self.core.input.clone()
73    }
74
75    fn clone_with_input(&self, input: PlanRef) -> Self {
76        let mut core = self.core.clone();
77        core.input = input;
78        Self::new(core)
79    }
80}
81impl_plan_tree_node_for_unary! { StreamStatelessSimpleAgg }
82
83impl StreamNode for StreamStatelessSimpleAgg {
84    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
85        use risingwave_pb::stream_plan::*;
86        PbNodeBody::StatelessSimpleAgg(Box::new(SimpleAggNode {
87            agg_calls: self
88                .agg_calls()
89                .iter()
90                .map(PlanAggCall::to_protobuf)
91                .collect(),
92            row_count_index: u32::MAX, // this is not used
93            distribution_key: self
94                .distribution()
95                .dist_column_indices()
96                .iter()
97                .map(|idx| *idx as u32)
98                .collect_vec(),
99            agg_call_states: vec![],
100            intermediate_state_table: None,
101            is_append_only: self.input().append_only(),
102            distinct_dedup_tables: Default::default(),
103            version: AggNodeVersion::Issue13465 as _,
104            must_output_per_barrier: false, // this is not used
105        }))
106    }
107}
108
109impl ExprRewritable for StreamStatelessSimpleAgg {
110    fn has_rewritable_expr(&self) -> bool {
111        true
112    }
113
114    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
115        let mut core = self.core.clone();
116        core.rewrite_exprs(r);
117        Self::new(core).into()
118    }
119}
120
121impl ExprVisitable for StreamStatelessSimpleAgg {
122    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
123        self.core.visit_exprs(v);
124    }
125}