risingwave_stream/executor/
barrier_align.rs1use std::sync::Arc;
16use std::time::Instant;
17
18use anyhow::Context;
19use enum_as_inner::EnumAsInner;
20use futures::StreamExt;
21use futures::future::{Either, select};
22use futures_async_stream::try_stream;
23use risingwave_common::bail;
24
25use super::error::StreamExecutorError;
26use super::{Barrier, BoxedMessageStream, Message, StreamChunk, StreamExecutorResult, Watermark};
27use crate::executor::monitor::StreamingMetrics;
28use crate::task::{ActorId, FragmentId};
29
30pub type AlignedMessageStreamItem = StreamExecutorResult<AlignedMessage>;
31pub trait AlignedMessageStream = futures::Stream<Item = AlignedMessageStreamItem> + Send;
32
33#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
34#[derive(Debug, EnumAsInner)]
35pub enum AlignedMessage {
36 Barrier(Barrier),
37 WatermarkLeft(Watermark),
38 WatermarkRight(Watermark),
39 Left(StreamChunk),
40 Right(StreamChunk),
41}
42
43#[try_stream(ok = AlignedMessage, error = StreamExecutorError)]
44pub async fn barrier_align(
45 mut left: BoxedMessageStream,
46 mut right: BoxedMessageStream,
47 actor_id: ActorId,
48 fragment_id: FragmentId,
49 metrics: Arc<StreamingMetrics>,
50 executor_name: &str,
51) {
52 let actor_id = actor_id.to_string();
53 let fragment_id = fragment_id.to_string();
54 let left_barrier_align_duration = metrics.barrier_align_duration.with_guarded_label_values(&[
55 actor_id.as_str(),
56 fragment_id.as_str(),
57 "left",
58 executor_name,
59 ]);
60 let right_barrier_align_duration = metrics.barrier_align_duration.with_guarded_label_values(&[
61 actor_id.as_str(),
62 fragment_id.as_str(),
63 "right",
64 executor_name,
65 ]);
66 loop {
67 let prefer_left: bool = rand::random();
68 let select_result = if prefer_left {
69 select(left.next(), right.next()).await
70 } else {
71 match select(right.next(), left.next()).await {
72 Either::Left(x) => Either::Right(x),
73 Either::Right(x) => Either::Left(x),
74 }
75 };
76 match select_result {
77 Either::Left((None, _)) => {
78 while let Some(msg) = right.next().await {
80 match msg? {
81 Message::Watermark(watermark) => {
82 yield AlignedMessage::WatermarkRight(watermark)
83 }
84 Message::Chunk(chunk) => yield AlignedMessage::Right(chunk),
85 Message::Barrier(_) => {
86 bail!("right barrier received while left stream end");
87 }
88 }
89 }
90 break;
91 }
92 Either::Right((None, _)) => {
93 while let Some(msg) = left.next().await {
95 match msg? {
96 Message::Watermark(watermark) => {
97 yield AlignedMessage::WatermarkLeft(watermark)
98 }
99 Message::Chunk(chunk) => yield AlignedMessage::Left(chunk),
100 Message::Barrier(_) => {
101 bail!("left barrier received while right stream end");
102 }
103 }
104 }
105 break;
106 }
107 Either::Left((Some(msg), _)) => match msg? {
108 Message::Watermark(watermark) => yield AlignedMessage::WatermarkLeft(watermark),
109 Message::Chunk(chunk) => yield AlignedMessage::Left(chunk),
110 Message::Barrier(_) => loop {
111 let start_time = Instant::now();
112 match right
114 .next()
115 .await
116 .context("failed to poll right message, stream closed unexpectedly")??
117 {
118 Message::Watermark(watermark) => {
119 yield AlignedMessage::WatermarkRight(watermark)
120 }
121 Message::Chunk(chunk) => yield AlignedMessage::Right(chunk),
122 Message::Barrier(barrier) => {
123 yield AlignedMessage::Barrier(barrier);
124 right_barrier_align_duration
125 .inc_by(start_time.elapsed().as_nanos() as u64);
126 break;
127 }
128 }
129 },
130 },
131 Either::Right((Some(msg), _)) => match msg? {
132 Message::Watermark(watermark) => yield AlignedMessage::WatermarkRight(watermark),
133 Message::Chunk(chunk) => yield AlignedMessage::Right(chunk),
134 Message::Barrier(_) => loop {
135 let start_time = Instant::now();
136 match left
138 .next()
139 .await
140 .context("failed to poll left message, stream closed unexpectedly")??
141 {
142 Message::Watermark(watermark) => {
143 yield AlignedMessage::WatermarkLeft(watermark)
144 }
145 Message::Chunk(chunk) => yield AlignedMessage::Left(chunk),
146 Message::Barrier(barrier) => {
147 yield AlignedMessage::Barrier(barrier);
148 left_barrier_align_duration
149 .inc_by(start_time.elapsed().as_nanos() as u64);
150 break;
151 }
152 }
153 },
154 },
155 }
156 }
157}
158
159#[cfg(test)]
160mod tests {
161 use std::time::Duration;
162
163 use async_stream::try_stream;
164 use futures::{Stream, TryStreamExt};
165 use risingwave_common::array::stream_chunk::StreamChunkTestExt;
166 use risingwave_common::util::epoch::test_epoch;
167 use tokio::time::sleep;
168
169 use super::*;
170
171 fn barrier_align_for_test(
172 left: BoxedMessageStream,
173 right: BoxedMessageStream,
174 ) -> impl Stream<Item = Result<AlignedMessage, StreamExecutorError>> {
175 barrier_align(
176 left,
177 right,
178 0.into(),
179 0.into(),
180 Arc::new(StreamingMetrics::unused()),
181 "dummy_executor",
182 )
183 }
184
185 #[tokio::test]
186 async fn test_barrier_align() {
187 let left = try_stream! {
188 yield Message::Chunk(StreamChunk::from_pretty("I\n + 1"));
189 yield Message::Barrier(Barrier::new_test_barrier(test_epoch(1)));
190 yield Message::Chunk(StreamChunk::from_pretty("I\n + 2"));
191 yield Message::Barrier(Barrier::new_test_barrier(test_epoch(2)));
192 }
193 .boxed();
194 let right = try_stream! {
195 sleep(Duration::from_millis(1)).await;
196 yield Message::Chunk(StreamChunk::from_pretty("I\n + 1"));
197 yield Message::Barrier(Barrier::new_test_barrier(test_epoch(1)));
198 yield Message::Barrier(Barrier::new_test_barrier(test_epoch(2)));
199 yield Message::Chunk(StreamChunk::from_pretty("I\n + 3"));
200 }
201 .boxed();
202 let output: Vec<_> = barrier_align_for_test(left, right)
203 .try_collect()
204 .await
205 .unwrap();
206 assert_eq!(
207 output,
208 vec![
209 AlignedMessage::Left(StreamChunk::from_pretty("I\n + 1")),
210 AlignedMessage::Right(StreamChunk::from_pretty("I\n + 1")),
211 AlignedMessage::Barrier(Barrier::new_test_barrier(test_epoch(1))),
212 AlignedMessage::Left(StreamChunk::from_pretty("I\n + 2")),
213 AlignedMessage::Barrier(Barrier::new_test_barrier(2 * test_epoch(1))),
214 AlignedMessage::Right(StreamChunk::from_pretty("I\n + 3")),
215 ]
216 );
217 }
218
219 #[tokio::test]
220 #[should_panic]
221 async fn left_barrier_right_end_1() {
222 let left = try_stream! {
223 sleep(Duration::from_millis(1)).await;
224 yield Message::Chunk(StreamChunk::from_pretty("I\n + 1"));
225 yield Message::Barrier(Barrier::new_test_barrier(test_epoch(1)));
226 }
227 .boxed();
228 let right = try_stream! {
229 yield Message::Chunk(StreamChunk::from_pretty("I\n + 1"));
230 }
231 .boxed();
232 let _output: Vec<_> = barrier_align_for_test(left, right)
233 .try_collect()
234 .await
235 .unwrap();
236 }
237
238 #[tokio::test]
239 #[should_panic]
240 async fn left_barrier_right_end_2() {
241 let left = try_stream! {
242 yield Message::Chunk(StreamChunk::from_pretty("I\n + 1"));
243 yield Message::Barrier(Barrier::new_test_barrier(test_epoch(1)));
244 }
245 .boxed();
246 let right = try_stream! {
247 sleep(Duration::from_millis(1)).await;
248 yield Message::Chunk(StreamChunk::from_pretty("I\n + 1"));
249 }
250 .boxed();
251 let _output: Vec<_> = barrier_align_for_test(left, right)
252 .try_collect()
253 .await
254 .unwrap();
255 }
256}