// risingwave_frontend/optimizer/plan_node/stream_hash_agg.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use pretty_xmlish::XmlNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::generic::{self, PlanAggCall};
20use super::stream::prelude::*;
21use super::utils::{Distill, childless_record, plan_node_name, watermark_pretty};
22use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
23use crate::error::Result;
24use crate::expr::{ExprRewriter, ExprVisitor};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::property::{MonotonicityMap, WatermarkColumns};
27use crate::stream_fragmenter::BuildFragmentGraphState;
28use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, IndexSet};
29
/// Streaming hash aggregation plan node.
///
/// Wraps a logical [`generic::Agg`] (input, group key and aggregate calls) together with the
/// extra information the streaming hash-agg executor needs: an optional vnode column, the
/// position of the mandatory `count(*)` call, and the Emit-On-Window-Close (EOWC) settings.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamHashAgg {
    /// Common stream plan properties (distribution, append-only flag, watermark columns, ...),
    /// derived once in `new_with_eowc`.
    pub base: PlanBase<Stream>,
    /// The logical aggregation this node executes.
    core: generic::Agg<PlanRef>,

    /// An optional column index which is the vnode of each row computed by the input's consistent
    /// hash distribution.
    vnode_col_idx: Option<usize>,

    /// The index of `count(*)` in `agg_calls`.
    row_count_idx: usize,

    /// Whether to emit output only when the window is closed by watermark.
    emit_on_window_close: bool,

    /// The watermark column that Emit-On-Window-Close behavior is based on, expressed as an
    /// index into the *input* schema. Always `None` when `emit_on_window_close` is `false`.
    window_col_idx: Option<usize>,
}
48
49impl StreamHashAgg {
50    pub fn new(
51        core: generic::Agg<PlanRef>,
52        vnode_col_idx: Option<usize>,
53        row_count_idx: usize,
54    ) -> Self {
55        Self::new_with_eowc(core, vnode_col_idx, row_count_idx, false)
56    }
57
58    pub fn new_with_eowc(
59        core: generic::Agg<PlanRef>,
60        vnode_col_idx: Option<usize>,
61        row_count_idx: usize,
62        emit_on_window_close: bool,
63    ) -> Self {
64        assert_eq!(core.agg_calls[row_count_idx], PlanAggCall::count_star());
65
66        let input = core.input.clone();
67        let input_dist = input.distribution();
68        let dist = core
69            .i2o_col_mapping()
70            .rewrite_provided_distribution(input_dist);
71
72        let mut watermark_columns = WatermarkColumns::new();
73        let mut window_col_idx = None;
74        let mapping = core.i2o_col_mapping();
75        if emit_on_window_close {
76            let window_col = core
77                .eowc_window_column(input.watermark_columns())
78                .expect("checked in `to_eowc_version`");
79            // EOWC HashAgg only propagate one watermark column, the window column.
80            watermark_columns.insert(
81                mapping.map(window_col),
82                input.watermark_columns().get_group(window_col).unwrap(),
83            );
84            window_col_idx = Some(window_col);
85        } else {
86            for idx in core.group_key.indices() {
87                if let Some(wtmk_group) = input.watermark_columns().get_group(idx) {
88                    // Non-EOWC `StreamHashAgg` simply forwards the watermark messages from the input.
89                    watermark_columns.insert(mapping.map(idx), wtmk_group);
90                }
91            }
92        }
93
94        // Hash agg executor might change the append-only behavior of the stream.
95        let base = PlanBase::new_stream_with_core(
96            &core,
97            dist,
98            emit_on_window_close, // in EOWC mode, we produce append only output
99            emit_on_window_close,
100            watermark_columns,
101            MonotonicityMap::new(), // TODO: derive monotonicity
102        );
103        StreamHashAgg {
104            base,
105            core,
106            vnode_col_idx,
107            row_count_idx,
108            emit_on_window_close,
109            window_col_idx,
110        }
111    }
112
113    pub fn agg_calls(&self) -> &[PlanAggCall] {
114        &self.core.agg_calls
115    }
116
117    pub fn group_key(&self) -> &IndexSet {
118        &self.core.group_key
119    }
120
121    pub(crate) fn i2o_col_mapping(&self) -> ColIndexMapping {
122        self.core.i2o_col_mapping()
123    }
124
125    // TODO(rc): It'll be better to force creation of EOWC version through `new`, especially when we
126    // optimize for 2-phase EOWC aggregation later.
127    pub fn to_eowc_version(&self) -> Result<PlanRef> {
128        let input = self.input();
129
130        // check whether the group by columns are valid
131        let _ = self.core.eowc_window_column(input.watermark_columns())?;
132
133        Ok(Self::new_with_eowc(
134            self.core.clone(),
135            self.vnode_col_idx,
136            self.row_count_idx,
137            true,
138        )
139        .into())
140    }
141}
142
143impl Distill for StreamHashAgg {
144    fn distill<'a>(&self) -> XmlNode<'a> {
145        let mut vec = self.core.fields_pretty();
146        if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
147            vec.push(("output_watermarks", ow));
148        }
149        childless_record(
150            plan_node_name!(
151                "StreamHashAgg",
152                { "append_only", self.input().append_only() },
153                { "eowc", self.emit_on_window_close },
154            ),
155            vec,
156        )
157    }
158}
159
160impl PlanTreeNodeUnary for StreamHashAgg {
161    fn input(&self) -> PlanRef {
162        self.core.input.clone()
163    }
164
165    fn clone_with_input(&self, input: PlanRef) -> Self {
166        let logical = generic::Agg {
167            input,
168            ..self.core.clone()
169        };
170        Self::new_with_eowc(
171            logical,
172            self.vnode_col_idx,
173            self.row_count_idx,
174            self.emit_on_window_close,
175        )
176    }
177}
178impl_plan_tree_node_for_unary! { StreamHashAgg }
179
impl StreamNode for StreamHashAgg {
    /// Serializes this node into a `HashAggNode` protobuf body.
    ///
    /// The internal tables are inferred from `core`: the intermediate state table, the
    /// per-call agg states and the distinct-dedup tables. Fresh table ids are drawn from
    /// `state` for the intermediate state table and each distinct-dedup table. Agg call states
    /// are converted via `into_prost`, which also receives `state`.
    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;
        let (intermediate_state_table, agg_states, distinct_dedup_tables) =
            self.core
                .infer_tables(&self.base, self.vnode_col_idx, self.window_col_idx);

