risingwave_frontend/optimizer/plan_node/
stream_hash_agg.rs1use itertools::Itertools;
16use pretty_xmlish::XmlNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::generic::{self, PlanAggCall};
20use super::stream::prelude::*;
21use super::utils::{Distill, childless_record, plan_node_name, watermark_pretty};
22use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamPlanRef as PlanRef, TryToStreamPb};
23use crate::error::Result;
24use crate::expr::{ExprRewriter, ExprVisitor};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::property::{MonotonicityMap, WatermarkColumns};
27use crate::scheduler::SchedulerResult;
28use crate::stream_fragmenter::BuildFragmentGraphState;
29use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, IndexSet};
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct StreamHashAgg {
33 pub base: PlanBase<Stream>,
34 core: generic::Agg<PlanRef>,
35
36 vnode_col_idx: Option<usize>,
39
40 row_count_idx: usize,
42
43 emit_on_window_close: bool,
45
46 window_col_idx: Option<usize>,
48}
49
50impl StreamHashAgg {
51 pub fn new(
52 core: generic::Agg<PlanRef>,
53 vnode_col_idx: Option<usize>,
54 row_count_idx: usize,
55 ) -> Result<Self> {
56 Self::new_with_eowc(core, vnode_col_idx, row_count_idx, false)
57 }
58
59 pub fn new_with_eowc(
60 core: generic::Agg<PlanRef>,
61 vnode_col_idx: Option<usize>,
62 row_count_idx: usize,
63 emit_on_window_close: bool,
64 ) -> Result<Self> {
65 assert_eq!(core.agg_calls[row_count_idx], PlanAggCall::count_star());
66 reject_upsert_input!(core.input);
67
68 let input = core.input.clone();
69 let input_dist = input.distribution();
70 let dist = core
71 .i2o_col_mapping()
72 .rewrite_provided_distribution(input_dist);
73
74 let mut watermark_columns = WatermarkColumns::new();
75 let mut window_col_idx = None;
76 let mapping = core.i2o_col_mapping();
77 if emit_on_window_close {
78 let window_col = core
79 .eowc_window_column(input.watermark_columns())
80 .expect("checked in `to_eowc_version`");
81 watermark_columns.insert(
83 mapping.map(window_col),
84 input.watermark_columns().get_group(window_col).unwrap(),
85 );
86 window_col_idx = Some(window_col);
87 } else {
88 for idx in core.group_key.indices() {
89 if let Some(wtmk_group) = input.watermark_columns().get_group(idx) {
90 watermark_columns.insert(mapping.map(idx), wtmk_group);
92 }
93 }
94 }
95
96 let base = PlanBase::new_stream_with_core(
98 &core,
99 dist,
100 if emit_on_window_close {
101 StreamKind::AppendOnly
103 } else {
104 StreamKind::Retract
105 },
106 emit_on_window_close,
107 watermark_columns,
108 MonotonicityMap::new(), );
110
111 Ok(StreamHashAgg {
112 base,
113 core,
114 vnode_col_idx,
115 row_count_idx,
116 emit_on_window_close,
117 window_col_idx,
118 })
119 }
120
121 pub fn agg_calls(&self) -> &[PlanAggCall] {
122 &self.core.agg_calls
123 }
124
125 pub fn group_key(&self) -> &IndexSet {
126 &self.core.group_key
127 }
128
129 pub(crate) fn i2o_col_mapping(&self) -> ColIndexMapping {
130 self.core.i2o_col_mapping()
131 }
132
133 pub fn to_eowc_version(&self) -> Result<PlanRef> {
136 let input = self.input();
137
138 let _ = self.core.eowc_window_column(input.watermark_columns())?;
140
141 Ok(Self::new_with_eowc(
142 self.core.clone(),
143 self.vnode_col_idx,
144 self.row_count_idx,
145 true,
146 )?
147 .into())
148 }
149}
150
151impl Distill for StreamHashAgg {
152 fn distill<'a>(&self) -> XmlNode<'a> {
153 let mut vec = self.core.fields_pretty();
154 if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
155 vec.push(("output_watermarks", ow));
156 }
157 childless_record(
158 plan_node_name!(
159 "StreamHashAgg",
160 { "append_only", self.input().append_only() },
161 { "eowc", self.emit_on_window_close },
162 ),
163 vec,
164 )
165 }
166}
167
168impl PlanTreeNodeUnary<Stream> for StreamHashAgg {
169 fn input(&self) -> PlanRef {
170 self.core.input.clone()
171 }
172
173 fn clone_with_input(&self, input: PlanRef) -> Self {
174 let logical = generic::Agg {
175 input,
176 ..self.core.clone()
177 };
178
179 Self::new_with_eowc(
180 logical,
181 self.vnode_col_idx,
182 self.row_count_idx,
183 self.emit_on_window_close,
184 )
185 .unwrap()
186 }
187}
188impl_plan_tree_node_for_unary! { Stream, StreamHashAgg }
189
190impl TryToStreamPb for StreamHashAgg {
191 fn try_to_stream_prost_body(
192 &self,
193 state: &mut BuildFragmentGraphState,
194 ) -> SchedulerResult<PbNodeBody> {
195 use risingwave_pb::stream_plan::*;
196 let (intermediate_state_table, agg_states, distinct_dedup_tables) =
197 self.core
198 .infer_tables(&self.base, self.vnode_col_idx, self.window_col_idx);
199
200 let agg_calls = self
201 .agg_calls()
202 .iter()
203 .map(|call| call.to_protobuf_checked_pure(self.input().stream_kind().is_retract()))
204 .collect::<crate::error::Result<Vec<_>>>()?;
205
206 Ok(PbNodeBody::HashAgg(Box::new(HashAggNode {
207 group_key: self.group_key().to_vec_as_u32(),
208 agg_calls,
209
210 is_append_only: self.input().append_only(),
211 agg_call_states: agg_states
212 .into_iter()
213 .map(|s| s.into_prost(state))
214 .collect(),
215 intermediate_state_table: Some(
216 intermediate_state_table
217 .with_id(state.gen_table_id_wrapped())
218 .to_internal_table_prost(),
219 ),
220 distinct_dedup_tables: distinct_dedup_tables
221 .into_iter()
222 .sorted_by_key(|(i, _)| *i)
223 .map(|(key_idx, table)| {
224 (
225 key_idx as u32,
226 table
227 .with_id(state.gen_table_id_wrapped())
228 .to_internal_table_prost(),
229 )
230 })
231 .collect(),
232 row_count_index: self.row_count_idx as u32,
233 emit_on_window_close: self.base.emit_on_window_close(),
234 version: PbAggNodeVersion::LATEST as _,
235 })))
236 }
237}
238
239impl ExprRewritable<Stream> for StreamHashAgg {
240 fn has_rewritable_expr(&self) -> bool {
241 true
242 }
243
244 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
245 let mut core = self.core.clone();
246 core.rewrite_exprs(r);
247 Self::new_with_eowc(
248 core,
249 self.vnode_col_idx,
250 self.row_count_idx,
251 self.emit_on_window_close,
252 )
253 .unwrap()
254 .into()
255 }
256}
257
258impl ExprVisitable for StreamHashAgg {
259 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
260 self.core.visit_exprs(v);
261 }
262}