risingwave_stream/executor/
barrier_align.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Instant;
17
18use anyhow::Context;
19use enum_as_inner::EnumAsInner;
20use futures::StreamExt;
21use futures::future::{Either, select};
22use futures_async_stream::try_stream;
23use risingwave_common::bail;
24
25use super::error::StreamExecutorError;
26use super::{Barrier, BoxedMessageStream, Message, StreamChunk, StreamExecutorResult, Watermark};
27use crate::executor::monitor::StreamingMetrics;
28use crate::task::{ActorId, FragmentId};
29
30pub type AlignedMessageStreamItem = StreamExecutorResult<AlignedMessage>;
31pub trait AlignedMessageStream = futures::Stream<Item = AlignedMessageStreamItem> + Send;
32
33#[derive(Debug, EnumAsInner, PartialEq)]
34pub enum AlignedMessage {
35    Barrier(Barrier),
36    WatermarkLeft(Watermark),
37    WatermarkRight(Watermark),
38    Left(StreamChunk),
39    Right(StreamChunk),
40}
41
42#[try_stream(ok = AlignedMessage, error = StreamExecutorError)]
43pub async fn barrier_align(
44    mut left: BoxedMessageStream,
45    mut right: BoxedMessageStream,
46    actor_id: ActorId,
47    fragment_id: FragmentId,
48    metrics: Arc<StreamingMetrics>,
49    executor_name: &str,
50) {
51    let actor_id = actor_id.to_string();
52    let fragment_id = fragment_id.to_string();
53    let left_barrier_align_duration = metrics.barrier_align_duration.with_guarded_label_values(&[
54        &actor_id,
55        &fragment_id,
56        "left",
57        executor_name,
58    ]);
59    let right_barrier_align_duration = metrics.barrier_align_duration.with_guarded_label_values(&[
60        &actor_id,
61        &fragment_id,
62        "right",
63        executor_name,
64    ]);
65    loop {
66        let prefer_left: bool = rand::random();
67        let select_result = if prefer_left {
68            select(left.next(), right.next()).await
69        } else {
70            match select(right.next(), left.next()).await {
71                Either::Left(x) => Either::Right(x),
72                Either::Right(x) => Either::Left(x),
73            }
74        };
75        match select_result {
76            Either::Left((None, _)) => {
77                // left stream end, passthrough right chunks
78                while let Some(msg) = right.next().await {
79                    match msg? {
80                        Message::Watermark(watermark) => {
81                            yield AlignedMessage::WatermarkRight(watermark)
82                        }
83                        Message::Chunk(chunk) => yield AlignedMessage::Right(chunk),
84                        Message::Barrier(_) => {
85                            bail!("right barrier received while left stream end");
86                        }
87                    }
88                }
89                break;
90            }
91            Either::Right((None, _)) => {
92                // right stream end, passthrough left chunks
93                while let Some(msg) = left.next().await {
94                    match msg? {
95                        Message::Watermark(watermark) => {
96                            yield AlignedMessage::WatermarkLeft(watermark)
97                        }
98                        Message::Chunk(chunk) => yield AlignedMessage::Left(chunk),
99                        Message::Barrier(_) => {
100                            bail!("left barrier received while right stream end");
101                        }
102                    }
103                }
104                break;
105            }
106            Either::Left((Some(msg), _)) => match msg? {
107                Message::Watermark(watermark) => yield AlignedMessage::WatermarkLeft(watermark),
108                Message::Chunk(chunk) => yield AlignedMessage::Left(chunk),
109                Message::Barrier(_) => loop {
110                    let start_time = Instant::now();
111                    // received left barrier, waiting for right barrier
112                    match right
113                        .next()
114                        .await
115                        .context("failed to poll right message, stream closed unexpectedly")??
116                    {
117                        Message::Watermark(watermark) => {
118                            yield AlignedMessage::WatermarkRight(watermark)
119                        }
120                        Message::Chunk(chunk) => yield AlignedMessage::Right(chunk),
121                        Message::Barrier(barrier) => {
122                            yield AlignedMessage::Barrier(barrier);
123                            right_barrier_align_duration
124                                .inc_by(start_time.elapsed().as_nanos() as u64);
125                            break;
126                        }
127                    }
128                },
129            },
130            Either::Right((Some(msg), _)) => match msg? {
131                Message::Watermark(watermark) => yield AlignedMessage::WatermarkRight(watermark),
132                Message::Chunk(chunk) => yield AlignedMessage::Right(chunk),
133                Message::Barrier(_) => loop {
134                    let start_time = Instant::now();
135                    // received right barrier, waiting for left barrier
136                    match left
137                        .next()
138                        .await
139                        .context("failed to poll left message, stream closed unexpectedly")??
140                    {
141                        Message::Watermark(watermark) => {
142                            yield AlignedMessage::WatermarkLeft(watermark)
143                        }
144                        Message::Chunk(chunk) => yield AlignedMessage::Left(chunk),
145                        Message::Barrier(barrier) => {
146                            yield AlignedMessage::Barrier(barrier);
147                            left_barrier_align_duration
148                                .inc_by(start_time.elapsed().as_nanos() as u64);
149                            break;
150                        }
151                    }
152                },
153            },
154        }
155    }
156}
157
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use async_stream::try_stream;
    use futures::{Stream, TryStreamExt};
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::util::epoch::test_epoch;
    use tokio::time::sleep;

    use super::*;

    /// Runs [`barrier_align`] with dummy actor/fragment ids and unused metrics.
    fn barrier_align_for_test(
        left: BoxedMessageStream,
        right: BoxedMessageStream,
    ) -> impl Stream<Item = Result<AlignedMessage, StreamExecutorError>> {
        barrier_align(
            left,
            right,
            0,
            0,
            Arc::new(StreamingMetrics::unused()),
            "dummy_executor",
        )
    }

    /// Drains the aligned stream, returning every emitted message or the first error.
    async fn collect_aligned(
        left: BoxedMessageStream,
        right: BoxedMessageStream,
    ) -> Result<Vec<AlignedMessage>, StreamExecutorError> {
        barrier_align_for_test(left, right).try_collect().await
    }

    #[tokio::test]
    async fn test_barrier_align() {
        let left = try_stream! {
            yield Message::Chunk(StreamChunk::from_pretty("I\n + 1"));
            yield Message::Barrier(Barrier::new_test_barrier(test_epoch(1)));
            yield Message::Chunk(StreamChunk::from_pretty("I\n + 2"));
            yield Message::Barrier(Barrier::new_test_barrier(test_epoch(2)));
        }
        .boxed();
        let right = try_stream! {
            // Delay the right input so the first left chunk is deterministically emitted first.
            sleep(Duration::from_millis(1)).await;
            yield Message::Chunk(StreamChunk::from_pretty("I\n + 1"));
            yield Message::Barrier(Barrier::new_test_barrier(test_epoch(1)));
            yield Message::Barrier(Barrier::new_test_barrier(test_epoch(2)));
            yield Message::Chunk(StreamChunk::from_pretty("I\n + 3"));
        }
        .boxed();
        let output = collect_aligned(left, right)
            .await
            .expect("aligning two well-formed inputs should not fail");
        assert_eq!(
            output,
            vec![
                AlignedMessage::Left(StreamChunk::from_pretty("I\n + 1")),
                AlignedMessage::Right(StreamChunk::from_pretty("I\n + 1")),
                AlignedMessage::Barrier(Barrier::new_test_barrier(test_epoch(1))),
                AlignedMessage::Left(StreamChunk::from_pretty("I\n + 2")),
                AlignedMessage::Barrier(Barrier::new_test_barrier(test_epoch(2))),
                AlignedMessage::Right(StreamChunk::from_pretty("I\n + 3")),
            ]
        );
    }

    /// The right input ends first (the left is delayed). A barrier that later arrives on the
    /// left can never be aligned, so the aligned stream must fail.
    #[tokio::test]
    async fn left_barrier_right_end_1() {
        let left = try_stream! {
            sleep(Duration::from_millis(1)).await;
            yield Message::Chunk(StreamChunk::from_pretty("I\n + 1"));
            yield Message::Barrier(Barrier::new_test_barrier(test_epoch(1)));
        }
        .boxed();
        let right = try_stream! {
            yield Message::Chunk(StreamChunk::from_pretty("I\n + 1"));
        }
        .boxed();
        let result = collect_aligned(left, right).await;
        assert!(result.is_err(), "expected alignment to fail, got {:?}", result);
    }

    /// The left barrier arrives while the right input is still running (the right is delayed).
    /// The right input then ends without a barrier, so the aligned stream must fail.
    #[tokio::test]
    async fn left_barrier_right_end_2() {
        let left = try_stream! {
            yield Message::Chunk(StreamChunk::from_pretty("I\n + 1"));
            yield Message::Barrier(Barrier::new_test_barrier(test_epoch(1)));
        }
        .boxed();
        let right = try_stream! {
            sleep(Duration::from_millis(1)).await;
            yield Message::Chunk(StreamChunk::from_pretty("I\n + 1"));
        }
        .boxed();
        let result = collect_aligned(left, right).await;
        assert!(result.is_err(), "expected alignment to fail, got {:?}", result);
    }
}