risingwave_frontend/optimizer/plan_node/
stream_stateless_simple_agg.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_pb::stream_plan::stream_node::PbNodeBody;
16
17use super::generic::{self, PlanAggCall};
18use super::stream::prelude::*;
19use super::utils::impl_distill_by_unit;
20use super::{
21    ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamPlanRef as PlanRef, StreamPlanRef,
22    TryToStreamPb,
23};
24use crate::expr::{ExprRewriter, ExprVisitor};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::property::{MonotonicityMap, RequiredDist, WatermarkColumns};
27use crate::scheduler::SchedulerResult;
28use crate::stream_fragmenter::BuildFragmentGraphState;
29
30/// Streaming stateless simple agg.
31///
32/// Should only be used for stateless agg, including `sum`, `count` and *append-only* `min`/`max`.
33///
34/// The output of `StreamStatelessSimpleAgg` doesn't have pk columns, so the result can only be used
35/// by `StreamSimpleAgg`.
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct StreamStatelessSimpleAgg {
38    pub base: PlanBase<Stream>,
39    core: generic::Agg<PlanRef>,
40}
41
42impl StreamStatelessSimpleAgg {
43    pub fn new(core: generic::Agg<StreamPlanRef>) -> Result<Self> {
44        let input = core.input.clone();
45        reject_upsert_input!(input);
46        let input_dist = input.distribution();
47        debug_assert!(input_dist.satisfies(&RequiredDist::AnyShard));
48
49        let mut watermark_columns = WatermarkColumns::new();
50        // Watermark column(s) must be in group key.
51        for (idx, input_idx) in core.group_key.indices().enumerate() {
52            if let Some(wtmk_group) = input.watermark_columns().get_group(input_idx) {
53                watermark_columns.insert(idx, wtmk_group);
54            }
55        }
56
57        let base = PlanBase::new_stream_with_core(
58            &core,
59            input_dist.clone(),
60            // Stateless simple agg outputs one `Insert` row per epoch to the global phase.
61            StreamKind::AppendOnly,
62            input.emit_on_window_close(),
63            watermark_columns,
64            MonotonicityMap::new(),
65        );
66
67        Ok(StreamStatelessSimpleAgg { base, core })
68    }
69
70    pub fn agg_calls(&self) -> &[PlanAggCall] {
71        &self.core.agg_calls
72    }
73}
74impl_distill_by_unit!(StreamStatelessSimpleAgg, core, "StreamStatelessSimpleAgg");
75
76impl PlanTreeNodeUnary<Stream> for StreamStatelessSimpleAgg {
77    fn input(&self) -> PlanRef {
78        self.core.input.clone()
79    }
80
81    fn clone_with_input(&self, input: PlanRef) -> Self {
82        let mut core = self.core.clone();
83        core.input = input;
84        Self::new(core).unwrap()
85    }
86}
87impl_plan_tree_node_for_unary! { Stream, StreamStatelessSimpleAgg }
88
89impl TryToStreamPb for StreamStatelessSimpleAgg {
90    fn try_to_stream_prost_body(
91        &self,
92        _state: &mut BuildFragmentGraphState,
93    ) -> SchedulerResult<PbNodeBody> {
94        use risingwave_pb::stream_plan::*;
95        let append_only = self.input().append_only();
96        let agg_calls = self
97            .agg_calls()
98            .iter()
99            .map(|call| call.to_protobuf_checked_pure(self.input().stream_kind().is_retract()))
100            .collect::<crate::error::Result<Vec<_>>>()?;
101        Ok(PbNodeBody::StatelessSimpleAgg(Box::new(SimpleAggNode {
102            agg_calls,
103            row_count_index: u32::MAX, // this is not used
104            agg_call_states: vec![],
105            intermediate_state_table: None,
106            is_append_only: append_only,
107            distinct_dedup_tables: Default::default(),
108            version: AggNodeVersion::Issue13465 as _,
109            must_output_per_barrier: false, // this is not used
110        })))
111    }
112}
113
114impl ExprRewritable<Stream> for StreamStatelessSimpleAgg {
115    fn has_rewritable_expr(&self) -> bool {
116        true
117    }
118
119    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
120        let mut core = self.core.clone();
121        core.rewrite_exprs(r);
122        Self::new(core).unwrap().into()
123    }
124}
125
126impl ExprVisitable for StreamStatelessSimpleAgg {
127    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
128        self.core.visit_exprs(v);
129    }
130}