risingwave_stream/executor/
barrier_recv.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use tokio::sync::mpsc::UnboundedReceiver;
16use tokio_stream::wrappers::UnboundedReceiverStream;
17
18use crate::executor::prelude::*;
19
20/// The executor only for receiving barrier from the meta service. It always resides in the leaves
21/// of the streaming graph.
22pub struct BarrierRecvExecutor {
23    _ctx: ActorContextRef,
24
25    /// The barrier receiver registered in the local barrier manager.
26    barrier_receiver: UnboundedReceiver<Barrier>,
27}
28
29impl BarrierRecvExecutor {
30    pub fn new(ctx: ActorContextRef, barrier_receiver: UnboundedReceiver<Barrier>) -> Self {
31        Self {
32            _ctx: ctx,
33            barrier_receiver,
34        }
35    }
36
37    pub fn for_test(barrier_receiver: UnboundedReceiver<Barrier>) -> Self {
38        Self::new(ActorContext::for_test(0), barrier_receiver)
39    }
40}
41
42impl Execute for BarrierRecvExecutor {
43    fn execute(self: Box<Self>) -> BoxedMessageStream {
44        UnboundedReceiverStream::new(self.barrier_receiver)
45            .map(|barrier| Ok(Message::Barrier(barrier)))
46            .chain(futures::stream::once(async {
47                // We do not use the stream termination as the control message, and this line should
48                // never be reached in normal cases. So we just return an error here.
49                Err(StreamExecutorError::channel_closed("barrier receiver"))
50            }))
51            .boxed()
52    }
53}
54
55#[cfg(test)]
56mod tests {
57    use futures::pin_mut;
58    use risingwave_common::util::epoch::test_epoch;
59    use tokio::sync::mpsc;
60
61    use super::*;
62    use crate::executor::test_utils::StreamExecutorTestExt;
63
64    #[tokio::test]
65    async fn test_barrier_recv() {
66        let (barrier_tx, barrier_rx) = mpsc::unbounded_channel();
67
68        let barrier_recv = BarrierRecvExecutor::for_test(barrier_rx).boxed();
69        let stream = barrier_recv.execute();
70        pin_mut!(stream);
71
72        barrier_tx
73            .send(Barrier::new_test_barrier(test_epoch(1)))
74            .unwrap();
75        barrier_tx
76            .send(Barrier::new_test_barrier(test_epoch(2)))
77            .unwrap();
78
79        let barrier_1 = stream.next_unwrap_ready_barrier().unwrap();
80        assert_eq!(barrier_1.epoch.curr, test_epoch(1));
81        let barrier_2 = stream.next_unwrap_ready_barrier().unwrap();
82        assert_eq!(barrier_2.epoch.curr, test_epoch(2));
83
84        stream.next_unwrap_pending();
85
86        drop(barrier_tx);
87        assert!(stream.next_unwrap_ready().is_err());
88    }
89}