risingwave_frontend/optimizer/plan_node/
stream_simple_agg.rs1use itertools::Itertools;
16use pretty_xmlish::XmlNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::generic::{self, PlanAggCall};
20use super::stream::prelude::*;
21use super::utils::{Distill, childless_record, plan_node_name};
22use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
23use crate::expr::{ExprRewriter, ExprVisitor};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
26use crate::stream_fragmenter::BuildFragmentGraphState;
27
28#[derive(Debug, Clone, PartialEq, Eq, Hash)]
29pub struct StreamSimpleAgg {
30 pub base: PlanBase<Stream>,
31 core: generic::Agg<PlanRef>,
32
33 row_count_idx: usize,
35
36 must_output_per_barrier: bool,
39}
40
41impl StreamSimpleAgg {
42 pub fn new(
43 core: generic::Agg<PlanRef>,
44 row_count_idx: usize,
45 must_output_per_barrier: bool,
46 ) -> Self {
47 assert_eq!(core.agg_calls[row_count_idx], PlanAggCall::count_star());
48
49 let input = core.input.clone();
50 let input_dist = input.distribution();
51 let dist = match input_dist {
52 Distribution::Single => Distribution::Single,
53 _ => panic!(),
54 };
55
56 let watermark_columns = WatermarkColumns::new();
58
59 let base = PlanBase::new_stream_with_core(
61 &core,
62 dist,
63 false,
64 false,
65 watermark_columns,
66 MonotonicityMap::new(),
67 );
68 StreamSimpleAgg {
69 base,
70 core,
71 row_count_idx,
72 must_output_per_barrier,
73 }
74 }
75
76 pub fn agg_calls(&self) -> &[PlanAggCall] {
77 &self.core.agg_calls
78 }
79}
80
81impl Distill for StreamSimpleAgg {
82 fn distill<'a>(&self) -> XmlNode<'a> {
83 let name = plan_node_name!("StreamSimpleAgg",
84 { "append_only", self.input().append_only() },
85 );
86 let mut vec = self.core.fields_pretty();
87 if self.must_output_per_barrier {
88 vec.push(("must_output_per_barrier", "true".into()));
89 }
90 childless_record(name, vec)
91 }
92}
93
94impl PlanTreeNodeUnary for StreamSimpleAgg {
95 fn input(&self) -> PlanRef {
96 self.core.input.clone()
97 }
98
99 fn clone_with_input(&self, input: PlanRef) -> Self {
100 let logical = generic::Agg {
101 input,
102 ..self.core.clone()
103 };
104 Self::new(logical, self.row_count_idx, self.must_output_per_barrier)
105 }
106}
107impl_plan_tree_node_for_unary! { StreamSimpleAgg }
108
109impl StreamNode for StreamSimpleAgg {
110 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
111 use risingwave_pb::stream_plan::*;
112 let (intermediate_state_table, agg_states, distinct_dedup_tables) =
113 self.core.infer_tables(&self.base, None, None);
114
115 PbNodeBody::SimpleAgg(Box::new(SimpleAggNode {
116 agg_calls: self
117 .agg_calls()
118 .iter()
119 .map(PlanAggCall::to_protobuf)
120 .collect(),
121 distribution_key: self
122 .base
123 .distribution()
124 .dist_column_indices()
125 .iter()
126 .map(|idx| *idx as u32)
127 .collect(),
128 is_append_only: self.input().append_only(),
129 agg_call_states: agg_states
130 .into_iter()
131 .map(|s| s.into_prost(state))
132 .collect(),
133 intermediate_state_table: Some(
134 intermediate_state_table
135 .with_id(state.gen_table_id_wrapped())
136 .to_internal_table_prost(),
137 ),
138 distinct_dedup_tables: distinct_dedup_tables
139 .into_iter()
140 .sorted_by_key(|(i, _)| *i)
141 .map(|(key_idx, table)| {
142 (
143 key_idx as u32,
144 table
145 .with_id(state.gen_table_id_wrapped())
146 .to_internal_table_prost(),
147 )
148 })
149 .collect(),
150 row_count_index: self.row_count_idx as u32,
151 version: PbAggNodeVersion::LATEST as _,
152 must_output_per_barrier: self.must_output_per_barrier,
153 }))
154 }
155}
156
157impl ExprRewritable for StreamSimpleAgg {
158 fn has_rewritable_expr(&self) -> bool {
159 true
160 }
161
162 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
163 let mut core = self.core.clone();
164 core.rewrite_exprs(r);
165 Self::new(core, self.row_count_idx, self.must_output_per_barrier).into()
166 }
167}
168
169impl ExprVisitable for StreamSimpleAgg {
170 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
171 self.core.visit_exprs(v);
172 }
173}