risingwave_frontend/optimizer/plan_node/
stream_stateless_simple_agg.rs1use risingwave_pb::stream_plan::stream_node::PbNodeBody;
16
17use super::generic::{self, PlanAggCall};
18use super::stream::prelude::*;
19use super::utils::impl_distill_by_unit;
20use super::{
21 ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamPlanRef as PlanRef, StreamPlanRef,
22 TryToStreamPb,
23};
24use crate::expr::{ExprRewriter, ExprVisitor};
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::property::{MonotonicityMap, RequiredDist, WatermarkColumns};
27use crate::scheduler::SchedulerResult;
28use crate::stream_fragmenter::BuildFragmentGraphState;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37pub struct StreamStatelessSimpleAgg {
38 pub base: PlanBase<Stream>,
39 core: generic::Agg<PlanRef>,
40}
41
42impl StreamStatelessSimpleAgg {
43 pub fn new(core: generic::Agg<StreamPlanRef>) -> Result<Self> {
44 let input = core.input.clone();
45 reject_upsert_input!(input);
46 let input_dist = input.distribution();
47 debug_assert!(input_dist.satisfies(&RequiredDist::AnyShard));
48
49 let mut watermark_columns = WatermarkColumns::new();
50 for (idx, input_idx) in core.group_key.indices().enumerate() {
52 if let Some(wtmk_group) = input.watermark_columns().get_group(input_idx) {
53 watermark_columns.insert(idx, wtmk_group);
54 }
55 }
56
57 let base = PlanBase::new_stream_with_core(
58 &core,
59 input_dist.clone(),
60 StreamKind::AppendOnly,
62 input.emit_on_window_close(),
63 watermark_columns,
64 MonotonicityMap::new(),
65 );
66
67 Ok(StreamStatelessSimpleAgg { base, core })
68 }
69
70 pub fn agg_calls(&self) -> &[PlanAggCall] {
71 &self.core.agg_calls
72 }
73}
74impl_distill_by_unit!(StreamStatelessSimpleAgg, core, "StreamStatelessSimpleAgg");
75
76impl PlanTreeNodeUnary<Stream> for StreamStatelessSimpleAgg {
77 fn input(&self) -> PlanRef {
78 self.core.input.clone()
79 }
80
81 fn clone_with_input(&self, input: PlanRef) -> Self {
82 let mut core = self.core.clone();
83 core.input = input;
84 Self::new(core).unwrap()
85 }
86}
87impl_plan_tree_node_for_unary! { Stream, StreamStatelessSimpleAgg }
88
89impl TryToStreamPb for StreamStatelessSimpleAgg {
90 fn try_to_stream_prost_body(
91 &self,
92 _state: &mut BuildFragmentGraphState,
93 ) -> SchedulerResult<PbNodeBody> {
94 use risingwave_pb::stream_plan::*;
95 let append_only = self.input().append_only();
96 let agg_calls = self
97 .agg_calls()
98 .iter()
99 .map(|call| call.to_protobuf_checked_pure(self.input().stream_kind().is_retract()))
100 .collect::<crate::error::Result<Vec<_>>>()?;
101 Ok(PbNodeBody::StatelessSimpleAgg(Box::new(SimpleAggNode {
102 agg_calls,
103 row_count_index: u32::MAX, agg_call_states: vec![],
105 intermediate_state_table: None,
106 is_append_only: append_only,
107 distinct_dedup_tables: Default::default(),
108 version: AggNodeVersion::Issue13465 as _,
109 must_output_per_barrier: false, })))
111 }
112}
113
114impl ExprRewritable<Stream> for StreamStatelessSimpleAgg {
115 fn has_rewritable_expr(&self) -> bool {
116 true
117 }
118
119 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
120 let mut core = self.core.clone();
121 core.rewrite_exprs(r);
122 Self::new(core).unwrap().into()
123 }
124}
125
126impl ExprVisitable for StreamStatelessSimpleAgg {
127 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
128 self.core.visit_exprs(v);
129 }
130}