risingwave_stream/from_proto/
stateless_simple_agg.rs1use risingwave_expr::aggregate::AggCall;
16use risingwave_pb::stream_plan::SimpleAggNode;
17
18use super::*;
19use crate::executor::aggregate::StatelessSimpleAggExecutor;
20
/// Builder that turns a [`SimpleAggNode`] plan node into a
/// [`StatelessSimpleAggExecutor`].
///
/// The type carries no state; it exists only as the implementor of
/// `ExecutorBuilder`.
pub struct StatelessSimpleAggExecutorBuilder;
22
23impl ExecutorBuilder for StatelessSimpleAggExecutorBuilder {
24 type Node = SimpleAggNode;
25
26 async fn new_boxed_executor(
27 params: ExecutorParams,
28 node: &Self::Node,
29 _store: impl StateStore,
30 ) -> StreamResult<Executor> {
31 let [input]: [_; 1] = params.input.try_into().unwrap();
32 let agg_calls: Vec<AggCall> = node
33 .get_agg_calls()
34 .iter()
35 .map(AggCall::from_protobuf)
36 .try_collect()?;
37
38 let exec = StatelessSimpleAggExecutor::new(
39 params.actor_context,
40 input,
41 params.info.schema.clone(),
42 agg_calls,
43 )?;
44 Ok((params.info, exec).into())
45 }
46}