risingwave_stream/executor/
receiver.rs1use anyhow::Context;
16use itertools::Itertools;
17use tokio::sync::mpsc;
18use tokio::time::Instant;
19
20use super::exchange::input::BoxedActorInput;
21use crate::executor::DispatcherMessage;
22use crate::executor::exchange::input::{
23 assert_equal_dispatcher_barrier, new_input, process_dispatcher_msg,
24};
25use crate::executor::prelude::*;
26use crate::task::{FragmentId, LocalBarrierManager};
27
28pub struct ReceiverExecutor {
32 input: BoxedActorInput,
34
35 actor_context: ActorContextRef,
37
38 fragment_id: FragmentId,
40
41 upstream_fragment_id: FragmentId,
43
44 local_barrier_manager: LocalBarrierManager,
45
46 metrics: Arc<StreamingMetrics>,
48
49 barrier_rx: mpsc::UnboundedReceiver<Barrier>,
50}
51
52impl std::fmt::Debug for ReceiverExecutor {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 f.debug_struct("ReceiverExecutor").finish()
55 }
56}
57
58impl ReceiverExecutor {
59 #[allow(clippy::too_many_arguments)]
60 pub fn new(
61 ctx: ActorContextRef,
62 fragment_id: FragmentId,
63 upstream_fragment_id: FragmentId,
64 input: BoxedActorInput,
65 local_barrier_manager: LocalBarrierManager,
66 metrics: Arc<StreamingMetrics>,
67 barrier_rx: mpsc::UnboundedReceiver<Barrier>,
68 ) -> Self {
69 Self {
70 input,
71 actor_context: ctx,
72 upstream_fragment_id,
73 local_barrier_manager,
74 metrics,
75 fragment_id,
76 barrier_rx,
77 }
78 }
79
80 #[cfg(test)]
81 pub fn for_test(
82 actor_id: ActorId,
83 input: super::exchange::permit::Receiver,
84 local_barrier_manager: crate::task::LocalBarrierManager,
85 ) -> Self {
86 use super::exchange::input::LocalInput;
87 use crate::executor::exchange::input::ActorInput;
88
89 let barrier_rx = local_barrier_manager.subscribe_barrier(actor_id);
90
91 Self::new(
92 ActorContext::for_test(actor_id),
93 514,
94 1919,
95 LocalInput::new(input, 0).boxed_input(),
96 local_barrier_manager,
97 StreamingMetrics::unused().into(),
98 barrier_rx,
99 )
100 }
101}
102
103impl Execute for ReceiverExecutor {
104 fn execute(mut self: Box<Self>) -> BoxedMessageStream {
105 let actor_id = self.actor_context.id;
106
107 let mut metrics = self.metrics.new_actor_input_metrics(
108 actor_id,
109 self.fragment_id,
110 self.upstream_fragment_id,
111 );
112
113 let stream = #[try_stream]
114 async move {
115 let mut start_time = Instant::now();
116 while let Some(msg) = self.input.next().await {
117 metrics
118 .actor_input_buffer_blocking_duration_ns
119 .inc_by(start_time.elapsed().as_nanos() as u64);
120 let msg: DispatcherMessage = msg?;
121 let mut msg = process_dispatcher_msg(msg, &mut self.barrier_rx).await?;
122
123 match &mut msg {
124 Message::Watermark(_) => {
125 }
127 Message::Chunk(chunk) => {
128 metrics.actor_in_record_cnt.inc_by(chunk.cardinality() as _);
129 }
130 Message::Barrier(barrier) => {
131 tracing::debug!(
132 target: "events::stream::barrier::path",
133 actor_id = actor_id,
134 "receiver receives barrier from path: {:?}",
135 barrier.passed_actors
136 );
137 barrier.passed_actors.push(actor_id);
138
139 if let Some(update) = barrier
140 .as_update_merge(self.actor_context.id, self.upstream_fragment_id)
141 {
142 let new_upstream_fragment_id = update
143 .new_upstream_fragment_id
144 .unwrap_or(self.upstream_fragment_id);
145 let removed_upstream_actor_id: Vec<_> =
146 if update.new_upstream_fragment_id.is_some() {
147 vec![self.input.id()]
148 } else {
149 update.removed_upstream_actor_id.clone()
150 };
151
152 assert_eq!(
153 removed_upstream_actor_id,
154 vec![self.input.id()],
155 "the removed upstream actor should be the same as the current input"
156 );
157 let upstream_actor = update
158 .added_upstream_actors
159 .iter()
160 .exactly_one()
161 .expect("receiver should have exactly one upstream");
162
163 let mut new_upstream = new_input(
165 &self.local_barrier_manager,
166 self.metrics.clone(),
167 self.actor_context.id,
168 self.fragment_id,
169 upstream_actor,
170 new_upstream_fragment_id,
171 )
172 .await
173 .context("failed to create upstream input")?;
174
175 let new_barrier = expect_first_barrier(&mut new_upstream).await?;
178 assert_equal_dispatcher_barrier(barrier, &new_barrier);
179
180 self.input = new_upstream;
182
183 self.upstream_fragment_id = new_upstream_fragment_id;
184 metrics = self.metrics.new_actor_input_metrics(
185 actor_id,
186 self.fragment_id,
187 self.upstream_fragment_id,
188 );
189 }
190 }
191 };
192
193 yield msg;
194 start_time = Instant::now();
195 }
196 };
197
198 stream.boxed()
199 }
200}
201
202#[cfg(test)]
203mod tests {
204 use std::collections::HashMap;
205
206 use futures::{FutureExt, pin_mut};
207 use risingwave_common::util::epoch::test_epoch;
208 use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
209
210 use super::*;
211 use crate::executor::{MessageInner as Message, UpdateMutation};
212 use crate::task::NewOutputRequest;
213 use crate::task::barrier_test_utils::LocalBarrierTestEnv;
214 use crate::task::test_utils::helper_make_local_actor;
215
216 #[tokio::test]
217 async fn test_configuration_change() {
218 let actor_id = 233;
219 let (old, new) = (114, 514); let barrier_test_env = LocalBarrierTestEnv::for_test().await;
222
223 let metrics = Arc::new(StreamingMetrics::unused());
224
225 let (upstream_fragment_id, fragment_id) = (10, 18);
231
232 let merge_updates = maplit::hashmap! {
234 (actor_id, upstream_fragment_id) => MergeUpdate {
235 actor_id,
236 upstream_fragment_id,
237 new_upstream_fragment_id: None,
238 added_upstream_actors: vec![helper_make_local_actor(new)],
239 removed_upstream_actor_id: vec![old],
240 }
241 };
242
243 let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
244 UpdateMutation {
245 dispatchers: Default::default(),
246 merges: merge_updates,
247 vnode_bitmaps: Default::default(),
248 dropped_actors: Default::default(),
249 actor_splits: Default::default(),
250 actor_new_dispatchers: Default::default(),
251 actor_cdc_table_snapshot_splits: Default::default(),
252 },
253 ));
254
255 barrier_test_env.inject_barrier(&b1, [actor_id]);
256 barrier_test_env.flush_all_events().await;
257
258 let input = new_input(
259 &barrier_test_env.local_barrier_manager,
260 metrics.clone(),
261 actor_id,
262 fragment_id,
263 &helper_make_local_actor(old),
264 upstream_fragment_id,
265 )
266 .await
267 .unwrap();
268
269 let receiver = ReceiverExecutor::new(
270 ActorContext::for_test(actor_id),
271 fragment_id,
272 upstream_fragment_id,
273 input,
274 barrier_test_env.local_barrier_manager.clone(),
275 metrics.clone(),
276 barrier_test_env
277 .local_barrier_manager
278 .subscribe_barrier(actor_id),
279 )
280 .boxed()
281 .execute();
282
283 pin_mut!(receiver);
284
285 let mut txs = HashMap::new();
286 macro_rules! send {
287 ($actors:expr, $msg:expr) => {
288 for actor in $actors {
289 txs.get(&actor).unwrap().send($msg).await.unwrap();
290 }
291 };
292 }
293 macro_rules! send_error {
294 ($actors:expr, $msg:expr) => {
295 for actor in $actors {
296 txs.get(&actor).unwrap().send($msg).await.unwrap_err();
297 }
298 };
299 }
300 macro_rules! assert_recv_pending {
301 () => {
302 assert!(
303 receiver
304 .next()
305 .now_or_never()
306 .flatten()
307 .transpose()
308 .unwrap()
309 .is_none()
310 );
311 };
312 }
313
314 macro_rules! recv {
315 () => {
316 receiver.next().await.transpose().unwrap()
317 };
318 }
319
320 macro_rules! collect_upstream_tx {
321 ($actors:expr) => {
322 for upstream_id in $actors {
323 let mut output_requests = barrier_test_env
324 .take_pending_new_output_requests(upstream_id)
325 .await;
326 assert_eq!(output_requests.len(), 1);
327 let (downstream_actor_id, request) = output_requests.pop().unwrap();
328 assert_eq!(actor_id, downstream_actor_id);
329 let NewOutputRequest::Local(tx) = request else {
330 unreachable!()
331 };
332 txs.insert(upstream_id, tx);
333 }
334 };
335 }
336
337 assert_recv_pending!();
338 barrier_test_env.flush_all_events().await;
339
340 collect_upstream_tx!([old]);
342
343 send!([old], Message::Chunk(StreamChunk::default()).into());
345 recv!().unwrap().as_chunk().unwrap(); assert_recv_pending!();
347
348 send!([old], Message::Barrier(b1.clone().into_dispatcher()).into());
349 assert_recv_pending!(); collect_upstream_tx!([new]);
352
353 send!([new], Message::Barrier(b1.clone().into_dispatcher()).into());
354 recv!().unwrap().as_barrier().unwrap(); send_error!([old], Message::Chunk(StreamChunk::default()).into());
358 assert_recv_pending!();
359
360 send!([new], Message::Chunk(StreamChunk::default()).into());
362 recv!().unwrap().as_chunk().unwrap(); assert_recv_pending!();
364 }
365}