risingwave_stream/executor/
receiver.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use itertools::Itertools;
17use tokio::sync::mpsc;
18use tokio::time::Instant;
19
20use super::exchange::input::BoxedInput;
21use crate::executor::DispatcherMessage;
22use crate::executor::exchange::input::{
23    assert_equal_dispatcher_barrier, new_input, process_dispatcher_msg,
24};
25use crate::executor::prelude::*;
26use crate::task::{FragmentId, LocalBarrierManager};
27
28/// `ReceiverExecutor` is used along with a channel. After creating a mpsc channel,
29/// there should be a `ReceiverExecutor` running in the background, so as to push
30/// messages down to the executors.
31pub struct ReceiverExecutor {
32    /// Input from upstream.
33    input: BoxedInput,
34
35    /// The context of the actor.
36    actor_context: ActorContextRef,
37
38    /// Belonged fragment id.
39    fragment_id: FragmentId,
40
41    /// Upstream fragment id.
42    upstream_fragment_id: FragmentId,
43
44    local_barrier_manager: LocalBarrierManager,
45
46    /// Metrics
47    metrics: Arc<StreamingMetrics>,
48
49    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
50}
51
52impl std::fmt::Debug for ReceiverExecutor {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        f.debug_struct("ReceiverExecutor").finish()
55    }
56}
57
58impl ReceiverExecutor {
59    #[allow(clippy::too_many_arguments)]
60    pub fn new(
61        ctx: ActorContextRef,
62        fragment_id: FragmentId,
63        upstream_fragment_id: FragmentId,
64        input: BoxedInput,
65        local_barrier_manager: LocalBarrierManager,
66        metrics: Arc<StreamingMetrics>,
67        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
68    ) -> Self {
69        Self {
70            input,
71            actor_context: ctx,
72            upstream_fragment_id,
73            local_barrier_manager,
74            metrics,
75            fragment_id,
76            barrier_rx,
77        }
78    }
79
80    #[cfg(test)]
81    pub fn for_test(
82        actor_id: ActorId,
83        input: super::exchange::permit::Receiver,
84        local_barrier_manager: crate::task::LocalBarrierManager,
85    ) -> Self {
86        use super::exchange::input::LocalInput;
87        use crate::executor::exchange::input::Input;
88
89        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_id);
90
91        Self::new(
92            ActorContext::for_test(actor_id),
93            514,
94            1919,
95            LocalInput::new(input, 0).boxed_input(),
96            local_barrier_manager,
97            StreamingMetrics::unused().into(),
98            barrier_rx,
99        )
100    }
101}
102
103impl Execute for ReceiverExecutor {
104    fn execute(mut self: Box<Self>) -> BoxedMessageStream {
105        let actor_id = self.actor_context.id;
106
107        let mut metrics = self.metrics.new_actor_input_metrics(
108            actor_id,
109            self.fragment_id,
110            self.upstream_fragment_id,
111        );
112
113        let stream = #[try_stream]
114        async move {
115            let mut start_time = Instant::now();
116            while let Some(msg) = self.input.next().await {
117                metrics
118                    .actor_input_buffer_blocking_duration_ns
119                    .inc_by(start_time.elapsed().as_nanos() as u64);
120                let msg: DispatcherMessage = msg?;
121                let mut msg = process_dispatcher_msg(msg, &mut self.barrier_rx).await?;
122
123                match &mut msg {
124                    Message::Watermark(_) => {
125                        // Do nothing.
126                    }
127                    Message::Chunk(chunk) => {
128                        metrics.actor_in_record_cnt.inc_by(chunk.cardinality() as _);
129                    }
130                    Message::Barrier(barrier) => {
131                        tracing::debug!(
132                            target: "events::stream::barrier::path",
133                            actor_id = actor_id,
134                            "receiver receives barrier from path: {:?}",
135                            barrier.passed_actors
136                        );
137                        barrier.passed_actors.push(actor_id);
138
139                        if let Some(update) = barrier
140                            .as_update_merge(self.actor_context.id, self.upstream_fragment_id)
141                        {
142                            let new_upstream_fragment_id = update
143                                .new_upstream_fragment_id
144                                .unwrap_or(self.upstream_fragment_id);
145                            let removed_upstream_actor_id: Vec<_> =
146                                if update.new_upstream_fragment_id.is_some() {
147                                    vec![self.input.actor_id()]
148                                } else {
149                                    update.removed_upstream_actor_id.clone()
150                                };
151
152                            assert_eq!(
153                                removed_upstream_actor_id,
154                                vec![self.input.actor_id()],
155                                "the removed upstream actor should be the same as the current input"
156                            );
157                            let upstream_actor = update
158                                .added_upstream_actors
159                                .iter()
160                                .exactly_one()
161                                .expect("receiver should have exactly one upstream");
162
163                            // Create new upstream receiver.
164                            let mut new_upstream = new_input(
165                                &self.local_barrier_manager,
166                                self.metrics.clone(),
167                                self.actor_context.id,
168                                self.fragment_id,
169                                upstream_actor,
170                                new_upstream_fragment_id,
171                            )
172                            .await
173                            .context("failed to create upstream input")?;
174
175                            // Poll the first barrier from the new upstream. It must be the same as
176                            // the one we polled from original upstream.
177                            let new_barrier = expect_first_barrier(&mut new_upstream).await?;
178                            assert_equal_dispatcher_barrier(barrier, &new_barrier);
179
180                            // Replace the input.
181                            self.input = new_upstream;
182
183                            self.upstream_fragment_id = new_upstream_fragment_id;
184                            metrics = self.metrics.new_actor_input_metrics(
185                                actor_id,
186                                self.fragment_id,
187                                self.upstream_fragment_id,
188                            );
189                        }
190                    }
191                };
192
193                yield msg;
194                start_time = Instant::now();
195            }
196        };
197
198        stream.boxed()
199    }
200}
201
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::{FutureExt, pin_mut};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_pb::stream_plan::update_mutation::MergeUpdate;

    use super::*;
    use crate::executor::{MessageInner as Message, UpdateMutation};
    use crate::task::NewOutputRequest;
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;
    use crate::task::test_utils::helper_make_local_actor;

    /// Swaps the receiver's only upstream actor through an update barrier and checks which
    /// messages come out of the receiver before and after the swap.
    #[tokio::test]
    async fn test_configuration_change() {
        let actor_id = 233;
        // Upstream actor before and after the configuration change.
        let (old_upstream, new_upstream) = (114, 514);
        let (upstream_fragment_id, fragment_id) = (10, 18);

        let env = LocalBarrierTestEnv::for_test().await;
        let streaming_metrics = Arc::new(StreamingMetrics::unused());

        // 1. Build the configuration change barrier (`old_upstream` -> `new_upstream`) and
        //    inject it.
        let merges = HashMap::from([(
            (actor_id, upstream_fragment_id),
            MergeUpdate {
                actor_id,
                upstream_fragment_id,
                new_upstream_fragment_id: None,
                added_upstream_actors: vec![helper_make_local_actor(new_upstream)],
                removed_upstream_actor_id: vec![old_upstream],
            },
        )]);

        let update_barrier = Barrier::new_test_barrier(test_epoch(1)).with_mutation(
            Mutation::Update(UpdateMutation {
                dispatchers: Default::default(),
                merges,
                vnode_bitmaps: Default::default(),
                dropped_actors: Default::default(),
                actor_splits: Default::default(),
                actor_new_dispatchers: Default::default(),
            }),
        );

        env.inject_barrier(&update_barrier, [actor_id]);
        env.flush_all_events().await;

        // 2. Create the receiver on top of the old upstream.
        let old_input = new_input(
            &env.local_barrier_manager,
            streaming_metrics.clone(),
            actor_id,
            fragment_id,
            &helper_make_local_actor(old_upstream),
            upstream_fragment_id,
        )
        .await
        .unwrap();

        let ctx = ActorContext::for_test(actor_id);
        let barrier_rx = env.local_barrier_manager.subscribe_barrier(actor_id);

        let stream = ReceiverExecutor::new(
            ctx,
            fragment_id,
            upstream_fragment_id,
            old_input,
            env.local_barrier_manager.clone(),
            streaming_metrics.clone(),
            barrier_rx,
        )
        .boxed()
        .execute();

        pin_mut!(stream);

        // Senders towards the receiver, keyed by upstream actor id.
        let mut upstream_txs = HashMap::new();

        // Sends `$msg` through every listed upstream; the send must succeed.
        macro_rules! send {
            ($upstreams:expr, $msg:expr) => {
                for upstream in $upstreams {
                    upstream_txs
                        .get(&upstream)
                        .unwrap()
                        .send($msg)
                        .await
                        .unwrap();
                }
            };
        }
        // Sends `$msg` through every listed upstream; the send must fail.
        macro_rules! send_error {
            ($upstreams:expr, $msg:expr) => {
                for upstream in $upstreams {
                    upstream_txs
                        .get(&upstream)
                        .unwrap()
                        .send($msg)
                        .await
                        .unwrap_err();
                }
            };
        }
        // Asserts that the receiver has nothing to yield right now.
        macro_rules! assert_recv_pending {
            () => {
                assert!(
                    stream
                        .next()
                        .now_or_never()
                        .flatten()
                        .transpose()
                        .unwrap()
                        .is_none()
                );
            };
        }
        // Waits for the next message of the receiver.
        macro_rules! recv {
            () => {
                stream.next().await.transpose().unwrap()
            };
        }
        // Takes the pending output request of every listed upstream and stores its sender.
        macro_rules! collect_upstream_tx {
            ($upstreams:expr) => {
                for upstream in $upstreams {
                    let mut pending_requests =
                        env.take_pending_new_output_requests(upstream).await;
                    assert_eq!(pending_requests.len(), 1);
                    let (downstream_actor_id, request) = pending_requests.pop().unwrap();
                    assert_eq!(actor_id, downstream_actor_id);
                    let NewOutputRequest::Local(tx) = request else {
                        unreachable!()
                    };
                    upstream_txs.insert(upstream, tx);
                }
            };
        }

        assert_recv_pending!();
        env.flush_all_events().await;

        // 3. Take the sender of the old upstream.
        collect_upstream_tx!([old_upstream]);

        // 4. Send a chunk through the old upstream; it should come out of the receiver.
        send!(
            [old_upstream],
            Message::Chunk(StreamChunk::default()).into()
        );
        recv!().unwrap().as_chunk().unwrap();
        assert_recv_pending!();

        // 5. Send the update barrier through the old upstream. It must not come out yet: the
        //    receiver still has to see the same barrier from the new upstream.
        send!(
            [old_upstream],
            Message::Barrier(update_barrier.clone().into_dispatcher()).into()
        );
        assert_recv_pending!();

        collect_upstream_tx!([new_upstream]);

        // 6. Send the same barrier through the new upstream; now the barrier is yielded.
        send!(
            [new_upstream],
            Message::Barrier(update_barrier.clone().into_dispatcher()).into()
        );
        recv!().unwrap().as_barrier().unwrap();

        // 7. Sending a chunk through the removed upstream fails and nothing is yielded.
        send_error!(
            [old_upstream],
            Message::Chunk(StreamChunk::default()).into()
        );
        assert_recv_pending!();

        // 8. A chunk sent through the added upstream comes out of the receiver.
        send!(
            [new_upstream],
            Message::Chunk(StreamChunk::default()).into()
        );
        recv!().unwrap().as_chunk().unwrap();
        assert_recv_pending!();
    }
}