risingwave_frontend/optimizer/plan_node/
stream_simple_agg.rs1use itertools::Itertools;
16use pretty_xmlish::XmlNode;
17use risingwave_pb::stream_plan::stream_node::PbNodeBody;
18
19use super::generic::{self, PlanAggCall};
20use super::stream::prelude::*;
21use super::utils::{Distill, childless_record, plan_node_name};
22use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamPlanRef as PlanRef, TryToStreamPb};
23use crate::expr::{ExprRewriter, ExprVisitor};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
26use crate::scheduler::SchedulerResult;
27use crate::stream_fragmenter::BuildFragmentGraphState;
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct StreamSimpleAgg {
31 pub base: PlanBase<Stream>,
32 core: generic::Agg<PlanRef>,
33
34 row_count_idx: usize,
36
37 must_output_per_barrier: bool,
40}
41
42impl StreamSimpleAgg {
43 pub fn new(
44 core: generic::Agg<PlanRef>,
45 row_count_idx: usize,
46 must_output_per_barrier: bool,
47 ) -> Result<Self> {
48 assert_eq!(core.agg_calls[row_count_idx], PlanAggCall::count_star());
49 reject_upsert_input!(core.input);
50
51 let input = core.input.clone();
52 let input_dist = input.distribution();
53 let dist = match input_dist {
54 Distribution::Single => Distribution::Single,
55 _ => panic!(),
56 };
57
58 let watermark_columns = WatermarkColumns::new();
60
61 let base = PlanBase::new_stream_with_core(
63 &core,
64 dist,
65 StreamKind::Retract,
66 false,
67 watermark_columns,
68 MonotonicityMap::new(),
69 );
70
71 Ok(StreamSimpleAgg {
72 base,
73 core,
74 row_count_idx,
75 must_output_per_barrier,
76 })
77 }
78
79 pub fn agg_calls(&self) -> &[PlanAggCall] {
80 &self.core.agg_calls
81 }
82}
83
84impl Distill for StreamSimpleAgg {
85 fn distill<'a>(&self) -> XmlNode<'a> {
86 let name = plan_node_name!("StreamSimpleAgg",
87 { "append_only", self.input().append_only() },
88 );
89 let mut vec = self.core.fields_pretty();
90 if self.must_output_per_barrier {
91 vec.push(("must_output_per_barrier", "true".into()));
92 }
93 childless_record(name, vec)
94 }
95}
96
97impl PlanTreeNodeUnary<Stream> for StreamSimpleAgg {
98 fn input(&self) -> PlanRef {
99 self.core.input.clone()
100 }
101
102 fn clone_with_input(&self, input: PlanRef) -> Self {
103 let logical = generic::Agg {
104 input,
105 ..self.core.clone()
106 };
107 Self::new(logical, self.row_count_idx, self.must_output_per_barrier).unwrap()
108 }
109}
110impl_plan_tree_node_for_unary! { Stream, StreamSimpleAgg }
111
112impl TryToStreamPb for StreamSimpleAgg {
113 fn try_to_stream_prost_body(
114 &self,
115 state: &mut BuildFragmentGraphState,
116 ) -> SchedulerResult<PbNodeBody> {
117 use risingwave_pb::stream_plan::*;
118 let (intermediate_state_table, agg_states, distinct_dedup_tables) =
119 self.core.infer_tables(&self.base, None, None);
120
121 let append_only = self.input().append_only();
122 let agg_calls = self
123 .agg_calls()
124 .iter()
125 .map(|call| call.to_protobuf_checked_pure(self.input().stream_kind().is_retract()))
126 .collect::<crate::error::Result<Vec<_>>>()?;
127
128 Ok(PbNodeBody::SimpleAgg(Box::new(SimpleAggNode {
129 agg_calls,
130 is_append_only: append_only,
131 agg_call_states: agg_states
132 .into_iter()
133 .map(|s| s.into_prost(state))
134 .collect(),
135 intermediate_state_table: Some(
136 intermediate_state_table
137 .with_id(state.gen_table_id_wrapped())
138 .to_internal_table_prost(),
139 ),
140 distinct_dedup_tables: distinct_dedup_tables
141 .into_iter()
142 .sorted_by_key(|(i, _)| *i)
143 .map(|(key_idx, table)| {
144 (
145 key_idx as u32,
146 table
147 .with_id(state.gen_table_id_wrapped())
148 .to_internal_table_prost(),
149 )
150 })
151 .collect(),
152 row_count_index: self.row_count_idx as u32,
153 version: PbAggNodeVersion::LATEST as _,
154 must_output_per_barrier: self.must_output_per_barrier,
155 })))
156 }
157}
158
159impl ExprRewritable<Stream> for StreamSimpleAgg {
160 fn has_rewritable_expr(&self) -> bool {
161 true
162 }
163
164 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
165 let mut core = self.core.clone();
166 core.rewrite_exprs(r);
167 Self::new(core, self.row_count_idx, self.must_output_per_barrier)
168 .unwrap()
169 .into()
170 }
171}
172
173impl ExprVisitable for StreamSimpleAgg {
174 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
175 self.core.visit_exprs(v);
176 }
177}