risingwave_frontend/optimizer/plan_node/stream_hash_agg.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use pretty_xmlish::XmlNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::generic::{self, PlanAggCall};
20use super::stream::prelude::*;
21use super::utils::{Distill, childless_record, plan_node_name, watermark_pretty};
22use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamPlanRef as PlanRef, TryToStreamPb};
23use crate::error::Result;
24use crate::expr::{ExprRewriter, ExprVisitor};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::property::{MonotonicityMap, WatermarkColumns};
27use crate::scheduler::SchedulerResult;
28use crate::stream_fragmenter::BuildFragmentGraphState;
29use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, IndexSet};
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct StreamHashAgg {
33    pub base: PlanBase<Stream>,
34    core: generic::Agg<PlanRef>,
35
36    /// An optional column index which is the vnode of each row computed by the input's consistent
37    /// hash distribution.
38    vnode_col_idx: Option<usize>,
39
40    /// The index of `count(*)` in `agg_calls`.
41    row_count_idx: usize,
42
43    /// Whether to emit output only when the window is closed by watermark.
44    emit_on_window_close: bool,
45
46    /// The watermark column that Emit-On-Window-Close behavior is based on.
47    window_col_idx: Option<usize>,
48}
49
50impl StreamHashAgg {
51    pub fn new(
52        core: generic::Agg<PlanRef>,
53        vnode_col_idx: Option<usize>,
54        row_count_idx: usize,
55    ) -> Result<Self> {
56        Self::new_with_eowc(core, vnode_col_idx, row_count_idx, false)
57    }
58
59    pub fn new_with_eowc(
60        core: generic::Agg<PlanRef>,
61        vnode_col_idx: Option<usize>,
62        row_count_idx: usize,
63        emit_on_window_close: bool,
64    ) -> Result<Self> {
65        assert_eq!(core.agg_calls[row_count_idx], PlanAggCall::count_star());
66        reject_upsert_input!(core.input);
67
68        let input = core.input.clone();
69        let input_dist = input.distribution();
70        let dist = core
71            .i2o_col_mapping()
72            .rewrite_provided_distribution(input_dist);
73
74        let mut watermark_columns = WatermarkColumns::new();
75        let mut window_col_idx = None;
76        let mapping = core.i2o_col_mapping();
77        if emit_on_window_close {
78            let window_col = core
79                .eowc_window_column(input.watermark_columns())
80                .expect("checked in `to_eowc_version`");
81            // EOWC HashAgg only propagate one watermark column, the window column.
82            watermark_columns.insert(
83                mapping.map(window_col),
84                input.watermark_columns().get_group(window_col).unwrap(),
85            );
86            window_col_idx = Some(window_col);
87        } else {
88            for idx in core.group_key.indices() {
89                if let Some(wtmk_group) = input.watermark_columns().get_group(idx) {
90                    // Non-EOWC `StreamHashAgg` simply forwards the watermark messages from the input.
91                    watermark_columns.insert(mapping.map(idx), wtmk_group);
92                }
93            }
94        }
95
96        // Hash agg executor might change the append-only behavior of the stream.
97        let base = PlanBase::new_stream_with_core(
98            &core,
99            dist,
100            if emit_on_window_close {
101                // in EOWC mode, we produce append only output
102                StreamKind::AppendOnly
103            } else {
104                StreamKind::Retract
105            },
106            emit_on_window_close,
107            watermark_columns,
108            MonotonicityMap::new(), // TODO: derive monotonicity
109        );
110
111        Ok(StreamHashAgg {
112            base,
113            core,
114            vnode_col_idx,
115            row_count_idx,
116            emit_on_window_close,
117            window_col_idx,
118        })
119    }
120
121    pub fn agg_calls(&self) -> &[PlanAggCall] {
122        &self.core.agg_calls
123    }
124
125    pub fn group_key(&self) -> &IndexSet {
126        &self.core.group_key
127    }
128
129    pub(crate) fn i2o_col_mapping(&self) -> ColIndexMapping {
130        self.core.i2o_col_mapping()
131    }
132
133    // TODO(rc): It'll be better to force creation of EOWC version through `new`, especially when we
134    // optimize for 2-phase EOWC aggregation later.
135    pub fn to_eowc_version(&self) -> Result<PlanRef> {
136        let input = self.input();
137
138        // check whether the group by columns are valid
139        let _ = self.core.eowc_window_column(input.watermark_columns())?;
140
141        Ok(Self::new_with_eowc(
142            self.core.clone(),
143            self.vnode_col_idx,
144            self.row_count_idx,
145            true,
146        )?
147        .into())
148    }
149}
150
151impl Distill for StreamHashAgg {
152    fn distill<'a>(&self) -> XmlNode<'a> {
153        let mut vec = self.core.fields_pretty();
154        if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
155            vec.push(("output_watermarks", ow));
156        }
157        childless_record(
158            plan_node_name!(
159                "StreamHashAgg",
160                { "append_only", self.input().append_only() },
161                { "eowc", self.emit_on_window_close },
162            ),
163            vec,
164        )
165    }
166}
167
168impl PlanTreeNodeUnary<Stream> for StreamHashAgg {
169    fn input(&self) -> PlanRef {
170        self.core.input.clone()
171    }
172
173    fn clone_with_input(&self, input: PlanRef) -> Self {
174        let logical = generic::Agg {
175            input,
176            ..self.core.clone()
177        };
178
179        Self::new_with_eowc(
180            logical,
181            self.vnode_col_idx,
182            self.row_count_idx,
183            self.emit_on_window_close,
184        )
185        .unwrap()
186    }
187}
188impl_plan_tree_node_for_unary! { Stream, StreamHashAgg }
189
190impl TryToStreamPb for StreamHashAgg {
191    fn try_to_stream_prost_body(
192        &self,
193        state: &mut BuildFragmentGraphState,
194    ) -> SchedulerResult<PbNodeBody> {
195        use risingwave_pb::stream_plan::*;
196        let (intermediate_state_table, agg_states, distinct_dedup_tables) =
197            self.core
198                .infer_tables(&self.base, self.vnode_col_idx, self.window_col_idx);
199
200        let agg_calls = self
201            .agg_calls()
202            .iter()
203            .map(|call| call.to_protobuf_checked_pure(self.input().stream_kind().is_retract()))
204            .collect::<crate::error::Result<Vec<_>>>()?;
205
206        Ok(PbNodeBody::HashAgg(Box::new(HashAggNode {
207            group_key: self.group_key().to_vec_as_u32(),
208            agg_calls,
209
210            is_append_only: self.input().append_only(),
211            agg_call_states: agg_states
212                .into_iter()
213                .map(|s| s.into_prost(state))
214                .collect(),
215            intermediate_state_table: Some(
216                intermediate_state_table
217                    .with_id(state.gen_table_id_wrapped())
218                    .to_internal_table_prost(),
219            ),
220            distinct_dedup_tables: distinct_dedup_tables
221                .into_iter()
222                .sorted_by_key(|(i, _)| *i)
223                .map(|(key_idx, table)| {
224                    (
225                        key_idx as u32,
226                        table
227                            .with_id(state.gen_table_id_wrapped())
228                            .to_internal_table_prost(),
229                    )
230                })
231                .collect(),
232            row_count_index: self.row_count_idx as u32,
233            emit_on_window_close: self.base.emit_on_window_close(),
234            version: PbAggNodeVersion::LATEST as _,
235        })))
236    }
237}
238
239impl ExprRewritable<Stream> for StreamHashAgg {
240    fn has_rewritable_expr(&self) -> bool {
241        true
242    }
243
244    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
245        let mut core = self.core.clone();
246        core.rewrite_exprs(r);
247        Self::new_with_eowc(
248            core,
249            self.vnode_col_idx,
250            self.row_count_idx,
251            self.emit_on_window_close,
252        )
253        .unwrap()
254        .into()
255    }
256}
257
258impl ExprVisitable for StreamHashAgg {
259    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
260        self.core.visit_exprs(v);
261    }
262}