risingwave_stream/executor/
barrier_align.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Instant;
17
18use anyhow::Context;
19use enum_as_inner::EnumAsInner;
20use futures::StreamExt;
21use futures::future::{Either, select};
22use futures_async_stream::try_stream;
23use risingwave_common::bail;
24
25use super::error::StreamExecutorError;
26use super::{Barrier, BoxedMessageStream, Message, StreamChunk, StreamExecutorResult, Watermark};
27use crate::executor::monitor::StreamingMetrics;
28use crate::task::{ActorId, FragmentId};
29
/// Item type of an aligned stream: the `StreamExecutorResult` of an [`AlignedMessage`].
30pub type AlignedMessageStreamItem = StreamExecutorResult<AlignedMessage>;
/// Trait alias for any `Send` stream whose items are [`AlignedMessageStreamItem`]s.
31pub trait AlignedMessageStream = futures::Stream<Item = AlignedMessageStreamItem> + Send;
32
/// A message produced by [`barrier_align`], tagged with the input it originated from.
///
/// `PartialEq` is only derived for test builds (`test` cfg or the `test` feature).
33#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
34#[derive(Debug, EnumAsInner)]
35pub enum AlignedMessage {
    /// A barrier, yielded by [`barrier_align`] once a barrier has been received from both
    /// inputs. It carries the barrier of whichever input delivered its barrier second.
36    Barrier(Barrier),
    /// A watermark received from the left input.
37    WatermarkLeft(Watermark),
    /// A watermark received from the right input.
38    WatermarkRight(Watermark),
    /// A data chunk received from the left input.
39    Left(StreamChunk),
    /// A data chunk received from the right input.
40    Right(StreamChunk),
41}
42
43#[try_stream(ok = AlignedMessage, error = StreamExecutorError)]
44pub async fn barrier_align(
45    mut left: BoxedMessageStream,
46    mut right: BoxedMessageStream,
47    actor_id: ActorId,
48    fragment_id: FragmentId,
49    metrics: Arc<StreamingMetrics>,
50    executor_name: &str,
51) {
52    let actor_id = actor_id.to_string();
53    let fragment_id = fragment_id.to_string();
54    let left_barrier_align_duration = metrics.barrier_align_duration.with_guarded_label_values(&[
55        actor_id.as_str(),
56        fragment_id.as_str(),
57        "left",
58        executor_name,
59    ]);
60    let right_barrier_align_duration = metrics.barrier_align_duration.with_guarded_label_values(&[
61        actor_id.as_str(),
62        fragment_id.as_str(),
63        "right",
64        executor_name,
65    ]);
66    loop {
67        let prefer_left: bool = rand::random();
68        let select_result = if prefer_left {
69            select(left.next(), right.next()).await
70        } else {
71            match select(right.next(), left.next()).await {
72                Either::Left(x) => Either::Right(x),
73                Either::Right(x) => Either::Left(x),
74            }
75        };
76        match select_result {
77            Either::Left((None, _)) => {
78                // left stream end, passthrough right chunks
79                while let Some(msg) = right.next().await {
80                    match msg? {
81                        Message::Watermark(watermark) => {
82                            yield AlignedMessage::WatermarkRight(watermark)
83                        }
84                        Message::Chunk(chunk) => yield AlignedMessage::Right(chunk),
85                        Message::Barrier(_) => {
86                            bail!("right barrier received while left stream end");
87                        }
88                    }
89                }
90                break;
91            }
92            Either::Right((None, _)) => {
93                // right stream end, passthrough left chunks
94                while let Some(msg) = left.next().await {
95                    match msg? {
96                        Message::Watermark(watermark) => {
97                            yield AlignedMessage::WatermarkLeft(watermark)
98                        }
99                        Message::Chunk(chunk) => yield AlignedMessage::Left(chunk),
100                        Message::Barrier(_) => {
101                            bail!("left barrier received while right stream end");
102                        }
103                    }
104                }
105                break;
106            }
107            Either::Left((Some(msg), _)) => match msg? {
108                Message::Watermark(watermark) => yield AlignedMessage::WatermarkLeft(watermark),
109                Message::Chunk(chunk) => yield AlignedMessage::Left(chunk),
110                Message::Barrier(_) => loop {
111                    let start_time = Instant::now();
112                    // received left barrier, waiting for right barrier
113                    match right
114                        .next()
115                        .await
116                        .context("failed to poll right message, stream closed unexpectedly")??
117                    {
118                        Message::Watermark(watermark) => {
119                            yield AlignedMessage::WatermarkRight(watermark)
120                        }
121                        Message::Chunk(chunk) => yield AlignedMessage::Right(chunk),
122                        Message::Barrier(barrier) => {
123                            yield AlignedMessage::Barrier(barrier);
124                            right_barrier_align_duration
125                                .inc_by(start_time.elapsed().as_nanos() as u64);
126                            break;
127                        }
128                    }
129                },
130            },
131            Either::Right((Some(msg), _)) => match msg? {
132                Message::Watermark(watermark) => yield AlignedMessage::WatermarkRight(watermark),
133                Message::Chunk(chunk) => yield AlignedMessage::Right(chunk),
134                Message::Barrier(_) => loop {
135                    let start_time = Instant::now();
136                    // received right barrier, waiting for left barrier
137                    match left
138                        .next()
139                        .await
140                        .context("failed to poll left message, stream closed unexpectedly")??
141                    {
142                        Message::Watermark(watermark) => {
143                            yield AlignedMessage::WatermarkLeft(watermark)
144                        }
145                        Message::Chunk(chunk) => yield AlignedMessage::Left(chunk),
146                        Message::Barrier(barrier) => {
147                            yield AlignedMessage::Barrier(barrier);
148                            left_barrier_align_duration
149                                .inc_by(start_time.elapsed().as_nanos() as u64);
150                            break;
151                        }
152                    }
153                },
154            },
155        }
156    }
157}
158
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use async_stream::try_stream;
    use futures::{Stream, TryStreamExt};
    use risingwave_common::array::stream_chunk::StreamChunkTestExt;
    use risingwave_common::util::epoch::test_epoch;
    use tokio::time::sleep;

    use super::*;

    /// Runs [`barrier_align`] on the two inputs with placeholder ids, unused metrics and a dummy
    /// executor name.
    fn barrier_align_for_test(
        left: BoxedMessageStream,
        right: BoxedMessageStream,
    ) -> impl Stream<Item = Result<AlignedMessage, StreamExecutorError>> {
        let metrics = Arc::new(StreamingMetrics::unused());
        barrier_align(left, right, 0.into(), 0.into(), metrics, "dummy_executor")
    }

    /// Drives the aligned stream to completion, panicking on the first error it yields.
    async fn collect_aligned(
        left: BoxedMessageStream,
        right: BoxedMessageStream,
    ) -> Vec<AlignedMessage> {
        barrier_align_for_test(left, right)
            .try_collect()
            .await
            .unwrap()
    }

    /// Shorthand for building a chunk from its pretty-printed form.
    fn chunk(pretty: &str) -> StreamChunk {
        StreamChunk::from_pretty(pretty)
    }

    /// Both inputs deliver two barriers; the right input starts 1ms late and keeps producing
    /// after its last barrier. The output must contain exactly one barrier per epoch, with the
    /// surrounding chunks tagged by side.
    #[tokio::test]
    async fn test_barrier_align() {
        let left_input: BoxedMessageStream = try_stream! {
            yield Message::Chunk(chunk("I\n + 1"));
            yield Message::Barrier(Barrier::new_test_barrier(test_epoch(1)));
            yield Message::Chunk(chunk("I\n + 2"));
            yield Message::Barrier(Barrier::new_test_barrier(test_epoch(2)));
        }
        .boxed();
        let right_input: BoxedMessageStream = try_stream! {
            sleep(Duration::from_millis(1)).await;
            yield Message::Chunk(chunk("I\n + 1"));
            yield Message::Barrier(Barrier::new_test_barrier(test_epoch(1)));
            yield Message::Barrier(Barrier::new_test_barrier(test_epoch(2)));
            yield Message::Chunk(chunk("I\n + 3"));
        }
        .boxed();

        let expected = vec![
            AlignedMessage::Left(chunk("I\n + 1")),
            AlignedMessage::Right(chunk("I\n + 1")),
            AlignedMessage::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            AlignedMessage::Left(chunk("I\n + 2")),
            // NOTE(review): presumably the same epoch as `test_epoch(2)` -- confirm.
            AlignedMessage::Barrier(Barrier::new_test_barrier(2 * test_epoch(1))),
            AlignedMessage::Right(chunk("I\n + 3")),
        ];

        let actual = collect_aligned(left_input, right_input).await;
        assert_eq!(actual, expected);
    }

    /// The right input ends without ever sending a barrier, while the (delayed) left input does
    /// send one: collecting the aligned stream must fail.
    #[tokio::test]
    #[should_panic]
    async fn left_barrier_right_end_1() {
        let left_input: BoxedMessageStream = try_stream! {
            sleep(Duration::from_millis(1)).await;
            yield Message::Chunk(chunk("I\n + 1"));
            yield Message::Barrier(Barrier::new_test_barrier(test_epoch(1)));
        }
        .boxed();
        let right_input: BoxedMessageStream = try_stream! {
            yield Message::Chunk(chunk("I\n + 1"));
        }
        .boxed();

        let _aligned = collect_aligned(left_input, right_input).await;
    }

    /// Same shape as above, but here the right input is the delayed one: the left input sends
    /// its barrier and the right input then ends without one, so collecting must fail.
    #[tokio::test]
    #[should_panic]
    async fn left_barrier_right_end_2() {
        let left_input: BoxedMessageStream = try_stream! {
            yield Message::Chunk(chunk("I\n + 1"));
            yield Message::Barrier(Barrier::new_test_barrier(test_epoch(1)));
        }
        .boxed();
        let right_input: BoxedMessageStream = try_stream! {
            sleep(Duration::from_millis(1)).await;
            yield Message::Chunk(chunk("I\n + 1"));
        }
        .boxed();

        let _aligned = collect_aligned(left_input, right_input).await;
    }
}