risingwave_stream/executor/
barrier_align.rs1use std::sync::Arc;
16use std::time::Instant;
17
18use anyhow::Context;
19use enum_as_inner::EnumAsInner;
20use futures::StreamExt;
21use futures::future::{Either, select};
22use futures_async_stream::try_stream;
23use risingwave_common::bail;
24
25use super::error::StreamExecutorError;
26use super::{Barrier, BoxedMessageStream, Message, StreamChunk, StreamExecutorResult, Watermark};
27use crate::executor::monitor::StreamingMetrics;
28use crate::task::{ActorId, FragmentId};
29
30pub type AlignedMessageStreamItem = StreamExecutorResult<AlignedMessage>;
31pub trait AlignedMessageStream = futures::Stream<Item = AlignedMessageStreamItem> + Send;
32
33#[derive(Debug, EnumAsInner, PartialEq)]
34pub enum AlignedMessage {
35 Barrier(Barrier),
36 WatermarkLeft(Watermark),
37 WatermarkRight(Watermark),
38 Left(StreamChunk),
39 Right(StreamChunk),
40}
41
42#[try_stream(ok = AlignedMessage, error = StreamExecutorError)]
43pub async fn barrier_align(
44 mut left: BoxedMessageStream,
45 mut right: BoxedMessageStream,
46 actor_id: ActorId,
47 fragment_id: FragmentId,
48 metrics: Arc<StreamingMetrics>,
49 executor_name: &str,
50) {
51 let actor_id = actor_id.to_string();
52 let fragment_id = fragment_id.to_string();
53 let left_barrier_align_duration = metrics.barrier_align_duration.with_guarded_label_values(&[
54 &actor_id,
55 &fragment_id,
56 "left",
57 executor_name,
58 ]);
59 let right_barrier_align_duration = metrics.barrier_align_duration.with_guarded_label_values(&[
60 &actor_id,
61 &fragment_id,
62 "right",
63 executor_name,
64 ]);
65 loop {
66 let prefer_left: bool = rand::random();
67 let select_result = if prefer_left {
68 select(left.next(), right.next()).await
69 } else {
70 match select(right.next(), left.next()).await {
71 Either::Left(x) => Either::Right(x),
72 Either::Right(x) => Either::Left(x),
73 }
74 };
75 match select_result {
76 Either::Left((None, _)) => {
77 while let Some(msg) = right.next().await {
79 match msg? {
80 Message::Watermark(watermark) => {
81 yield AlignedMessage::WatermarkRight(watermark)
82 }
83 Message::Chunk(chunk) => yield AlignedMessage::Right(chunk),
84 Message::Barrier(_) => {
85 bail!("right barrier received while left stream end");
86 }
87 }
88 }
89 break;
90 }
91 Either::Right((None, _)) => {
92 while let Some(msg) = left.next().await {
94 match msg? {
95 Message::Watermark(watermark) => {
96 yield AlignedMessage::WatermarkLeft(watermark)
97 }
98 Message::Chunk(chunk) => yield AlignedMessage::Left(chunk),
99 Message::Barrier(_) => {
100 bail!("left barrier received while right stream end");
101 }
102 }
103 }
104 break;
105 }
106 Either::Left((Some(msg), _)) => match msg? {
107 Message::Watermark(watermark) => yield AlignedMessage::WatermarkLeft(watermark),
108 Message::Chunk(chunk) => yield AlignedMessage::Left(chunk),
109 Message::Barrier(_) => loop {
110 let start_time = Instant::now();
111 match right
113 .next()
114 .await
115 .context("failed to poll right message, stream closed unexpectedly")??
116 {
117 Message::Watermark(watermark) => {
118 yield AlignedMessage::WatermarkRight(watermark)
119 }
120 Message::Chunk(chunk) => yield AlignedMessage::Right(chunk),
121 Message::Barrier(barrier) => {
122 yield AlignedMessage::Barrier(barrier);
123 right_barrier_align_duration
124 .inc_by(start_time.elapsed().as_nanos() as u64);
125 break;
126 }
127 }
128 },
129 },
130 Either::Right((Some(msg), _)) => match msg? {
131 Message::Watermark(watermark) => yield AlignedMessage::WatermarkRight(watermark),
132 Message::Chunk(chunk) => yield AlignedMessage::Right(chunk),
133 Message::Barrier(_) => loop {
134 let start_time = Instant::now();
135 match left
137 .next()
138 .await
139 .context("failed to poll left message, stream closed unexpectedly")??
140 {
141 Message::Watermark(watermark) => {
142 yield AlignedMessage::WatermarkLeft(watermark)
143 }
144 Message::Chunk(chunk) => yield AlignedMessage::Left(chunk),
145 Message::Barrier(barrier) => {
146 yield AlignedMessage::Barrier(barrier);
147 left_barrier_align_duration
148 .inc_by(start_time.elapsed().as_nanos() as u64);
149 break;
150 }
151 }
152 },
153 },
154 }
155 }
156}
157
158#[cfg(test)]
159mod tests {
160 use std::time::Duration;
161
162 use async_stream::try_stream;
163 use futures::{Stream, TryStreamExt};
164 use risingwave_common::array::stream_chunk::StreamChunkTestExt;
165 use risingwave_common::util::epoch::test_epoch;
166 use tokio::time::sleep;
167
168 use super::*;
169
170 fn barrier_align_for_test(
171 left: BoxedMessageStream,
172 right: BoxedMessageStream,
173 ) -> impl Stream<Item = Result<AlignedMessage, StreamExecutorError>> {
174 barrier_align(
175 left,
176 right,
177 0,
178 0,
179 Arc::new(StreamingMetrics::unused()),
180 "dummy_executor",
181 )
182 }
183
184 #[tokio::test]
185 async fn test_barrier_align() {
186 let left = try_stream! {
187 yield Message::Chunk(StreamChunk::from_pretty("I\n + 1"));
188 yield Message::Barrier(Barrier::new_test_barrier(test_epoch(1)));
189 yield Message::Chunk(StreamChunk::from_pretty("I\n + 2"));
190 yield Message::Barrier(Barrier::new_test_barrier(test_epoch(2)));
191 }
192 .boxed();
193 let right = try_stream! {
194 sleep(Duration::from_millis(1)).await;
195 yield Message::Chunk(StreamChunk::from_pretty("I\n + 1"));
196 yield Message::Barrier(Barrier::new_test_barrier(test_epoch(1)));
197 yield Message::Barrier(Barrier::new_test_barrier(test_epoch(2)));
198 yield Message::Chunk(StreamChunk::from_pretty("I\n + 3"));
199 }
200 .boxed();
201 let output: Vec<_> = barrier_align_for_test(left, right)
202 .try_collect()
203 .await
204 .unwrap();
205 assert_eq!(
206 output,
207 vec![
208 AlignedMessage::Left(StreamChunk::from_pretty("I\n + 1")),
209 AlignedMessage::Right(StreamChunk::from_pretty("I\n + 1")),
210 AlignedMessage::Barrier(Barrier::new_test_barrier(test_epoch(1))),
211 AlignedMessage::Left(StreamChunk::from_pretty("I\n + 2")),
212 AlignedMessage::Barrier(Barrier::new_test_barrier(2 * test_epoch(1))),
213 AlignedMessage::Right(StreamChunk::from_pretty("I\n + 3")),
214 ]
215 );
216 }
217
218 #[tokio::test]
219 #[should_panic]
220 async fn left_barrier_right_end_1() {
221 let left = try_stream! {
222 sleep(Duration::from_millis(1)).await;
223 yield Message::Chunk(StreamChunk::from_pretty("I\n + 1"));
224 yield Message::Barrier(Barrier::new_test_barrier(test_epoch(1)));
225 }
226 .boxed();
227 let right = try_stream! {
228 yield Message::Chunk(StreamChunk::from_pretty("I\n + 1"));
229 }
230 .boxed();
231 let _output: Vec<_> = barrier_align_for_test(left, right)
232 .try_collect()
233 .await
234 .unwrap();
235 }
236
237 #[tokio::test]
238 #[should_panic]
239 async fn left_barrier_right_end_2() {
240 let left = try_stream! {
241 yield Message::Chunk(StreamChunk::from_pretty("I\n + 1"));
242 yield Message::Barrier(Barrier::new_test_barrier(test_epoch(1)));
243 }
244 .boxed();
245 let right = try_stream! {
246 sleep(Duration::from_millis(1)).await;
247 yield Message::Chunk(StreamChunk::from_pretty("I\n + 1"));
248 }
249 .boxed();
250 let _output: Vec<_> = barrier_align_for_test(left, right)
251 .try_collect()
252 .await
253 .unwrap();
254 }
255}