risingwave_stream/from_proto/
barrier_recv.rs1use risingwave_pb::stream_plan::BarrierRecvNode;
16
17use super::*;
18use crate::executor::BarrierRecvExecutor;
19
/// Builder for [`BarrierRecvExecutor`], created from a [`BarrierRecvNode`] plan node.
///
/// This is a stateless marker type; the construction logic lives in its
/// `ExecutorBuilder` implementation.
pub struct BarrierRecvExecutorBuilder;

// NOTE(review): presumably associates the `BarrierRecv` stream node body
// (`BarrierRecvNode`) with this builder — confirm against the macro definition.
impl_stream_node_body!(BarrierRecv(BarrierRecvNode) => BarrierRecvExecutorBuilder);
23
24impl ExecutorBuilder for BarrierRecvExecutorBuilder {
25 type Node = BarrierRecvNode;
26
27 async fn new_boxed_executor(
28 params: ExecutorParams,
29 _node: &Self::Node,
30 _store: impl StateStore,
31 ) -> StreamResult<Executor> {
32 assert!(
33 params.input.is_empty(),
34 "barrier receiver should not have input"
35 );
36
37 let barrier_receiver = params
38 .local_barrier_manager
39 .subscribe_barrier(params.actor_context.id);
40
41 let exec = BarrierRecvExecutor::new(params.actor_context, barrier_receiver);
42 Ok((params.info, exec).into())
43 }
44}