        // NOTE: struct field initializers are evaluated in source order. Several of them draw
        // ids from `state`, so the field order below determines table-id assignment. Keep it
        // stable.
        PbNodeBody::HashAgg(Box::new(HashAggNode {
            group_key: self.group_key().to_vec_as_u32(),
            agg_calls: self
                .agg_calls()
                .iter()
                .map(PlanAggCall::to_protobuf)
                .collect(),

            // Reflects whether the *input* stream is append-only.
            is_append_only: self.input().append_only(),
            agg_call_states: agg_states
                .into_iter()
                .map(|s| s.into_prost(state))
                .collect(),
            intermediate_state_table: Some(
                intermediate_state_table
                    .with_id(state.gen_table_id_wrapped())
                    .to_internal_table_prost(),
            ),
            // Sorted by distinct-key index so table ids are assigned in a deterministic order.
            // Presumably the source collection's iteration order is unspecified; verify
            // against `infer_tables`.
            distinct_dedup_tables: distinct_dedup_tables
                .into_iter()
                .sorted_by_key(|(i, _)| *i)
                .map(|(key_idx, table)| {
                    (
                        key_idx as u32,
                        table
                            .with_id(state.gen_table_id_wrapped())
                            .to_internal_table_prost(),
                    )
                })
                .collect(),
            row_count_index: self.row_count_idx as u32,
            emit_on_window_close: self.base.emit_on_window_close(),
            version: PbAggNodeVersion::LATEST as _,
        }))
    }
}
223
224impl ExprRewritable for StreamHashAgg {
225    fn has_rewritable_expr(&self) -> bool {
226        true
227    }
228
229    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
230        let mut core = self.core.clone();
231        core.rewrite_exprs(r);
232        Self::new_with_eowc(
233            core,
234            self.vnode_col_idx,
235            self.row_count_idx,
236            self.emit_on_window_close,
237        )
238        .into()
239    }
240}
241
242impl ExprVisitable for StreamHashAgg {
243    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
244        self.core.visit_exprs(v);
245    }
246}