risingwave_stream/executor/
receiver.rs1use anyhow::Context;
16use itertools::Itertools;
17use tokio::sync::mpsc;
18use tokio::time::Instant;
19
20use super::exchange::input::BoxedInput;
21use crate::executor::DispatcherMessage;
22use crate::executor::exchange::input::{
23 assert_equal_dispatcher_barrier, new_input, process_dispatcher_msg,
24};
25use crate::executor::prelude::*;
26use crate::task::{FragmentId, LocalBarrierManager};
27
/// Executor that reads messages from a single upstream input and re-emits them as a
/// message stream (see the `Execute` implementation).
pub struct ReceiverExecutor {
    /// The upstream input polled by `execute`; replaced when a merge update is applied.
    input: BoxedInput,

    /// Context of the owning actor; its `id` is used as this executor's actor id.
    actor_context: ActorContextRef,

    /// Fragment id passed (together with the actor id) to the metrics and to `new_input`.
    fragment_id: FragmentId,

    /// Fragment id of the current upstream; overwritten when a merge update switches upstream.
    upstream_fragment_id: FragmentId,

    /// Handle passed to `new_input` when a replacement upstream input is created.
    local_barrier_manager: LocalBarrierManager,

    /// Shared streaming metrics from which the per-actor input metrics are derived.
    metrics: Arc<StreamingMetrics>,

