risingwave_stream/executor/receiver.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use itertools::Itertools;
17use tokio::sync::mpsc;
18use tokio::time::Instant;
19
20use super::exchange::input::BoxedActorInput;
21use crate::executor::DispatcherMessage;
22use crate::executor::exchange::input::{
23    assert_equal_dispatcher_barrier, new_input, process_dispatcher_msg,
24};
25use crate::executor::prelude::*;
26use crate::task::{FragmentId, LocalBarrierManager};
27
28/// `ReceiverExecutor` is used along with a channel. After creating a mpsc channel,
29/// there should be a `ReceiverExecutor` running in the background, so as to push
30/// messages down to the executors.
31pub struct ReceiverExecutor {
32    /// Input from upstream.
33    input: BoxedActorInput,
34
35    /// The context of the actor.
36    actor_context: ActorContextRef,
37
38    /// Belonged fragment id.
39    fragment_id: FragmentId,
40
41    /// Upstream fragment id.
42    upstream_fragment_id: FragmentId,
43
44    local_barrier_manager: LocalBarrierManager,
45
46    /// Metrics
47    metrics: Arc<StreamingMetrics>,
48
49    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
50}
51
52impl std::fmt::Debug for ReceiverExecutor {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        f.debug_struct("ReceiverExecutor").finish()
55    }
56}
57
58impl ReceiverExecutor {
59    #[allow(clippy::too_many_arguments)]
60    pub fn new(
61        ctx: ActorContextRef,
62        fragment_id: FragmentId,
63        upstream_fragment_id: FragmentId,
64        input: BoxedActorInput,
65        local_barrier_manager: LocalBarrierManager,
66        metrics: Arc<StreamingMetrics>,
67        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
68    ) -> Self {
69        Self {
70            input,
71            actor_context: ctx,
72            upstream_fragment_id,
73            local_barrier_manager,
74            metrics,
75            fragment_id,
76            barrier_rx,
77        }
78    }
79
80    #[cfg(test)]
81    pub fn for_test(
82        actor_id: ActorId,
83        input: super::exchange::permit::Receiver,
84        local_barrier_manager: crate::task::LocalBarrierManager,
85    ) -> Self {
86        use super::exchange::input::LocalInput;
87        use crate::executor::exchange::input::ActorInput;
88
89        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_id);
90
91        Self::new(
92            ActorContext::for_test(actor_id),
93            514,
94            1919,
95            LocalInput::new(input, 0).boxed_input(),
96            local_barrier_manager,
97            StreamingMetrics::unused().into(),
98            barrier_rx,
99        )
100    }
101}
102
103impl Execute for ReceiverExecutor {
104    fn execute(mut self: Box<Self>) -> BoxedMessageStream {
105        let actor_id = self.actor_context.id;
106
107        let mut metrics = self.metrics.new_actor_input_metrics(
108            actor_id,
109            self.fragment_id,
110            self.upstream_fragment_id,
111        );
112
113        let stream = #[try_stream]
114        async move {
115            let mut start_time = Instant::now();
116            while let Some(msg) = self.input.next().await {
117                metrics
118                    .actor_input_buffer_blocking_duration_ns
119                    .inc_by(start_time.elapsed().as_nanos() as u64);
120                let msg: DispatcherMessage = msg?;
121                let mut msg = process_dispatcher_msg(msg, &mut self.barrier_rx).await?;
122
123                match &mut msg {
124                    Message::Watermark(_) => {
125                        // Do nothing.
126                    }
127                    Message::Chunk(chunk) => {
128                        metrics.actor_in_record_cnt.inc_by(chunk.cardinality() as _);
129                    }
130                    Message::Barrier(barrier) => {
131                        tracing::debug!(
132                            target: "events::stream::barrier::path",
133                            actor_id = actor_id,
134                            "receiver receives barrier from path: {:?}",
135                            barrier.passed_actors
136                        );
137                        barrier.passed_actors.push(actor_id);
138
139                        if let Some(update) = barrier
140                            .as_update_merge(self.actor_context.id, self.upstream_fragment_id)
141                        {
142                            let new_upstream_fragment_id = update
143                                .new_upstream_fragment_id
144                                .unwrap_or(self.upstream_fragment_id);
145                            let removed_upstream_actor_id: Vec<_> =
146                                if update.new_upstream_fragment_id.is_some() {
147                                    vec![self.input.id()]
148                                } else {
149                                    update.removed_upstream_actor_id.clone()
150                                };
151
152                            assert_eq!(
153                                removed_upstream_actor_id,
154                                vec![self.input.id()],
155                                "the removed upstream actor should be the same as the current input"
156                            );
157                            let upstream_actor = update
158                                .added_upstream_actors
159                                .iter()
160                                .exactly_one()
161                                .expect("receiver should have exactly one upstream");
162
163                            // Create new upstream receiver.
164                            let mut new_upstream = new_input(
165                                &self.local_barrier_manager,
166                                self.metrics.clone(),
167                                self.actor_context.id,
168                                self.fragment_id,
169                                upstream_actor,
170                                new_upstream_fragment_id,
171                            )
172                            .await
173                            .context("failed to create upstream input")?;
174
175                            // Poll the first barrier from the new upstream. It must be the same as
176                            // the one we polled from original upstream.
177                            let new_barrier = expect_first_barrier(&mut new_upstream).await?;
178                            assert_equal_dispatcher_barrier(barrier, &new_barrier);
179
180                            // Replace the input.
181                            self.input = new_upstream;
182
183                            self.upstream_fragment_id = new_upstream_fragment_id;
184                            metrics = self.metrics.new_actor_input_metrics(
185                                actor_id,
186                                self.fragment_id,
187                                self.upstream_fragment_id,
188                            );
189                        }
190                    }
191                };
192
193                yield msg;
194                start_time = Instant::now();
195            }
196        };
197
198        stream.boxed()
199    }
200}
201
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::{FutureExt, pin_mut};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_pb::stream_plan::update_mutation::MergeUpdate;

    use super::*;
    use crate::executor::{MessageInner as Message, UpdateMutation};
    use crate::task::NewOutputRequest;
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;
    use crate::task::test_utils::helper_make_local_actor;

    /// Checks that a merge-update barrier moves a `ReceiverExecutor` from one upstream actor to
    /// another. The barrier is forwarded only after the new upstream has delivered it too.
    /// Afterwards, sends to the old upstream fail and chunks from the new upstream are
    /// forwarded.
    #[tokio::test]
    async fn test_configuration_change() {
        let actor_id = 233;
        let (old, new) = (114, 514); // old and new upstream actor id

        let barrier_test_env = LocalBarrierTestEnv::for_test().await;

        let metrics = Arc::new(StreamingMetrics::unused());

        // Topology: `old -> actor_id` before the configuration change,
        // `new -> actor_id` after it.
        let (upstream_fragment_id, fragment_id) = (10, 18);

        // 1. Build a configuration-change barrier whose merge update replaces `old` with `new`
        //    as the upstream of `actor_id`, and inject it.
        let merge_updates = maplit::hashmap! {
            (actor_id, upstream_fragment_id) => MergeUpdate {
                actor_id,
                upstream_fragment_id,
                new_upstream_fragment_id: None,
                added_upstream_actors: vec![helper_make_local_actor(new)],
                removed_upstream_actor_id: vec![old],
            }
        };

        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: Default::default(),
                merges: merge_updates,
                vnode_bitmaps: Default::default(),
                dropped_actors: Default::default(),
                actor_splits: Default::default(),
                actor_new_dispatchers: Default::default(),
                actor_cdc_table_snapshot_splits: Default::default(),
            },
        ));

        barrier_test_env.inject_barrier(&b1, [actor_id]);
        barrier_test_env.flush_all_events().await;

        // 2. Create the receiver, initially reading from `old`.
        let input = new_input(
            &barrier_test_env.local_barrier_manager,
            metrics.clone(),
            actor_id,
            fragment_id,
            &helper_make_local_actor(old),
            upstream_fragment_id,
        )
        .await
        .unwrap();

        let receiver = ReceiverExecutor::new(
            ActorContext::for_test(actor_id),
            fragment_id,
            upstream_fragment_id,
            input,
            barrier_test_env.local_barrier_manager.clone(),
            metrics.clone(),
            barrier_test_env
                .local_barrier_manager
                .subscribe_barrier(actor_id),
        )
        .boxed()
        .execute();

        pin_mut!(receiver);

        // Sender side of each `upstream -> actor_id` channel, keyed by upstream actor id.
        let mut txs = HashMap::new();
        macro_rules! send {
            ($actors:expr, $msg:expr) => {
                for actor in $actors {
                    txs.get(&actor).unwrap().send($msg).await.unwrap();
                }
            };
        }
        macro_rules! send_error {
            ($actors:expr, $msg:expr) => {
                for actor in $actors {
                    txs.get(&actor).unwrap().send($msg).await.unwrap_err();
                }
            };
        }
        macro_rules! assert_recv_pending {
            () => {
                assert!(
                    receiver
                        .next()
                        .now_or_never()
                        .flatten()
                        .transpose()
                        .unwrap()
                        .is_none()
                );
            };
        }

        macro_rules! recv {
            () => {
                receiver.next().await.transpose().unwrap()
            };
        }

        // Takes the pending output request each listed upstream received from `actor_id` and
        // stores its local sender in `txs`.
        macro_rules! collect_upstream_tx {
            ($actors:expr) => {
                for upstream_id in $actors {
                    let mut output_requests = barrier_test_env
                        .take_pending_new_output_requests(upstream_id)
                        .await;
                    assert_eq!(output_requests.len(), 1);
                    let (downstream_actor_id, request) = output_requests.pop().unwrap();
                    assert_eq!(actor_id, downstream_actor_id);
                    let NewOutputRequest::Local(tx) = request else {
                        unreachable!()
                    };
                    txs.insert(upstream_id, tx);
                }
            };
        }

        // Nothing has been sent yet.
        assert_recv_pending!();
        barrier_test_env.flush_all_events().await;

        // 3. Take the sender side of the `old -> actor_id` channel.
        collect_upstream_tx!([old]);

        // 4. A chunk sent by `old` is forwarded.
        send!([old], Message::Chunk(StreamChunk::default()).into());
        recv!().unwrap().as_chunk().unwrap(); // We should be able to receive the chunk.
        assert_recv_pending!();

        // 5. Send the barrier through `old`. The merge update makes the receiver connect to
        //    `new` and wait for the same barrier there, so nothing is forwarded yet.
        send!([old], Message::Barrier(b1.clone().into_dispatcher()).into());
        assert_recv_pending!();

        collect_upstream_tx!([new]);

        // 6. Once `new` delivers the barrier as well, the receiver forwards it.
        send!([new], Message::Barrier(b1.clone().into_dispatcher()).into());
        recv!().unwrap().as_barrier().unwrap(); // We should now receive the barrier.

        // 7. The old input has been replaced, so sending a chunk through `old` fails.
        send_error!([old], Message::Chunk(StreamChunk::default()).into());
        assert_recv_pending!();

        // 8. Chunks from the newly added upstream are forwarded.
        send!([new], Message::Chunk(StreamChunk::default()).into());
        recv!().unwrap().as_chunk().unwrap(); // We should be able to receive the chunk.
        assert_recv_pending!();
    }
}