risingwave_frontend/optimizer/plan_node/
stream_stateless_simple_agg.rs1use itertools::Itertools;
16use risingwave_pb::stream_plan::stream_node::PbNodeBody;
17
18use super::generic::{self, PlanAggCall};
19use super::stream::prelude::*;
20use super::utils::impl_distill_by_unit;
21use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
22use crate::expr::{ExprRewriter, ExprVisitor};
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::property::{MonotonicityMap, RequiredDist, WatermarkColumns};
25use crate::stream_fragmenter::BuildFragmentGraphState;
26
27#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct StreamStatelessSimpleAgg {
35 pub base: PlanBase<Stream>,
36 core: generic::Agg<PlanRef>,
37}
38
39impl StreamStatelessSimpleAgg {
40 pub fn new(core: generic::Agg<PlanRef>) -> Self {
41 let input = core.input.clone();
42 let input_dist = input.distribution();
43 debug_assert!(input_dist.satisfies(&RequiredDist::AnyShard));
44
45 let mut watermark_columns = WatermarkColumns::new();
46 for (idx, input_idx) in core.group_key.indices().enumerate() {
48 if let Some(wtmk_group) = input.watermark_columns().get_group(input_idx) {
49 watermark_columns.insert(idx, wtmk_group);
50 }
51 }
52
53 let base = PlanBase::new_stream_with_core(
54 &core,
55 input_dist.clone(),
56 input.append_only(),
57 input.emit_on_window_close(),
58 watermark_columns,
59 MonotonicityMap::new(),
60 );
61 StreamStatelessSimpleAgg { base, core }
62 }
63
64 pub fn agg_calls(&self) -> &[PlanAggCall] {
65 &self.core.agg_calls
66 }
67}
68impl_distill_by_unit!(StreamStatelessSimpleAgg, core, "StreamStatelessSimpleAgg");
69
70impl PlanTreeNodeUnary for StreamStatelessSimpleAgg {
71 fn input(&self) -> PlanRef {
72 self.core.input.clone()
73 }
74
75 fn clone_with_input(&self, input: PlanRef) -> Self {
76 let mut core = self.core.clone();
77 core.input = input;
78 Self::new(core)
79 }
80}
81impl_plan_tree_node_for_unary! { StreamStatelessSimpleAgg }
82
83impl StreamNode for StreamStatelessSimpleAgg {
84 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
85 use risingwave_pb::stream_plan::*;
86 PbNodeBody::StatelessSimpleAgg(Box::new(SimpleAggNode {
87 agg_calls: self
88 .agg_calls()
89 .iter()
90 .map(PlanAggCall::to_protobuf)
91 .collect(),
92 row_count_index: u32::MAX, distribution_key: self
94 .distribution()
95 .dist_column_indices()
96 .iter()
97 .map(|idx| *idx as u32)
98 .collect_vec(),
99 agg_call_states: vec![],
100 intermediate_state_table: None,
101 is_append_only: self.input().append_only(),
102 distinct_dedup_tables: Default::default(),
103 version: AggNodeVersion::Issue13465 as _,
104 must_output_per_barrier: false, }))
106 }
107}
108
109impl ExprRewritable for StreamStatelessSimpleAgg {
110 fn has_rewritable_expr(&self) -> bool {
111 true
112 }
113
114 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
115 let mut core = self.core.clone();
116 core.rewrite_exprs(r);
117 Self::new(core).into()
118 }
119}
120
121impl ExprVisitable for StreamStatelessSimpleAgg {
122 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
123 self.core.visit_exprs(v);
124 }
125}