risingwave_stream/executor/
receiver.rs1use itertools::Itertools;
16use tokio::sync::mpsc;
17
18use super::exchange::input::BoxedActorInput;
19use crate::executor::prelude::*;
20use crate::executor::{DispatchBarrierBuffer, DispatcherMessage};
21use crate::task::{FragmentId, LocalBarrierManager};
22
23pub struct ReceiverExecutor {
27 input: BoxedActorInput,
29
30 actor_context: ActorContextRef,
32
33 fragment_id: FragmentId,
35
36 upstream_fragment_id: FragmentId,
38
39 local_barrier_manager: LocalBarrierManager,
40
41 metrics: Arc<StreamingMetrics>,
43
44 barrier_rx: mpsc::UnboundedReceiver<Barrier>,
45}
46
47impl std::fmt::Debug for ReceiverExecutor {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 f.debug_struct("ReceiverExecutor").finish()
50 }
51}
52
53impl ReceiverExecutor {
54 #[allow(clippy::too_many_arguments)]
55 pub fn new(
56 ctx: ActorContextRef,
57 fragment_id: FragmentId,
58 upstream_fragment_id: FragmentId,
59 input: BoxedActorInput,
60 local_barrier_manager: LocalBarrierManager,
61 metrics: Arc<StreamingMetrics>,
62 barrier_rx: mpsc::UnboundedReceiver<Barrier>,
63 ) -> Self {
64 Self {
65 input,
66 actor_context: ctx,
67 upstream_fragment_id,
68 local_barrier_manager,
69 metrics,
70 fragment_id,
71 barrier_rx,
72 }
73 }
74
75 #[cfg(test)]
76 pub fn for_test(
77 actor_id: impl Into<ActorId>,
78 input: super::exchange::permit::Receiver,
79 local_barrier_manager: crate::task::LocalBarrierManager,
80 ) -> Self {
81 let actor_id = actor_id.into();
82 use super::exchange::input::LocalInput;
83 use crate::executor::exchange::input::ActorInput;
84
85 let barrier_rx = local_barrier_manager.subscribe_barrier(actor_id);
86
87 Self::new(
88 ActorContext::for_test(actor_id),
89 514.into(),
90 1919.into(),
91 LocalInput::new(input, 0.into()).boxed_input(),
92 local_barrier_manager,
93 StreamingMetrics::unused().into(),
94 barrier_rx,
95 )
96 }
97}
98
99impl Execute for ReceiverExecutor {
100 fn execute(mut self: Box<Self>) -> BoxedMessageStream {
101 let actor_id = self.actor_context.id;
102
103 let mut metrics = self.metrics.new_actor_input_metrics(
104 actor_id,
105 self.fragment_id,
106 self.upstream_fragment_id,
107 );
108
109 let stream = #[try_stream]
110 async move {
111 let mut barrier_buffer = DispatchBarrierBuffer::new(
112 self.barrier_rx,
113 actor_id,
114 self.upstream_fragment_id,
115 self.local_barrier_manager,
116 self.metrics.clone(),
117 self.fragment_id,
118 );
119 loop {
120 let msg = barrier_buffer
121 .await_next_message(&mut self.input, &metrics)
122 .await?;
123 let msg = match msg {
124 DispatcherMessage::Watermark(watermark) => Message::Watermark(watermark),
125 DispatcherMessage::Chunk(chunk) => {
126 metrics.actor_in_record_cnt.inc_by(chunk.cardinality() as _);
127 Message::Chunk(chunk)
128 }
129 DispatcherMessage::Barrier(barrier) => {
130 tracing::debug!(
131 target: "events::stream::barrier::path",
132 actor_id = %actor_id,
133 "receiver receives barrier from path: {:?}",
134 barrier.passed_actors
135 );
136 let (mut barrier, new_inputs) =
137 barrier_buffer.pop_barrier_with_inputs(barrier).await?;
138 barrier.passed_actors.push(actor_id);
139
140 if let Some(update) = barrier
141 .as_update_merge(self.actor_context.id, self.upstream_fragment_id)
142 {
143 let new_upstream_fragment_id = update
144 .new_upstream_fragment_id
145 .unwrap_or(self.upstream_fragment_id);
146 let removed_upstream_actor_id: Vec<_> =
147 if update.new_upstream_fragment_id.is_some() {
148 vec![self.input.id()]
149 } else {
150 update.removed_upstream_actor_id.clone()
151 };
152
153 assert_eq!(
154 removed_upstream_actor_id,
155 vec![self.input.id()],
156 "the removed upstream actor should be the same as the current input"
157 );
158 let new_upstream = new_inputs
159 .expect("should always have new inputs when handling update merge")
160 .into_iter()
161 .exactly_one()
162 .expect("receiver should have exactly one new upstream");
163
164 self.input = new_upstream;
166
167 self.upstream_fragment_id = new_upstream_fragment_id;
168 metrics = self.metrics.new_actor_input_metrics(
169 actor_id,
170 self.fragment_id,
171 self.upstream_fragment_id,
172 );
173 }
174
175 let is_stop = barrier.is_stop(actor_id);
176 let msg = Message::Barrier(barrier);
177 if is_stop {
178 yield msg;
179 break;
180 }
181
182 msg
183 }
184 };
185
186 yield msg;
187 }
188 };
189
190 stream.boxed()
191 }
192}
193
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::{FutureExt, pin_mut};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_pb::stream_plan::update_mutation::MergeUpdate;

    use super::*;
    use crate::executor::exchange::input::new_input;
    use crate::executor::{MessageInner as Message, UpdateMutation};
    use crate::task::NewOutputRequest;
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;
    use crate::task::test_utils::helper_make_local_actor;

    /// Replaces the receiver's upstream `old` with `new` through a merge update carried by
    /// a barrier, and checks which messages the receiver emits before and after the switch.
    #[tokio::test]
    async fn test_configuration_change() {
        let actor_id = 233.into();
        // Upstream actor before the update, and the one replacing it.
        let old = 114.into();
        let new = 514.into();

        let barrier_test_env = LocalBarrierTestEnv::for_test().await;

        let metrics = Arc::new(StreamingMetrics::unused());

        let upstream_fragment_id = 10.into();
        let fragment_id = 18.into();

        // Merge update for `actor_id`: add `new`, remove `old`, same upstream fragment.
        let merge_updates = HashMap::from([(
            (actor_id, upstream_fragment_id),
            MergeUpdate {
                actor_id,
                upstream_fragment_id,
                new_upstream_fragment_id: None,
                added_upstream_actors: vec![helper_make_local_actor(new)],
                removed_upstream_actor_id: vec![old],
            },
        )]);

        let update = UpdateMutation {
            dispatchers: Default::default(),
            merges: merge_updates,
            vnode_bitmaps: Default::default(),
            dropped_actors: Default::default(),
            actor_splits: Default::default(),
            actor_new_dispatchers: Default::default(),
            actor_cdc_table_snapshot_splits: Default::default(),
            sink_add_columns: Default::default(),
        };
        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(update));

        barrier_test_env.inject_barrier(&b1, [actor_id]);
        barrier_test_env.flush_all_events().await;

        // The receiver starts out reading from `old`.
        let input = new_input(
            &barrier_test_env.local_barrier_manager,
            metrics.clone(),
            actor_id,
            fragment_id,
            &helper_make_local_actor(old),
            upstream_fragment_id,
        )
        .await
        .unwrap();

        let executor = ReceiverExecutor::new(
            ActorContext::for_test(actor_id),
            fragment_id,
            upstream_fragment_id,
            input,
            barrier_test_env.local_barrier_manager.clone(),
            metrics.clone(),
            barrier_test_env
                .local_barrier_manager
                .subscribe_barrier(actor_id),
        );
        let receiver = executor.boxed().execute();

        pin_mut!(receiver);

        // Senders towards the receiver, keyed by upstream actor.
        let mut txs = HashMap::new();

        // Sends `$msg` through every listed upstream and expects success.
        macro_rules! send {
            ($upstreams:expr, $msg:expr) => {
                for upstream in $upstreams {
                    txs.get(&upstream).unwrap().send($msg).await.unwrap();
                }
            };
        }
        // Sends `$msg` through every listed upstream and expects the send to fail.
        macro_rules! send_error {
            ($upstreams:expr, $msg:expr) => {
                for upstream in $upstreams {
                    txs.get(&upstream).unwrap().send($msg).await.unwrap_err();
                }
            };
        }
        // Polls the receiver once and asserts that it produced no message.
        macro_rules! assert_recv_pending {
            () => {
                assert!(
                    receiver
                        .next()
                        .now_or_never()
                        .flatten()
                        .transpose()
                        .unwrap()
                        .is_none()
                );
            };
        }
        // Awaits the next message from the receiver, panicking on a stream error.
        macro_rules! recv {
            () => {
                receiver.next().await.transpose().unwrap()
            };
        }
        // Takes the single pending output request of each listed upstream and stores its
        // local sender in `txs`.
        macro_rules! collect_upstream_tx {
            ($upstreams:expr) => {
                for upstream_id in $upstreams {
                    let mut output_requests = barrier_test_env
                        .take_pending_new_output_requests(upstream_id.into())
                        .await;
                    assert_eq!(output_requests.len(), 1);
                    let (downstream_actor_id, request) = output_requests.pop().unwrap();
                    assert_eq!(downstream_actor_id, actor_id);
                    let NewOutputRequest::Local(tx) = request else {
                        unreachable!()
                    };
                    txs.insert(upstream_id, tx);
                }
            };
        }

        // Nothing has been sent yet.
        assert_recv_pending!();
        barrier_test_env.flush_all_events().await;

        collect_upstream_tx!([old]);

        // A chunk sent through `old` is received.
        send!([old], Message::Chunk(StreamChunk::default()).into());
        recv!().unwrap().as_chunk().unwrap();
        assert_recv_pending!();

        // The barrier sent through `old` alone is not emitted yet.
        send!([old], Message::Barrier(b1.clone().into_dispatcher()).into());
        assert_recv_pending!();

        collect_upstream_tx!([new]);

        // Once the barrier is also sent through `new`, it is emitted.
        send!([new], Message::Barrier(b1.clone().into_dispatcher()).into());
        recv!().unwrap().as_barrier().unwrap();

        // Sending through the removed upstream now fails and nothing is received.
        send_error!([old], Message::Chunk(StreamChunk::default()).into());
        assert_recv_pending!();

        // A chunk sent through the added upstream is received.
        send!([new], Message::Chunk(StreamChunk::default()).into());
        recv!().unwrap().as_chunk().unwrap();
        assert_recv_pending!();
    }
}