risingwave_stream/executor/
barrier_recv.rs1use tokio::sync::mpsc::UnboundedReceiver;
16use tokio_stream::wrappers::UnboundedReceiverStream;
17
18use crate::executor::prelude::*;
19
20pub struct BarrierRecvExecutor {
23 _ctx: ActorContextRef,
24
25 barrier_receiver: UnboundedReceiver<Barrier>,
27}
28
29impl BarrierRecvExecutor {
30 pub fn new(ctx: ActorContextRef, barrier_receiver: UnboundedReceiver<Barrier>) -> Self {
31 Self {
32 _ctx: ctx,
33 barrier_receiver,
34 }
35 }
36
37 pub fn for_test(barrier_receiver: UnboundedReceiver<Barrier>) -> Self {
38 Self::new(ActorContext::for_test(0), barrier_receiver)
39 }
40}
41
42impl Execute for BarrierRecvExecutor {
43 fn execute(self: Box<Self>) -> BoxedMessageStream {
44 UnboundedReceiverStream::new(self.barrier_receiver)
45 .map(|barrier| Ok(Message::Barrier(barrier)))
46 .chain(futures::stream::once(async {
47 Err(StreamExecutorError::channel_closed("barrier receiver"))
50 }))
51 .boxed()
52 }
53}
54
55#[cfg(test)]
56mod tests {
57 use futures::pin_mut;
58 use risingwave_common::util::epoch::test_epoch;
59 use tokio::sync::mpsc;
60
61 use super::*;
62 use crate::executor::test_utils::StreamExecutorTestExt;
63
64 #[tokio::test]
65 async fn test_barrier_recv() {
66 let (barrier_tx, barrier_rx) = mpsc::unbounded_channel();
67
68 let barrier_recv = BarrierRecvExecutor::for_test(barrier_rx).boxed();
69 let stream = barrier_recv.execute();
70 pin_mut!(stream);
71
72 barrier_tx
73 .send(Barrier::new_test_barrier(test_epoch(1)))
74 .unwrap();
75 barrier_tx
76 .send(Barrier::new_test_barrier(test_epoch(2)))
77 .unwrap();
78
79 let barrier_1 = stream.next_unwrap_ready_barrier().unwrap();
80 assert_eq!(barrier_1.epoch.curr, test_epoch(1));
81 let barrier_2 = stream.next_unwrap_ready_barrier().unwrap();
82 assert_eq!(barrier_2.epoch.curr, test_epoch(2));
83
84 stream.next_unwrap_pending();
85
86 drop(barrier_tx);
87 assert!(stream.next_unwrap_ready().is_err());
88 }
89}