risingwave_stream/from_proto/
stateless_simple_agg.rs1use risingwave_expr::aggregate::AggCall;
16use risingwave_pb::stream_plan::SimpleAggNode;
17
18use super::*;
19use crate::executor::aggregate::StatelessSimpleAggExecutor;
20
/// Field-less builder type that carries the `ExecutorBuilder` implementation for
/// `SimpleAggNode`; its `new_boxed_executor` constructs a `StatelessSimpleAggExecutor`.
21pub struct StatelessSimpleAggExecutorBuilder;
22
// Invokes `impl_stream_node_body!`, pairing `StatelessSimpleAgg(SimpleAggNode)` with
// `StatelessSimpleAggExecutorBuilder`.
// NOTE(review): the macro is defined outside this file — presumably it wires this builder to
// that stream node body variant; confirm against the macro definition.
23impl_stream_node_body!(StatelessSimpleAgg(SimpleAggNode) => StatelessSimpleAggExecutorBuilder);
24
25impl ExecutorBuilder for StatelessSimpleAggExecutorBuilder {
26 type Node = SimpleAggNode;
27
28 async fn new_boxed_executor(
29 params: ExecutorParams,
30 node: &Self::Node,
31 _store: impl StateStore,
32 ) -> StreamResult<Executor> {
33 let [input]: [_; 1] = params.input.try_into().unwrap();
34 let agg_calls: Vec<AggCall> = node
35 .get_agg_calls()
36 .iter()
37 .map(AggCall::from_protobuf)
38 .try_collect()?;
39
40 let exec = StatelessSimpleAggExecutor::new(
41 params.actor_context,
42 input,
43 params.info.schema.clone(),
44 agg_calls,
45 )?;
46 Ok((params.info, exec).into())
47 }
48}