risingwave_stream/executor/
receiver.rs1use anyhow::Context;
16use itertools::Itertools;
17use tokio::sync::mpsc;
18use tokio::time::Instant;
19
20use super::exchange::input::BoxedInput;
21use crate::executor::DispatcherMessage;
22use crate::executor::exchange::input::{
23 assert_equal_dispatcher_barrier, new_input, process_dispatcher_msg,
24};
25use crate::executor::prelude::*;
26use crate::task::{FragmentId, SharedContext};
27
28pub struct ReceiverExecutor {
32 input: BoxedInput,
34
35 actor_context: ActorContextRef,
37
38 fragment_id: FragmentId,
40
41 upstream_fragment_id: FragmentId,
43
44 context: Arc<SharedContext>,
46
47 metrics: Arc<StreamingMetrics>,
49
50 barrier_rx: mpsc::UnboundedReceiver<Barrier>,
51}
52
53impl std::fmt::Debug for ReceiverExecutor {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 f.debug_struct("ReceiverExecutor").finish()
56 }
57}
58
59impl ReceiverExecutor {
60 #[allow(clippy::too_many_arguments)]
61 pub fn new(
62 ctx: ActorContextRef,
63 fragment_id: FragmentId,
64 upstream_fragment_id: FragmentId,
65 input: BoxedInput,
66 context: Arc<SharedContext>,
67 metrics: Arc<StreamingMetrics>,
68 barrier_rx: mpsc::UnboundedReceiver<Barrier>,
69 ) -> Self {
70 Self {
71 input,
72 actor_context: ctx,
73 upstream_fragment_id,
74 metrics,
75 fragment_id,
76 context,
77 barrier_rx,
78 }
79 }
80
81 #[cfg(test)]
82 pub fn for_test(
83 actor_id: ActorId,
84 input: super::exchange::permit::Receiver,
85 shared_context: Arc<SharedContext>,
86 local_barrier_manager: crate::task::LocalBarrierManager,
87 ) -> Self {
88 use super::exchange::input::LocalInput;
89 use crate::executor::exchange::input::Input;
90
91 let barrier_rx = local_barrier_manager.subscribe_barrier(actor_id);
92
93 Self::new(
94 ActorContext::for_test(actor_id),
95 514,
96 1919,
97 LocalInput::new(input, 0).boxed_input(),
98 shared_context,
99 StreamingMetrics::unused().into(),
100 barrier_rx,
101 )
102 }
103}
104
105impl Execute for ReceiverExecutor {
106 fn execute(mut self: Box<Self>) -> BoxedMessageStream {
107 let actor_id = self.actor_context.id;
108
109 let mut metrics = self.metrics.new_actor_input_metrics(
110 actor_id,
111 self.fragment_id,
112 self.upstream_fragment_id,
113 );
114
115 let stream = #[try_stream]
116 async move {
117 let mut start_time = Instant::now();
118 while let Some(msg) = self.input.next().await {
119 metrics
120 .actor_input_buffer_blocking_duration_ns
121 .inc_by(start_time.elapsed().as_nanos() as u64);
122 let msg: DispatcherMessage = msg?;
123 let mut msg = process_dispatcher_msg(msg, &mut self.barrier_rx).await?;
124
125 match &mut msg {
126 Message::Watermark(_) => {
127 }
129 Message::Chunk(chunk) => {
130 metrics.actor_in_record_cnt.inc_by(chunk.cardinality() as _);
131 }
132 Message::Barrier(barrier) => {
133 tracing::debug!(
134 target: "events::stream::barrier::path",
135 actor_id = actor_id,
136 "receiver receives barrier from path: {:?}",
137 barrier.passed_actors
138 );
139 barrier.passed_actors.push(actor_id);
140
141 if let Some(update) = barrier
142 .as_update_merge(self.actor_context.id, self.upstream_fragment_id)
143 {
144 let new_upstream_fragment_id = update
145 .new_upstream_fragment_id
146 .unwrap_or(self.upstream_fragment_id);
147 let added_upstream_actor_id = update.added_upstream_actor_id.clone();
148 let removed_upstream_actor_id: Vec<_> =
149 if update.new_upstream_fragment_id.is_some() {
150 vec![self.input.actor_id()]
151 } else {
152 update.removed_upstream_actor_id.clone()
153 };
154
155 assert_eq!(
156 removed_upstream_actor_id,
157 vec![self.input.actor_id()],
158 "the removed upstream actor should be the same as the current input"
159 );
160 let upstream_actor_id = *added_upstream_actor_id
161 .iter()
162 .exactly_one()
163 .expect("receiver should have exactly one upstream");
164
165 let mut new_upstream = new_input(
167 &self.context,
168 self.metrics.clone(),
169 self.actor_context.id,
170 self.fragment_id,
171 upstream_actor_id,
172 new_upstream_fragment_id,
173 )
174 .context("failed to create upstream input")?;
175
176 let new_barrier = expect_first_barrier(&mut new_upstream).await?;
179 assert_equal_dispatcher_barrier(barrier, &new_barrier);
180
181 self.input = new_upstream;
183
184 self.upstream_fragment_id = new_upstream_fragment_id;
185 metrics = self.metrics.new_actor_input_metrics(
186 actor_id,
187 self.fragment_id,
188 self.upstream_fragment_id,
189 );
190 }
191 }
192 };
193
194 yield msg;
195 start_time = Instant::now();
196 }
197 };
198
199 stream.boxed()
200 }
201}
202
203#[cfg(test)]
204mod tests {
205 use std::collections::HashMap;
206
207 use futures::{FutureExt, pin_mut};
208 use risingwave_common::util::epoch::test_epoch;
209 use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
210
211 use super::*;
212 use crate::executor::{MessageInner as Message, UpdateMutation};
213 use crate::task::barrier_test_utils::LocalBarrierTestEnv;
214 use crate::task::test_utils::helper_make_local_actor;
215
216 #[tokio::test]
217 async fn test_configuration_change() {
218 let actor_id = 233;
219 let (old, new) = (114, 514); let barrier_test_env = LocalBarrierTestEnv::for_test().await;
222
223 let ctx = barrier_test_env.shared_context.clone();
224 let metrics = Arc::new(StreamingMetrics::unused());
225
226 ctx.add_actors(
229 [actor_id, old, new]
230 .into_iter()
231 .map(helper_make_local_actor),
232 );
233
234 let (upstream_fragment_id, fragment_id) = (10, 18);
238
239 let merge_updates = maplit::hashmap! {
241 (actor_id, upstream_fragment_id) => MergeUpdate {
242 actor_id,
243 upstream_fragment_id,
244 new_upstream_fragment_id: None,
245 added_upstream_actor_id: vec![new],
246 removed_upstream_actor_id: vec![old],
247 }
248 };
249
250 let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
251 UpdateMutation {
252 dispatchers: Default::default(),
253 merges: merge_updates,
254 vnode_bitmaps: Default::default(),
255 dropped_actors: Default::default(),
256 actor_splits: Default::default(),
257 actor_new_dispatchers: Default::default(),
258 },
259 ));
260
261 barrier_test_env.inject_barrier(&b1, [actor_id]);
262 barrier_test_env.flush_all_events().await;
263
264 let input = new_input(
265 &ctx,
266 metrics.clone(),
267 actor_id,
268 fragment_id,
269 old,
270 upstream_fragment_id,
271 )
272 .unwrap();
273
274 let receiver = ReceiverExecutor::new(
275 ActorContext::for_test(actor_id),
276 fragment_id,
277 upstream_fragment_id,
278 input,
279 ctx.clone(),
280 metrics.clone(),
281 barrier_test_env
282 .local_barrier_manager
283 .subscribe_barrier(actor_id),
284 )
285 .boxed()
286 .execute();
287
288 pin_mut!(receiver);
289
290 let txs = [old, new]
292 .into_iter()
293 .map(|id| (id, ctx.take_sender(&(id, actor_id)).unwrap()))
294 .collect::<HashMap<_, _>>();
295 macro_rules! send {
296 ($actors:expr, $msg:expr) => {
297 for actor in $actors {
298 txs.get(&actor).unwrap().send($msg).await.unwrap();
299 }
300 };
301 }
302 macro_rules! send_error {
303 ($actors:expr, $msg:expr) => {
304 for actor in $actors {
305 txs.get(&actor).unwrap().send($msg).await.unwrap_err();
306 }
307 };
308 }
309 macro_rules! assert_recv_pending {
310 () => {
311 assert!(
312 receiver
313 .next()
314 .now_or_never()
315 .flatten()
316 .transpose()
317 .unwrap()
318 .is_none()
319 );
320 };
321 }
322
323 macro_rules! recv {
324 () => {
325 receiver.next().await.transpose().unwrap()
326 };
327 }
328
329 send!([old], Message::Chunk(StreamChunk::default()).into());
331 recv!().unwrap().as_chunk().unwrap(); assert_recv_pending!();
333
334 send!([new], Message::Barrier(b1.clone().into_dispatcher()).into());
335 assert_recv_pending!(); send!([old], Message::Barrier(b1.clone().into_dispatcher()).into());
338 recv!().unwrap().as_barrier().unwrap(); send_error!([old], Message::Chunk(StreamChunk::default()).into());
342 assert_recv_pending!();
343
344 send!([new], Message::Chunk(StreamChunk::default()).into());
346 recv!().unwrap().as_chunk().unwrap(); assert_recv_pending!();
348 }
349}