risingwave_stream/executor/
receiver.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use itertools::Itertools;
17use tokio::sync::mpsc;
18use tokio::time::Instant;
19
20use super::exchange::input::BoxedInput;
21use crate::executor::DispatcherMessage;
22use crate::executor::exchange::input::{
23    assert_equal_dispatcher_barrier, new_input, process_dispatcher_msg,
24};
25use crate::executor::prelude::*;
26use crate::task::{FragmentId, SharedContext};
27
28/// `ReceiverExecutor` is used along with a channel. After creating a mpsc channel,
29/// there should be a `ReceiverExecutor` running in the background, so as to push
30/// messages down to the executors.
31pub struct ReceiverExecutor {
32    /// Input from upstream.
33    input: BoxedInput,
34
35    /// The context of the actor.
36    actor_context: ActorContextRef,
37
38    /// Belonged fragment id.
39    fragment_id: FragmentId,
40
41    /// Upstream fragment id.
42    upstream_fragment_id: FragmentId,
43
44    /// Shared context of the stream manager.
45    context: Arc<SharedContext>,
46
47    /// Metrics
48    metrics: Arc<StreamingMetrics>,
49
50    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
51}
52
53impl std::fmt::Debug for ReceiverExecutor {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        f.debug_struct("ReceiverExecutor").finish()
56    }
57}
58
59impl ReceiverExecutor {
60    #[allow(clippy::too_many_arguments)]
61    pub fn new(
62        ctx: ActorContextRef,
63        fragment_id: FragmentId,
64        upstream_fragment_id: FragmentId,
65        input: BoxedInput,
66        context: Arc<SharedContext>,
67        metrics: Arc<StreamingMetrics>,
68        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
69    ) -> Self {
70        Self {
71            input,
72            actor_context: ctx,
73            upstream_fragment_id,
74            metrics,
75            fragment_id,
76            context,
77            barrier_rx,
78        }
79    }
80
81    #[cfg(test)]
82    pub fn for_test(
83        actor_id: ActorId,
84        input: super::exchange::permit::Receiver,
85        shared_context: Arc<SharedContext>,
86        local_barrier_manager: crate::task::LocalBarrierManager,
87    ) -> Self {
88        use super::exchange::input::LocalInput;
89        use crate::executor::exchange::input::Input;
90
91        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_id);
92
93        Self::new(
94            ActorContext::for_test(actor_id),
95            514,
96            1919,
97            LocalInput::new(input, 0).boxed_input(),
98            shared_context,
99            StreamingMetrics::unused().into(),
100            barrier_rx,
101        )
102    }
103}
104
105impl Execute for ReceiverExecutor {
106    fn execute(mut self: Box<Self>) -> BoxedMessageStream {
107        let actor_id = self.actor_context.id;
108
109        let mut metrics = self.metrics.new_actor_input_metrics(
110            actor_id,
111            self.fragment_id,
112            self.upstream_fragment_id,
113        );
114
115        let stream = #[try_stream]
116        async move {
117            let mut start_time = Instant::now();
118            while let Some(msg) = self.input.next().await {
119                metrics
120                    .actor_input_buffer_blocking_duration_ns
121                    .inc_by(start_time.elapsed().as_nanos() as u64);
122                let msg: DispatcherMessage = msg?;
123                let mut msg = process_dispatcher_msg(msg, &mut self.barrier_rx).await?;
124
125                match &mut msg {
126                    Message::Watermark(_) => {
127                        // Do nothing.
128                    }
129                    Message::Chunk(chunk) => {
130                        metrics.actor_in_record_cnt.inc_by(chunk.cardinality() as _);
131                    }
132                    Message::Barrier(barrier) => {
133                        tracing::debug!(
134                            target: "events::stream::barrier::path",
135                            actor_id = actor_id,
136                            "receiver receives barrier from path: {:?}",
137                            barrier.passed_actors
138                        );
139                        barrier.passed_actors.push(actor_id);
140
141                        if let Some(update) = barrier
142                            .as_update_merge(self.actor_context.id, self.upstream_fragment_id)
143                        {
144                            let new_upstream_fragment_id = update
145                                .new_upstream_fragment_id
146                                .unwrap_or(self.upstream_fragment_id);
147                            let added_upstream_actor_id = update.added_upstream_actor_id.clone();
148                            let removed_upstream_actor_id: Vec<_> =
149                                if update.new_upstream_fragment_id.is_some() {
150                                    vec![self.input.actor_id()]
151                                } else {
152                                    update.removed_upstream_actor_id.clone()
153                                };
154
155                            assert_eq!(
156                                removed_upstream_actor_id,
157                                vec![self.input.actor_id()],
158                                "the removed upstream actor should be the same as the current input"
159                            );
160                            let upstream_actor_id = *added_upstream_actor_id
161                                .iter()
162                                .exactly_one()
163                                .expect("receiver should have exactly one upstream");
164
165                            // Create new upstream receiver.
166                            let mut new_upstream = new_input(
167                                &self.context,
168                                self.metrics.clone(),
169                                self.actor_context.id,
170                                self.fragment_id,
171                                upstream_actor_id,
172                                new_upstream_fragment_id,
173                            )
174                            .context("failed to create upstream input")?;
175
176                            // Poll the first barrier from the new upstream. It must be the same as
177                            // the one we polled from original upstream.
178                            let new_barrier = expect_first_barrier(&mut new_upstream).await?;
179                            assert_equal_dispatcher_barrier(barrier, &new_barrier);
180
181                            // Replace the input.
182                            self.input = new_upstream;
183
184                            self.upstream_fragment_id = new_upstream_fragment_id;
185                            metrics = self.metrics.new_actor_input_metrics(
186                                actor_id,
187                                self.fragment_id,
188                                self.upstream_fragment_id,
189                            );
190                        }
191                    }
192                };
193
194                yield msg;
195                start_time = Instant::now();
196            }
197        };
198
199        stream.boxed()
200    }
201}
202
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::{FutureExt, pin_mut};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_pb::stream_plan::update_mutation::MergeUpdate;

    use super::*;
    use crate::executor::{MessageInner as Message, UpdateMutation};
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;
    use crate::task::test_utils::helper_make_local_actor;

    /// Checks that an update barrier makes the receiver switch from the old upstream actor to
    /// the new one: messages from the new upstream are ignored before the barrier, and the old
    /// upstream can no longer send after it.
    #[tokio::test]
    async fn test_configuration_change() {
        let actor_id = 233;
        // Upstream actor before and after the configuration change.
        let old_upstream = 114;
        let new_upstream = 514;
        let upstream_fragment_id = 10;
        let fragment_id = 18;

        let env = LocalBarrierTestEnv::for_test().await;

        let shared_ctx = env.shared_context.clone();
        let metrics = Arc::new(StreamingMetrics::unused());

        // 1. Register all actors in the shared context. Both `old_upstream` and `new_upstream`
        //    are upstreams of `actor_id`.
        shared_ctx.add_actors(
            [actor_id, old_upstream, new_upstream]
                .into_iter()
                .map(helper_make_local_actor),
        );

        // 2. Build the configuration change barrier and inject it.
        let merge_updates = HashMap::from([(
            (actor_id, upstream_fragment_id),
            MergeUpdate {
                actor_id,
                upstream_fragment_id,
                new_upstream_fragment_id: None,
                added_upstream_actor_id: vec![new_upstream],
                removed_upstream_actor_id: vec![old_upstream],
            },
        )]);

        let update_barrier = Barrier::new_test_barrier(test_epoch(1)).with_mutation(
            Mutation::Update(UpdateMutation {
                dispatchers: Default::default(),
                merges: merge_updates,
                vnode_bitmaps: Default::default(),
                dropped_actors: Default::default(),
                actor_splits: Default::default(),
                actor_new_dispatchers: Default::default(),
            }),
        );

        env.inject_barrier(&update_barrier, [actor_id]);
        env.flush_all_events().await;

        // 3. Create the receiver, initially reading from the old upstream.
        let input = new_input(
            &shared_ctx,
            metrics.clone(),
            actor_id,
            fragment_id,
            old_upstream,
            upstream_fragment_id,
        )
        .unwrap();

        let barrier_rx = env.local_barrier_manager.subscribe_barrier(actor_id);
        let receiver = ReceiverExecutor::new(
            ActorContext::for_test(actor_id),
            fragment_id,
            upstream_fragment_id,
            input,
            shared_ctx.clone(),
            metrics.clone(),
            barrier_rx,
        )
        .boxed()
        .execute();

        pin_mut!(receiver);

        // 4. Take the senders of both upstream channels.
        let senders = [old_upstream, new_upstream]
            .into_iter()
            .map(|id| (id, shared_ctx.take_sender(&(id, actor_id)).unwrap()))
            .collect::<HashMap<_, _>>();

        // Sends `$msg` through every listed upstream and expects success.
        macro_rules! send {
            ($actors:expr, $msg:expr) => {
                for upstream in $actors {
                    senders.get(&upstream).unwrap().send($msg).await.unwrap();
                }
            };
        }
        // Sends `$msg` through every listed upstream and expects the send to fail.
        macro_rules! send_error {
            ($actors:expr, $msg:expr) => {
                for upstream in $actors {
                    senders
                        .get(&upstream)
                        .unwrap()
                        .send($msg)
                        .await
                        .unwrap_err();
                }
            };
        }
        // Asserts that the receiver has nothing to yield right now.
        macro_rules! assert_recv_pending {
            () => {
                assert!(
                    receiver
                        .next()
                        .now_or_never()
                        .flatten()
                        .transpose()
                        .unwrap()
                        .is_none()
                );
            };
        }
        // Awaits the next message of the receiver.
        macro_rules! recv {
            () => {
                receiver.next().await.transpose().unwrap()
            };
        }

        // 5. A chunk from the old upstream is received.
        send!([old_upstream], Message::Chunk(StreamChunk::default()).into());
        recv!().unwrap().as_chunk().unwrap();
        assert_recv_pending!();

        // 6. The barrier from the new upstream alone is not received, because the new actor is
        //    not the upstream yet.
        send!(
            [new_upstream],
            Message::Barrier(update_barrier.clone().into_dispatcher()).into()
        );
        assert_recv_pending!();

        // 7. Once the old upstream sends the barrier too, it is received.
        send!(
            [old_upstream],
            Message::Barrier(update_barrier.clone().into_dispatcher()).into()
        );
        recv!().unwrap().as_barrier().unwrap();

        // 8. Sending a chunk through the removed upstream fails.
        send_error!([old_upstream], Message::Chunk(StreamChunk::default()).into());
        assert_recv_pending!();

        // 9. A chunk from the added upstream is received.
        send!([new_upstream], Message::Chunk(StreamChunk::default()).into());
        recv!().unwrap().as_chunk().unwrap();
        assert_recv_pending!();
    }
}