risingwave_frontend/optimizer/plan_node/
stream_stateless_simple_agg.rs1use risingwave_pb::stream_plan::stream_node::PbNodeBody;
16
17use super::generic::{self, PlanAggCall};
18use super::stream::prelude::*;
19use super::utils::impl_distill_by_unit;
20use super::{
21 ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef,
22 StreamPlanRef,
23};
24use crate::expr::{ExprRewriter, ExprVisitor};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::property::{MonotonicityMap, RequiredDist, WatermarkColumns};
27use crate::stream_fragmenter::BuildFragmentGraphState;
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
36pub struct StreamStatelessSimpleAgg {
37 pub base: PlanBase<Stream>,
38 core: generic::Agg<PlanRef>,
39}
40
41impl StreamStatelessSimpleAgg {
42 pub fn new(core: generic::Agg<StreamPlanRef>) -> Result<Self> {
43 let input = core.input.clone();
44 reject_upsert_input!(input);
45 let input_dist = input.distribution();
46 debug_assert!(input_dist.satisfies(&RequiredDist::AnyShard));
47
48 let mut watermark_columns = WatermarkColumns::new();
49 for (idx, input_idx) in core.group_key.indices().enumerate() {
51 if let Some(wtmk_group) = input.watermark_columns().get_group(input_idx) {
52 watermark_columns.insert(idx, wtmk_group);
53 }
54 }
55
56 let base = PlanBase::new_stream_with_core(
57 &core,
58 input_dist.clone(),
59 StreamKind::AppendOnly,
61 input.emit_on_window_close(),
62 watermark_columns,
63 MonotonicityMap::new(),
64 );
65
66 Ok(StreamStatelessSimpleAgg { base, core })
67 }
68
69 pub fn agg_calls(&self) -> &[PlanAggCall] {
70 &self.core.agg_calls
71 }
72}
73impl_distill_by_unit!(StreamStatelessSimpleAgg, core, "StreamStatelessSimpleAgg");
74
75impl PlanTreeNodeUnary<Stream> for StreamStatelessSimpleAgg {
76 fn input(&self) -> PlanRef {
77 self.core.input.clone()
78 }
79
80 fn clone_with_input(&self, input: PlanRef) -> Self {
81 let mut core = self.core.clone();
82 core.input = input;
83 Self::new(core).unwrap()
84 }
85}
86impl_plan_tree_node_for_unary! { Stream, StreamStatelessSimpleAgg }
87
88impl StreamNode for StreamStatelessSimpleAgg {
89 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
90 use risingwave_pb::stream_plan::*;
91 PbNodeBody::StatelessSimpleAgg(Box::new(SimpleAggNode {
92 agg_calls: self
93 .agg_calls()
94 .iter()
95 .map(PlanAggCall::to_protobuf)
96 .collect(),
97 row_count_index: u32::MAX, agg_call_states: vec![],
99 intermediate_state_table: None,
100 is_append_only: self.input().append_only(),
101 distinct_dedup_tables: Default::default(),
102 version: AggNodeVersion::Issue13465 as _,
103 must_output_per_barrier: false, }))
105 }
106}
107
108impl ExprRewritable<Stream> for StreamStatelessSimpleAgg {
109 fn has_rewritable_expr(&self) -> bool {
110 true
111 }
112
113 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
114 let mut core = self.core.clone();
115 core.rewrite_exprs(r);
116 Self::new(core).unwrap().into()
117 }
118}
119
120impl ExprVisitable for StreamStatelessSimpleAgg {
121 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
122 self.core.visit_exprs(v);
123 }
124}