// risingwave_frontend/optimizer/plan_node/stream_stateless_simple_agg.rs
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use super::generic::{self, PlanAggCall};
use super::stream::prelude::*;
use super::utils::impl_distill_by_unit;
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::{ExprRewriter, ExprVisitor};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::property::{MonotonicityMap, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;
/// Stream plan node that wraps a `generic::Agg` core.
///
/// When converted to a stream node body (see the `StreamNode` impl in this file) it is
/// emitted as `PbNodeBody::StatelessSimpleAgg` with no agg-call states and no intermediate
/// state table.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamStatelessSimpleAgg {
/// Stream-level plan properties, built in `new` from `core` and the input's properties.
pub base: PlanBase<Stream>,
/// The aggregation description (input, group key, agg calls) this node is built from.
core: generic::Agg<PlanRef>,
}
impl StreamStatelessSimpleAgg {
    /// Builds the plan node from an aggregation `core`.
    ///
    /// The node takes over the input's distribution, append-only flag and
    /// emit-on-window-close flag. In debug builds it asserts that the input distribution
    /// satisfies `RequiredDist::AnyShard`.
    pub fn new(core: generic::Agg<PlanRef>) -> Self {
        let input = core.input.clone();
        let input_dist = input.distribution();
        debug_assert!(input_dist.satisfies(&RequiredDist::AnyShard));

        // The bit at position `i` is set when the `i`-th group-key column reads an input
        // column that is itself a watermark column.
        let mut watermark_columns = FixedBitSet::with_capacity(core.output_len());
        core.group_key
            .indices()
            .enumerate()
            .filter(|&(_, input_idx)| input.watermark_columns().contains(input_idx))
            .for_each(|(output_idx, _)| {
                watermark_columns.insert(output_idx);
            });

        let base = PlanBase::new_stream_with_core(
            &core,
            input_dist.clone(),
            input.append_only(),
            input.emit_on_window_close(),
            watermark_columns,
            MonotonicityMap::new(),
        );

        Self { base, core }
    }

    /// Returns the aggregate calls held by the underlying core.
    pub fn agg_calls(&self) -> &[PlanAggCall] {
        self.core.agg_calls.as_slice()
    }
}
// NOTE(review): macro imported from `super::utils`; presumably generates the plan-display
// (distill) impl for this node by delegating to the `core` field under the given name —
// confirm against the macro definition.
impl_distill_by_unit!(StreamStatelessSimpleAgg, core, "StreamStatelessSimpleAgg");
impl PlanTreeNodeUnary for StreamStatelessSimpleAgg {
fn input(&self) -> PlanRef {
self.core.input.clone()
}
fn clone_with_input(&self, input: PlanRef) -> Self {
let mut core = self.core.clone();
core.input = input;
Self::new(core)
}
}
// NOTE(review): macro defined outside this file; presumably derives the generic plan-tree
// node impl from the `PlanTreeNodeUnary` impl above — confirm against the macro definition.
impl_plan_tree_node_for_unary! { StreamStatelessSimpleAgg }
impl StreamNode for StreamStatelessSimpleAgg {
    /// Serializes this node into a `PbNodeBody::StatelessSimpleAgg` wrapping a
    /// `SimpleAggNode`.
    ///
    /// The message carries the protobuf form of every agg call and the distribution key
    /// columns of this node. No agg-call states and no intermediate state table are set.
    /// `_state` is not used.
    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        let agg_calls: Vec<_> = self
            .agg_calls()
            .iter()
            .map(|call| call.to_protobuf())
            .collect();
        let distribution_key = self
            .distribution()
            .dist_column_indices()
            .iter()
            .map(|&col| col as u32)
            .collect_vec();
        let is_append_only = self.input().append_only();

        let node = SimpleAggNode {
            agg_calls,
            // NOTE(review): `u32::MAX` looks like a "no row-count column" sentinel — confirm.
            row_count_index: u32::MAX,
            distribution_key,
            agg_call_states: vec![],
            intermediate_state_table: None,
            is_append_only,
            distinct_dedup_tables: Default::default(),
            version: AggNodeVersion::Issue13465 as _,
            must_output_per_barrier: false,
        };
        PbNodeBody::StatelessSimpleAgg(node)
    }
}
impl ExprRewritable for StreamStatelessSimpleAgg {
    /// Always reports that this node has rewritable expressions.
    fn has_rewritable_expr(&self) -> bool {
        true
    }

    /// Applies `r` to a copy of the core, rebuilds the node from the rewritten core via
    /// `Self::new`, and returns it as a `PlanRef`. `self` is left untouched.
    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
        let mut rewritten_core = self.core.clone();
        rewritten_core.rewrite_exprs(r);
        let rebuilt = Self::new(rewritten_core);
        rebuilt.into()
    }
}
impl ExprVisitable for StreamStatelessSimpleAgg {
    /// Forwards the visitor to the core's `visit_exprs`.
    fn visit_exprs(&self, visitor: &mut dyn ExprVisitor) {
        self.core.visit_exprs(visitor);
    }
}