risingwave_frontend/optimizer/plan_node/
stream_simple_agg.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use itertools::Itertools;
16use pretty_xmlish::XmlNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::generic::{self, PlanAggCall};
20use super::stream::prelude::*;
21use super::utils::{Distill, childless_record, plan_node_name};
22use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamPlanRef as PlanRef, TryToStreamPb};
23use crate::expr::{ExprRewriter, ExprVisitor};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
26use crate::scheduler::SchedulerResult;
27use crate::stream_fragmenter::BuildFragmentGraphState;
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct StreamSimpleAgg {
31    pub base: PlanBase<Stream>,
32    core: generic::Agg<PlanRef>,
33
34    /// The index of `count(*)` in `agg_calls`.
35    row_count_idx: usize,
36
37    // Required by the downstream `RowMerge`,
38    // currently only used by the `approx_percentile`'s two phase plan
39    must_output_per_barrier: bool,
40}
41
42impl StreamSimpleAgg {
43    pub fn new(
44        core: generic::Agg<PlanRef>,
45        row_count_idx: usize,
46        must_output_per_barrier: bool,
47    ) -> Result<Self> {
48        assert_eq!(core.agg_calls[row_count_idx], PlanAggCall::count_star());
49        reject_upsert_input!(core.input);
50
51        let input = core.input.clone();
52        let input_dist = input.distribution();
53        let dist = match input_dist {
54            Distribution::Single => Distribution::Single,
55            _ => panic!(),
56        };
57
58        // Empty because watermark column(s) must be in group key and simple agg have no group key.
59        let watermark_columns = WatermarkColumns::new();
60
61        // Simple agg executor might change the append-only behavior of the stream.
62        let base = PlanBase::new_stream_with_core(
63            &core,
64            dist,
65            StreamKind::Retract,
66            false,
67            watermark_columns,
68            MonotonicityMap::new(),
69        );
70
71        Ok(StreamSimpleAgg {
72            base,
73            core,
74            row_count_idx,
75            must_output_per_barrier,
76        })
77    }
78
79    pub fn agg_calls(&self) -> &[PlanAggCall] {
80        &self.core.agg_calls
81    }
82}
83
84impl Distill for StreamSimpleAgg {
85    fn distill<'a>(&self) -> XmlNode<'a> {
86        let name = plan_node_name!("StreamSimpleAgg",
87            { "append_only", self.input().append_only() },
88        );
89        let mut vec = self.core.fields_pretty();
90        if self.must_output_per_barrier {
91            vec.push(("must_output_per_barrier", "true".into()));
92        }
93        childless_record(name, vec)
94    }
95}
96
97impl PlanTreeNodeUnary<Stream> for StreamSimpleAgg {
98    fn input(&self) -> PlanRef {
99        self.core.input.clone()
100    }
101
102    fn clone_with_input(&self, input: PlanRef) -> Self {
103        let logical = generic::Agg {
104            input,
105            ..self.core.clone()
106        };
107        Self::new(logical, self.row_count_idx, self.must_output_per_barrier).unwrap()
108    }
109}
110impl_plan_tree_node_for_unary! { Stream, StreamSimpleAgg }
111
112impl TryToStreamPb for StreamSimpleAgg {
113    fn try_to_stream_prost_body(
114        &self,
115        state: &mut BuildFragmentGraphState,
116    ) -> SchedulerResult<PbNodeBody> {
117        use risingwave_pb::stream_plan::*;
118        let (intermediate_state_table, agg_states, distinct_dedup_tables) =
119            self.core.infer_tables(&self.base, None, None);
120
121        let append_only = self.input().append_only();
122        let agg_calls = self
123            .agg_calls()
124            .iter()
125            .map(|call| call.to_protobuf_checked_pure(self.input().stream_kind().is_retract()))
126            .collect::<crate::error::Result<Vec<_>>>()?;
127
128        Ok(PbNodeBody::SimpleAgg(Box::new(SimpleAggNode {
129            agg_calls,
130            is_append_only: append_only,
131            agg_call_states: agg_states
132                .into_iter()
133                .map(|s| s.into_prost(state))
134                .collect(),
135            intermediate_state_table: Some(
136                intermediate_state_table
137                    .with_id(state.gen_table_id_wrapped())
138                    .to_internal_table_prost(),
139            ),
140            distinct_dedup_tables: distinct_dedup_tables
141                .into_iter()
142                .sorted_by_key(|(i, _)| *i)
143                .map(|(key_idx, table)| {
144                    (
145                        key_idx as u32,
146                        table
147                            .with_id(state.gen_table_id_wrapped())
148                            .to_internal_table_prost(),
149                    )
150                })
151                .collect(),
152            row_count_index: self.row_count_idx as u32,
153            version: PbAggNodeVersion::LATEST as _,
154            must_output_per_barrier: self.must_output_per_barrier,
155        })))
156    }
157}
158
159impl ExprRewritable<Stream> for StreamSimpleAgg {
160    fn has_rewritable_expr(&self) -> bool {
161        true
162    }
163
164    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
165        let mut core = self.core.clone();
166        core.rewrite_exprs(r);
167        Self::new(core, self.row_count_idx, self.must_output_per_barrier)
168            .unwrap()
169            .into()
170    }
171}
172
173impl ExprVisitable for StreamSimpleAgg {
174    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
175        self.core.visit_exprs(v);
176    }
177}