    /// Barrier channel handed to `process_dispatcher_msg` for every upstream message.
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
}
51
52impl std::fmt::Debug for ReceiverExecutor {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 f.debug_struct("ReceiverExecutor").finish()
55 }
56}
57
58impl ReceiverExecutor {
59 #[allow(clippy::too_many_arguments)]
60 pub fn new(
61 ctx: ActorContextRef,
62 fragment_id: FragmentId,
63 upstream_fragment_id: FragmentId,
64 input: BoxedInput,
65 local_barrier_manager: LocalBarrierManager,
66 metrics: Arc<StreamingMetrics>,
67 barrier_rx: mpsc::UnboundedReceiver<Barrier>,
68 ) -> Self {
69 Self {
70 input,
71 actor_context: ctx,
72 upstream_fragment_id,
73 local_barrier_manager,
74 metrics,
75 fragment_id,
76 barrier_rx,
77 }
78 }
79
80 #[cfg(test)]
81 pub fn for_test(
82 actor_id: ActorId,
83 input: super::exchange::permit::Receiver,
84 local_barrier_manager: crate::task::LocalBarrierManager,
85 ) -> Self {
86 use super::exchange::input::LocalInput;
87 use crate::executor::exchange::input::Input;
88
89 let barrier_rx = local_barrier_manager.subscribe_barrier(actor_id);
90
91 Self::new(
92 ActorContext::for_test(actor_id),
93 514,
94 1919,
95 LocalInput::new(input, 0).boxed_input(),
96 local_barrier_manager,
97 StreamingMetrics::unused().into(),
98 barrier_rx,
99 )
100 }
101}
102
103impl Execute for ReceiverExecutor {
104 fn execute(mut self: Box<Self>) -> BoxedMessageStream {
105 let actor_id = self.actor_context.id;
106
107 let mut metrics = self.metrics.new_actor_input_metrics(
108 actor_id,
109 self.fragment_id,
110 self.upstream_fragment_id,
111 );
112
113 let stream = #[try_stream]
114 async move {
115 let mut start_time = Instant::now();
116 while let Some(msg) = self.input.next().await {
117 metrics
118 .actor_input_buffer_blocking_duration_ns
119 .inc_by(start_time.elapsed().as_nanos() as u64);
120 let msg: DispatcherMessage = msg?;
121 let mut msg = process_dispatcher_msg(msg, &mut self.barrier_rx).await?;
122
123 match &mut msg {
124 Message::Watermark(_) => {
125 }
127 Message::Chunk(chunk) => {
128 metrics.actor_in_record_cnt.inc_by(chunk.cardinality() as _);
129 }
130 Message::Barrier(barrier) => {
131 tracing::debug!(
132 target: "events::stream::barrier::path",
133 actor_id = actor_id,
134 "receiver receives barrier from path: {:?}",
135 barrier.passed_actors
136 );
137 barrier.passed_actors.push(actor_id);
138
139 if let Some(update) = barrier
140 .as_update_merge(self.actor_context.id, self.upstream_fragment_id)
141 {
142 let new_upstream_fragment_id = update
143 .new_upstream_fragment_id
144 .unwrap_or(self.upstream_fragment_id);
145 let removed_upstream_actor_id: Vec<_> =
146 if update.new_upstream_fragment_id.is_some() {
147 vec![self.input.actor_id()]
148 } else {
149 update.removed_upstream_actor_id.clone()
150 };
151
152 assert_eq!(
153 removed_upstream_actor_id,
154 vec![self.input.actor_id()],
155 "the removed upstream actor should be the same as the current input"
156 );
157 let upstream_actor = update
158 .added_upstream_actors
159 .iter()
160 .exactly_one()
161 .expect("receiver should have exactly one upstream");
162
163 let mut new_upstream = new_input(
165 &self.local_barrier_manager,
166 self.metrics.clone(),
167 self.actor_context.id,
168 self.fragment_id,
169 upstream_actor,
170 new_upstream_fragment_id,
171 )
172 .await
173 .context("failed to create upstream input")?;
174
175 let new_barrier = expect_first_barrier(&mut new_upstream).await?;
178 assert_equal_dispatcher_barrier(barrier, &new_barrier);
179
180 self.input = new_upstream;
182
183 self.upstream_fragment_id = new_upstream_fragment_id;
184 metrics = self.metrics.new_actor_input_metrics(
185 actor_id,
186 self.fragment_id,
187 self.upstream_fragment_id,
188 );
189 }
190 }
191 };
192
193 yield msg;
194 start_time = Instant::now();
195 }
196 };
197
198 stream.boxed()
199 }
200}
201
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::{FutureExt, pin_mut};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_pb::stream_plan::update_mutation::MergeUpdate;

    use super::*;
    use crate::executor::{MessageInner as Message, UpdateMutation};
    use crate::task::NewOutputRequest;
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;
    use crate::task::test_utils::helper_make_local_actor;

    /// Drives a `ReceiverExecutor` through a merge update that swaps its single upstream
    /// actor from `old` to `new`, checking what the receiver yields at each step.
    #[tokio::test]
    async fn test_configuration_change() {
        let actor_id = 233;
        // Upstream actor ids before (`old`) and after (`new`) the update.
        let (old, new) = (114, 514);
        let barrier_test_env = LocalBarrierTestEnv::for_test().await;

        let metrics = Arc::new(StreamingMetrics::unused());

        let (upstream_fragment_id, fragment_id) = (10, 18);

        // Merge update for this actor: add `new`, remove `old`, keep the same upstream
        // fragment (`new_upstream_fragment_id` is `None`).
        let merge_updates = maplit::hashmap! {
            (actor_id, upstream_fragment_id) => MergeUpdate {
                actor_id,
                upstream_fragment_id,
                new_upstream_fragment_id: None,
                added_upstream_actors: vec![helper_make_local_actor(new)],
                removed_upstream_actor_id: vec![old],
            }
        };

        // Barrier carrying the update mutation above.
        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: Default::default(),
                merges: merge_updates,
                vnode_bitmaps: Default::default(),
                dropped_actors: Default::default(),
                actor_splits: Default::default(),
                actor_new_dispatchers: Default::default(),
            },
        ));

        barrier_test_env.inject_barrier(&b1, [actor_id]);
        barrier_test_env.flush_all_events().await;

        // The receiver starts out reading from the `old` upstream actor.
        let input = new_input(
            &barrier_test_env.local_barrier_manager,
            metrics.clone(),
            actor_id,
            fragment_id,
            &helper_make_local_actor(old),
            upstream_fragment_id,
        )
        .await
        .unwrap();

        let receiver = ReceiverExecutor::new(
            ActorContext::for_test(actor_id),
            fragment_id,
            upstream_fragment_id,
            input,
            barrier_test_env.local_barrier_manager.clone(),
            metrics.clone(),
            barrier_test_env
                .local_barrier_manager
                .subscribe_barrier(actor_id),
        )
        .boxed()
        .execute();

        pin_mut!(receiver);

        // Senders towards the receiver, keyed by upstream actor id
        // (filled by `collect_upstream_tx!`).
        let mut txs = HashMap::new();
        // Sends `$msg` through the stored sender of every listed actor; the send must succeed.
        macro_rules! send {
            ($actors:expr, $msg:expr) => {
                for actor in $actors {
                    txs.get(&actor).unwrap().send($msg).await.unwrap();
                }
            };
        }
        // Like `send!`, but the send is required to fail.
        macro_rules! send_error {
            ($actors:expr, $msg:expr) => {
                for actor in $actors {
                    txs.get(&actor).unwrap().send($msg).await.unwrap_err();
                }
            };
        }
        // Polls the receiver once and asserts that no message is produced
        // (panics if the poll yields an error).
        macro_rules! assert_recv_pending {
            () => {
                assert!(
                    receiver
                        .next()
                        .now_or_never()
                        .flatten()
                        .transpose()
                        .unwrap()
                        .is_none()
                );
            };
        }

        // Awaits the next item from the receiver, panicking on an error item.
        macro_rules! recv {
            () => {
                receiver.next().await.transpose().unwrap()
            };
        }

        // For each listed upstream actor, takes its pending output request, checks that
        // there is exactly one, that it targets `actor_id` and is local, and stores the
        // sender in `txs`.
        macro_rules! collect_upstream_tx {
            ($actors:expr) => {
                for upstream_id in $actors {
                    let mut output_requests = barrier_test_env
                        .take_pending_new_output_requests(upstream_id)
                        .await;
                    assert_eq!(output_requests.len(), 1);
                    let (downstream_actor_id, request) = output_requests.pop().unwrap();
                    assert_eq!(actor_id, downstream_actor_id);
                    let NewOutputRequest::Local(tx) = request else {
                        unreachable!()
                    };
                    txs.insert(upstream_id, tx);
                }
            };
        }

        // Nothing has been sent yet, so the receiver has nothing to yield.
        assert_recv_pending!();
        barrier_test_env.flush_all_events().await;

        // Obtain the sender for the `old` upstream.
        collect_upstream_tx!([old]);

        // A chunk sent via `old` is forwarded by the receiver.
        send!([old], Message::Chunk(StreamChunk::default()).into());
        recv!().unwrap().as_chunk().unwrap();
        assert_recv_pending!();

        // The update barrier arrives via `old`: the receiver yields nothing yet, and an
        // output request for the `new` upstream shows up.
        // NOTE(review): presumably the receiver is now waiting for `new`'s first barrier — confirm.
        send!([old], Message::Barrier(b1.clone().into_dispatcher()).into());
        assert_recv_pending!();
        collect_upstream_tx!([new]);

        // Once `new` delivers the same barrier, the receiver yields the barrier.
        send!([new], Message::Barrier(b1.clone().into_dispatcher()).into());
        recv!().unwrap().as_barrier().unwrap();
        // Sending via `old` now fails.
        // NOTE(review): presumably because the old input was dropped on the switch — confirm.
        send_error!([old], Message::Chunk(StreamChunk::default()).into());
        assert_recv_pending!();

        // Chunks sent via `new` are forwarded.
        send!([new], Message::Chunk(StreamChunk::default()).into());
        recv!().unwrap().as_chunk().unwrap();
        assert_recv_pending!();
    }
}