risingwave_stream/executor/
receiver.rs1use itertools::Itertools;
16use tokio::sync::mpsc;
17use tokio::time::Instant;
18
19use super::exchange::input::BoxedActorInput;
20use crate::executor::prelude::*;
21use crate::executor::{DispatchBarrierBuffer, DispatcherMessage};
22use crate::task::{FragmentId, LocalBarrierManager};
23
24pub struct ReceiverExecutor {
28 input: BoxedActorInput,
30
31 actor_context: ActorContextRef,
33
34 fragment_id: FragmentId,
36
37 upstream_fragment_id: FragmentId,
39
40 local_barrier_manager: LocalBarrierManager,
41
42 metrics: Arc<StreamingMetrics>,
44
45 barrier_rx: mpsc::UnboundedReceiver<Barrier>,
46}
47
48impl std::fmt::Debug for ReceiverExecutor {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 f.debug_struct("ReceiverExecutor").finish()
51 }
52}
53
54impl ReceiverExecutor {
55 #[allow(clippy::too_many_arguments)]
56 pub fn new(
57 ctx: ActorContextRef,
58 fragment_id: FragmentId,
59 upstream_fragment_id: FragmentId,
60 input: BoxedActorInput,
61 local_barrier_manager: LocalBarrierManager,
62 metrics: Arc<StreamingMetrics>,
63 barrier_rx: mpsc::UnboundedReceiver<Barrier>,
64 ) -> Self {
65 Self {
66 input,
67 actor_context: ctx,
68 upstream_fragment_id,
69 local_barrier_manager,
70 metrics,
71 fragment_id,
72 barrier_rx,
73 }
74 }
75
76 #[cfg(test)]
77 pub fn for_test(
78 actor_id: ActorId,
79 input: super::exchange::permit::Receiver,
80 local_barrier_manager: crate::task::LocalBarrierManager,
81 ) -> Self {
82 use super::exchange::input::LocalInput;
83 use crate::executor::exchange::input::ActorInput;
84
85 let barrier_rx = local_barrier_manager.subscribe_barrier(actor_id);
86
87 Self::new(
88 ActorContext::for_test(actor_id),
89 514,
90 1919,
91 LocalInput::new(input, 0).boxed_input(),
92 local_barrier_manager,
93 StreamingMetrics::unused().into(),
94 barrier_rx,
95 )
96 }
97}
98
99impl Execute for ReceiverExecutor {
100 fn execute(mut self: Box<Self>) -> BoxedMessageStream {
101 let actor_id = self.actor_context.id;
102
103 let mut metrics = self.metrics.new_actor_input_metrics(
104 actor_id,
105 self.fragment_id,
106 self.upstream_fragment_id,
107 );
108
109 let stream = #[try_stream]
110 async move {
111 let mut barrier_buffer = DispatchBarrierBuffer::new(
112 self.barrier_rx,
113 actor_id,
114 self.upstream_fragment_id,
115 self.local_barrier_manager,
116 self.metrics.clone(),
117 self.fragment_id,
118 );
119 let mut start_time = Instant::now();
120 loop {
121 let msg = barrier_buffer.await_next_message(&mut self.input).await?;
122 metrics
123 .actor_input_buffer_blocking_duration_ns
124 .inc_by(start_time.elapsed().as_nanos() as u64);
125
126 let msg = match msg {
127 DispatcherMessage::Watermark(watermark) => Message::Watermark(watermark),
128 DispatcherMessage::Chunk(chunk) => {
129 metrics.actor_in_record_cnt.inc_by(chunk.cardinality() as _);
130 Message::Chunk(chunk)
131 }
132 DispatcherMessage::Barrier(barrier) => {
133 tracing::debug!(
134 target: "events::stream::barrier::path",
135 actor_id = actor_id,
136 "receiver receives barrier from path: {:?}",
137 barrier.passed_actors
138 );
139 let (mut barrier, new_inputs) =
140 barrier_buffer.pop_barrier_with_inputs(barrier).await?;
141 barrier.passed_actors.push(actor_id);
142
143 if let Some(update) = barrier
144 .as_update_merge(self.actor_context.id, self.upstream_fragment_id)
145 {
146 let new_upstream_fragment_id = update
147 .new_upstream_fragment_id
148 .unwrap_or(self.upstream_fragment_id);
149 let removed_upstream_actor_id: Vec<_> =
150 if update.new_upstream_fragment_id.is_some() {
151 vec![self.input.id()]
152 } else {
153 update.removed_upstream_actor_id.clone()
154 };
155
156 assert_eq!(
157 removed_upstream_actor_id,
158 vec![self.input.id()],
159 "the removed upstream actor should be the same as the current input"
160 );
161 let new_upstream = new_inputs
162 .expect("should always have new inputs when handling update merge")
163 .into_iter()
164 .exactly_one()
165 .expect("receiver should have exactly one new upstream");
166
167 self.input = new_upstream;
169
170 self.upstream_fragment_id = new_upstream_fragment_id;
171 metrics = self.metrics.new_actor_input_metrics(
172 actor_id,
173 self.fragment_id,
174 self.upstream_fragment_id,
175 );
176 }
177
178 let is_stop = barrier.is_stop(actor_id);
179 let msg = Message::Barrier(barrier);
180 if is_stop {
181 yield msg;
182 break;
183 }
184
185 msg
186 }
187 };
188
189 yield msg;
190 start_time = Instant::now();
191 }
192 };
193
194 stream.boxed()
195 }
196}
197
198#[cfg(test)]
199mod tests {
200 use std::collections::HashMap;
201
202 use futures::{FutureExt, pin_mut};
203 use risingwave_common::util::epoch::test_epoch;
204 use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
205
206 use super::*;
207 use crate::executor::exchange::input::new_input;
208 use crate::executor::{MessageInner as Message, UpdateMutation};
209 use crate::task::NewOutputRequest;
210 use crate::task::barrier_test_utils::LocalBarrierTestEnv;
211 use crate::task::test_utils::helper_make_local_actor;
212
213 #[tokio::test]
214 async fn test_configuration_change() {
215 let actor_id = 233;
216 let (old, new) = (114, 514); let barrier_test_env = LocalBarrierTestEnv::for_test().await;
219
220 let metrics = Arc::new(StreamingMetrics::unused());
221
222 let (upstream_fragment_id, fragment_id) = (10, 18);
228
229 let merge_updates = maplit::hashmap! {
231 (actor_id, upstream_fragment_id) => MergeUpdate {
232 actor_id,
233 upstream_fragment_id,
234 new_upstream_fragment_id: None,
235 added_upstream_actors: vec![helper_make_local_actor(new)],
236 removed_upstream_actor_id: vec![old],
237 }
238 };
239
240 let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
241 UpdateMutation {
242 dispatchers: Default::default(),
243 merges: merge_updates,
244 vnode_bitmaps: Default::default(),
245 dropped_actors: Default::default(),
246 actor_splits: Default::default(),
247 actor_new_dispatchers: Default::default(),
248 actor_cdc_table_snapshot_splits: Default::default(),
249 sink_add_columns: Default::default(),
250 },
251 ));
252
253 barrier_test_env.inject_barrier(&b1, [actor_id]);
254 barrier_test_env.flush_all_events().await;
255
256 let input = new_input(
257 &barrier_test_env.local_barrier_manager,
258 metrics.clone(),
259 actor_id,
260 fragment_id,
261 &helper_make_local_actor(old),
262 upstream_fragment_id,
263 )
264 .await
265 .unwrap();
266
267 let receiver = ReceiverExecutor::new(
268 ActorContext::for_test(actor_id),
269 fragment_id,
270 upstream_fragment_id,
271 input,
272 barrier_test_env.local_barrier_manager.clone(),
273 metrics.clone(),
274 barrier_test_env
275 .local_barrier_manager
276 .subscribe_barrier(actor_id),
277 )
278 .boxed()
279 .execute();
280
281 pin_mut!(receiver);
282
283 let mut txs = HashMap::new();
284 macro_rules! send {
285 ($actors:expr, $msg:expr) => {
286 for actor in $actors {
287 txs.get(&actor).unwrap().send($msg).await.unwrap();
288 }
289 };
290 }
291 macro_rules! send_error {
292 ($actors:expr, $msg:expr) => {
293 for actor in $actors {
294 txs.get(&actor).unwrap().send($msg).await.unwrap_err();
295 }
296 };
297 }
298 macro_rules! assert_recv_pending {
299 () => {
300 assert!(
301 receiver
302 .next()
303 .now_or_never()
304 .flatten()
305 .transpose()
306 .unwrap()
307 .is_none()
308 );
309 };
310 }
311
312 macro_rules! recv {
313 () => {
314 receiver.next().await.transpose().unwrap()
315 };
316 }
317
318 macro_rules! collect_upstream_tx {
319 ($actors:expr) => {
320 for upstream_id in $actors {
321 let mut output_requests = barrier_test_env
322 .take_pending_new_output_requests(upstream_id)
323 .await;
324 assert_eq!(output_requests.len(), 1);
325 let (downstream_actor_id, request) = output_requests.pop().unwrap();
326 assert_eq!(actor_id, downstream_actor_id);
327 let NewOutputRequest::Local(tx) = request else {
328 unreachable!()
329 };
330 txs.insert(upstream_id, tx);
331 }
332 };
333 }
334
335 assert_recv_pending!();
336 barrier_test_env.flush_all_events().await;
337
338 collect_upstream_tx!([old]);
340
341 send!([old], Message::Chunk(StreamChunk::default()).into());
343 recv!().unwrap().as_chunk().unwrap(); assert_recv_pending!();
345
346 send!([old], Message::Barrier(b1.clone().into_dispatcher()).into());
347 assert_recv_pending!(); collect_upstream_tx!([new]);
350
351 send!([new], Message::Barrier(b1.clone().into_dispatcher()).into());
352 recv!().unwrap().as_barrier().unwrap(); send_error!([old], Message::Chunk(StreamChunk::default()).into());
356 assert_recv_pending!();
357
358 send!([new], Message::Chunk(StreamChunk::default()).into());
360 recv!().unwrap().as_chunk().unwrap(); assert_recv_pending!();
362 }
363}