risingwave_frontend/optimizer/plan_node/
stream_hash_agg.rs1use itertools::Itertools;
16use pretty_xmlish::XmlNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::generic::{self, PlanAggCall};
20use super::stream::prelude::*;
21use super::utils::{Distill, childless_record, plan_node_name, watermark_pretty};
22use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
23use crate::error::Result;
24use crate::expr::{ExprRewriter, ExprVisitor};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::property::{MonotonicityMap, WatermarkColumns};
27use crate::stream_fragmenter::BuildFragmentGraphState;
28use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, IndexSet};
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct StreamHashAgg {
32 pub base: PlanBase<Stream>,
33 core: generic::Agg<PlanRef>,
34
35 vnode_col_idx: Option<usize>,
38
39 row_count_idx: usize,
41
42 emit_on_window_close: bool,
44
45 window_col_idx: Option<usize>,
47}
48
49impl StreamHashAgg {
50 pub fn new(
51 core: generic::Agg<PlanRef>,
52 vnode_col_idx: Option<usize>,
53 row_count_idx: usize,
54 ) -> Self {
55 Self::new_with_eowc(core, vnode_col_idx, row_count_idx, false)
56 }
57
58 pub fn new_with_eowc(
59 core: generic::Agg<PlanRef>,
60 vnode_col_idx: Option<usize>,
61 row_count_idx: usize,
62 emit_on_window_close: bool,
63 ) -> Self {
64 assert_eq!(core.agg_calls[row_count_idx], PlanAggCall::count_star());
65
66 let input = core.input.clone();
67 let input_dist = input.distribution();
68 let dist = core
69 .i2o_col_mapping()
70 .rewrite_provided_distribution(input_dist);
71
72 let mut watermark_columns = WatermarkColumns::new();
73 let mut window_col_idx = None;
74 let mapping = core.i2o_col_mapping();
75 if emit_on_window_close {
76 let window_col = core
77 .eowc_window_column(input.watermark_columns())
78 .expect("checked in `to_eowc_version`");
79 watermark_columns.insert(
81 mapping.map(window_col),
82 input.watermark_columns().get_group(window_col).unwrap(),
83 );
84 window_col_idx = Some(window_col);
85 } else {
86 for idx in core.group_key.indices() {
87 if let Some(wtmk_group) = input.watermark_columns().get_group(idx) {
88 watermark_columns.insert(mapping.map(idx), wtmk_group);
90 }
91 }
92 }
93
94 let base = PlanBase::new_stream_with_core(
96 &core,
97 dist,
98 emit_on_window_close, emit_on_window_close,
100 watermark_columns,
101 MonotonicityMap::new(), );
103 StreamHashAgg {
104 base,
105 core,
106 vnode_col_idx,
107 row_count_idx,
108 emit_on_window_close,
109 window_col_idx,
110 }
111 }
112
113 pub fn agg_calls(&self) -> &[PlanAggCall] {
114 &self.core.agg_calls
115 }
116
117 pub fn group_key(&self) -> &IndexSet {
118 &self.core.group_key
119 }
120
121 pub(crate) fn i2o_col_mapping(&self) -> ColIndexMapping {
122 self.core.i2o_col_mapping()
123 }
124
125 pub fn to_eowc_version(&self) -> Result<PlanRef> {
128 let input = self.input();
129
130 let _ = self.core.eowc_window_column(input.watermark_columns())?;
132
133 Ok(Self::new_with_eowc(
134 self.core.clone(),
135 self.vnode_col_idx,
136 self.row_count_idx,
137 true,
138 )
139 .into())
140 }
141}
142
143impl Distill for StreamHashAgg {
144 fn distill<'a>(&self) -> XmlNode<'a> {
145 let mut vec = self.core.fields_pretty();
146 if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
147 vec.push(("output_watermarks", ow));
148 }
149 childless_record(
150 plan_node_name!(
151 "StreamHashAgg",
152 { "append_only", self.input().append_only() },
153 { "eowc", self.emit_on_window_close },
154 ),
155 vec,
156 )
157 }
158}
159
160impl PlanTreeNodeUnary for StreamHashAgg {
161 fn input(&self) -> PlanRef {
162 self.core.input.clone()
163 }
164
165 fn clone_with_input(&self, input: PlanRef) -> Self {
166 let logical = generic::Agg {
167 input,
168 ..self.core.clone()
169 };
170 Self::new_with_eowc(
171 logical,
172 self.vnode_col_idx,
173 self.row_count_idx,
174 self.emit_on_window_close,
175 )
176 }
177}
178impl_plan_tree_node_for_unary! { StreamHashAgg }
179
180impl StreamNode for StreamHashAgg {
181 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
182 use risingwave_pb::stream_plan::*;
183 let (intermediate_state_table, agg_states, distinct_dedup_tables) =
184 self.core
185 .infer_tables(&self.base, self.vnode_col_idx, self.window_col_idx);
186
187 PbNodeBody::HashAgg(Box::new(HashAggNode {
188 group_key: self.group_key().to_vec_as_u32(),
189 agg_calls: self
190 .agg_calls()
191 .iter()
192 .map(PlanAggCall::to_protobuf)
193 .collect(),
194
195 is_append_only: self.input().append_only(),
196 agg_call_states: agg_states
197 .into_iter()
198 .map(|s| s.into_prost(state))
199 .collect(),
200 intermediate_state_table: Some(
201 intermediate_state_table
202 .with_id(state.gen_table_id_wrapped())
203 .to_internal_table_prost(),
204 ),
205 distinct_dedup_tables: distinct_dedup_tables
206 .into_iter()
207 .sorted_by_key(|(i, _)| *i)
208 .map(|(key_idx, table)| {
209 (
210 key_idx as u32,
211 table
212 .with_id(state.gen_table_id_wrapped())
213 .to_internal_table_prost(),
214 )
215 })
216 .collect(),
217 row_count_index: self.row_count_idx as u32,
218 emit_on_window_close: self.base.emit_on_window_close(),
219 version: PbAggNodeVersion::LATEST as _,
220 }))
221 }
222}
223
224impl ExprRewritable for StreamHashAgg {
225 fn has_rewritable_expr(&self) -> bool {
226 true
227 }
228
229 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
230 let mut core = self.core.clone();
231 core.rewrite_exprs(r);
232 Self::new_with_eowc(
233 core,
234 self.vnode_col_idx,
235 self.row_count_idx,
236 self.emit_on_window_close,
237 )
238 .into()
239 }
240}
241
242impl ExprVisitable for StreamHashAgg {
243 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
244 self.core.visit_exprs(v);
245 }
246}