risingwave_stream/executor/
receiver.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use tokio::sync::mpsc;
17use tokio::time::Instant;
18
19use super::exchange::input::BoxedActorInput;
20use crate::executor::prelude::*;
21use crate::executor::{DispatchBarrierBuffer, DispatcherMessage};
22use crate::task::{FragmentId, LocalBarrierManager};
23
24/// `ReceiverExecutor` is used along with a channel. After creating a mpsc channel,
25/// there should be a `ReceiverExecutor` running in the background, so as to push
26/// messages down to the executors.
27pub struct ReceiverExecutor {
28    /// Input from upstream.
29    input: BoxedActorInput,
30
31    /// The context of the actor.
32    actor_context: ActorContextRef,
33
34    /// Belonged fragment id.
35    fragment_id: FragmentId,
36
37    /// Upstream fragment id.
38    upstream_fragment_id: FragmentId,
39
40    local_barrier_manager: LocalBarrierManager,
41
42    /// Metrics
43    metrics: Arc<StreamingMetrics>,
44
45    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
46}
47
48impl std::fmt::Debug for ReceiverExecutor {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        f.debug_struct("ReceiverExecutor").finish()
51    }
52}
53
54impl ReceiverExecutor {
55    #[allow(clippy::too_many_arguments)]
56    pub fn new(
57        ctx: ActorContextRef,
58        fragment_id: FragmentId,
59        upstream_fragment_id: FragmentId,
60        input: BoxedActorInput,
61        local_barrier_manager: LocalBarrierManager,
62        metrics: Arc<StreamingMetrics>,
63        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
64    ) -> Self {
65        Self {
66            input,
67            actor_context: ctx,
68            upstream_fragment_id,
69            local_barrier_manager,
70            metrics,
71            fragment_id,
72            barrier_rx,
73        }
74    }
75
76    #[cfg(test)]
77    pub fn for_test(
78        actor_id: ActorId,
79        input: super::exchange::permit::Receiver,
80        local_barrier_manager: crate::task::LocalBarrierManager,
81    ) -> Self {
82        use super::exchange::input::LocalInput;
83        use crate::executor::exchange::input::ActorInput;
84
85        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_id);
86
87        Self::new(
88            ActorContext::for_test(actor_id),
89            514,
90            1919,
91            LocalInput::new(input, 0).boxed_input(),
92            local_barrier_manager,
93            StreamingMetrics::unused().into(),
94            barrier_rx,
95        )
96    }
97}
98
impl Execute for ReceiverExecutor {
    /// Turns the executor into a stream of messages read from its single upstream input.
    ///
    /// - Watermarks are forwarded unchanged.
    /// - Chunks are forwarded unchanged, and their cardinality is added to
    ///   `actor_in_record_cnt`.
    /// - Barriers are first passed through `DispatchBarrierBuffer::pop_barrier_with_inputs`,
    ///   which may also return newly created inputs. This actor's id is appended to the
    ///   barrier's `passed_actors`.
    /// - If a barrier carries a merge update for this actor and its current upstream fragment,
    ///   the following happens:
    ///   - the input is replaced by the single new input;
    ///   - the upstream fragment id is switched when the update names a new one;
    ///   - the per-input metrics are re-created.
    /// - After yielding a barrier for which `is_stop(actor_id)` holds, the stream ends.
    ///
    /// Errors from reading the input or popping a barrier are propagated through the stream.
    ///
    /// # Panics
    ///
    /// While applying a merge update, panics in either of these cases:
    /// - the removed upstream actors are not exactly the current input;
    /// - the barrier buffer does not return exactly one new input.
    fn execute(mut self: Box<Self>) -> BoxedMessageStream {
        let actor_id = self.actor_context.id;

        // Per-input metrics, labelled with (actor, fragment, upstream fragment).
        // They are rebuilt whenever a merge update is applied below.
        let mut metrics = self.metrics.new_actor_input_metrics(
            actor_id,
            self.fragment_id,
            self.upstream_fragment_id,
        );

        let stream = #[try_stream]
        async move {
            // Every read from `self.input` goes through this buffer, which also owns the
            // barrier channel `barrier_rx`.
            // NOTE(review): how barriers from the two sources are matched is defined in
            // `DispatchBarrierBuffer` — confirm there.
            let mut barrier_buffer = DispatchBarrierBuffer::new(
                self.barrier_rx,
                actor_id,
                self.upstream_fragment_id,
                self.local_barrier_manager,
                self.metrics.clone(),
                self.fragment_id,
            );
            let mut start_time = Instant::now();
            loop {
                let msg = barrier_buffer.await_next_message(&mut self.input).await?;
                // Time spent waiting for the next message.
                // `start_time` is reset once downstream resumes this stream after a yield.
                metrics
                    .actor_input_buffer_blocking_duration_ns
                    .inc_by(start_time.elapsed().as_nanos() as u64);

                let msg = match msg {
                    DispatcherMessage::Watermark(watermark) => Message::Watermark(watermark),
                    DispatcherMessage::Chunk(chunk) => {
                        metrics.actor_in_record_cnt.inc_by(chunk.cardinality() as _);
                        Message::Chunk(chunk)
                    }
                    DispatcherMessage::Barrier(barrier) => {
                        tracing::debug!(
                            target: "events::stream::barrier::path",
                            actor_id = actor_id,
                            "receiver receives barrier from path: {:?}",
                            barrier.passed_actors
                        );
                        // Resolve the barrier.
                        // `new_inputs` is expected to be `Some` when the barrier updates our
                        // upstream (see the `expect` below).
                        let (mut barrier, new_inputs) =
                            barrier_buffer.pop_barrier_with_inputs(barrier).await?;
                        // Record this actor on the barrier's path.
                        barrier.passed_actors.push(actor_id);

                        if let Some(update) = barrier
                            .as_update_merge(self.actor_context.id, self.upstream_fragment_id)
                        {
                            let new_upstream_fragment_id = update
                                .new_upstream_fragment_id
                                .unwrap_or(self.upstream_fragment_id);
                            // Moving to a new upstream fragment implicitly removes the current
                            // input. Otherwise, the update lists the removed actors explicitly.
                            let removed_upstream_actor_id: Vec<_> =
                                if update.new_upstream_fragment_id.is_some() {
                                    vec![self.input.id()]
                                } else {
                                    update.removed_upstream_actor_id.clone()
                                };

                            // A receiver has exactly one upstream, so the update must remove
                            // precisely that one.
                            assert_eq!(
                                removed_upstream_actor_id,
                                vec![self.input.id()],
                                "the removed upstream actor should be the same as the current input"
                            );
                            let new_upstream = new_inputs
                                .expect("should always have new inputs when handling update merge")
                                .into_iter()
                                .exactly_one()
                                .expect("receiver should have exactly one new upstream");

                            // Replace the input.
                            self.input = new_upstream;

                            // Re-label the metrics for the (possibly) new upstream fragment.
                            self.upstream_fragment_id = new_upstream_fragment_id;
                            metrics = self.metrics.new_actor_input_metrics(
                                actor_id,
                                self.fragment_id,
                                self.upstream_fragment_id,
                            );
                        }

                        // A stop barrier is still emitted, but nothing is read after it.
                        let is_stop = barrier.is_stop(actor_id);
                        let msg = Message::Barrier(barrier);
                        if is_stop {
                            yield msg;
                            break;
                        }

                        msg
                    }
                };

                yield msg;
                start_time = Instant::now();
            }
        };

        stream.boxed()
    }
}
197
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::{FutureExt, pin_mut};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_pb::stream_plan::update_mutation::MergeUpdate;

    use super::*;
    use crate::executor::exchange::input::new_input;
    use crate::executor::{MessageInner as Message, UpdateMutation};
    use crate::task::NewOutputRequest;
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;
    use crate::task::test_utils::helper_make_local_actor;

    /// Checks that a `ReceiverExecutor` moves from upstream actor `old` to upstream actor `new`
    /// when it handles an update barrier carrying a `MergeUpdate` for it.
    #[tokio::test]
    async fn test_configuration_change() {
        let actor_id = 233;
        let (old, new) = (114, 514); // old and new upstream actor id

        let barrier_test_env = LocalBarrierTestEnv::for_test().await;

        let metrics = Arc::new(StreamingMetrics::unused());

        // Topology: before the update `old -> actor_id`; after the update `new -> actor_id`.

        let (upstream_fragment_id, fragment_id) = (10, 18);

        // 1. Build a configuration-change barrier for (actor_id, upstream_fragment_id).
        //    It replaces upstream `old` with `new` and keeps the same upstream fragment.
        let merge_updates = maplit::hashmap! {
            (actor_id, upstream_fragment_id) => MergeUpdate {
                actor_id,
                upstream_fragment_id,
                new_upstream_fragment_id: None,
                added_upstream_actors: vec![helper_make_local_actor(new)],
                removed_upstream_actor_id: vec![old],
            }
        };

        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: Default::default(),
                merges: merge_updates,
                vnode_bitmaps: Default::default(),
                dropped_actors: Default::default(),
                actor_splits: Default::default(),
                actor_new_dispatchers: Default::default(),
                actor_cdc_table_snapshot_splits: Default::default(),
                sink_add_columns: Default::default(),
            },
        ));

        // 2. Inject the barrier for `actor_id` through the local barrier manager.
        barrier_test_env.inject_barrier(&b1, [actor_id]);
        barrier_test_env.flush_all_events().await;

        // 3. Create the receiver with `old` as its only upstream input.
        let input = new_input(
            &barrier_test_env.local_barrier_manager,
            metrics.clone(),
            actor_id,
            fragment_id,
            &helper_make_local_actor(old),
            upstream_fragment_id,
        )
        .await
        .unwrap();

        let receiver = ReceiverExecutor::new(
            ActorContext::for_test(actor_id),
            fragment_id,
            upstream_fragment_id,
            input,
            barrier_test_env.local_barrier_manager.clone(),
            metrics.clone(),
            barrier_test_env
                .local_barrier_manager
                .subscribe_barrier(actor_id),
        )
        .boxed()
        .execute();

        pin_mut!(receiver);

        // Output senders of the upstream actors, keyed by upstream actor id.
        let mut txs = HashMap::new();
        // Sends `$msg` from each upstream in `$actors`; every send must succeed.
        macro_rules! send {
            ($actors:expr, $msg:expr) => {
                for actor in $actors {
                    txs.get(&actor).unwrap().send($msg).await.unwrap();
                }
            };
        }
        // Sends `$msg` from each upstream in `$actors`; every send must fail.
        macro_rules! send_error {
            ($actors:expr, $msg:expr) => {
                for actor in $actors {
                    txs.get(&actor).unwrap().send($msg).await.unwrap_err();
                }
            };
        }
        // Polls the receiver once without waiting.
        // Asserts no message is ready and panics if an error is ready.
        macro_rules! assert_recv_pending {
            () => {
                assert!(
                    receiver
                        .next()
                        .now_or_never()
                        .flatten()
                        .transpose()
                        .unwrap()
                        .is_none()
                );
            };
        }

        // Awaits the next item from the receiver, panicking on an error item.
        macro_rules! recv {
            () => {
                receiver.next().await.transpose().unwrap()
            };
        }

        // For each upstream actor in `$actors`, does the following:
        // - takes its single pending new-output request;
        // - checks that the request targets `actor_id`;
        // - stores the request's local sender in `txs`.
        macro_rules! collect_upstream_tx {
            ($actors:expr) => {
                for upstream_id in $actors {
                    let mut output_requests = barrier_test_env
                        .take_pending_new_output_requests(upstream_id)
                        .await;
                    assert_eq!(output_requests.len(), 1);
                    let (downstream_actor_id, request) = output_requests.pop().unwrap();
                    assert_eq!(actor_id, downstream_actor_id);
                    let NewOutputRequest::Local(tx) = request else {
                        unreachable!()
                    };
                    txs.insert(upstream_id, tx);
                }
            };
        }

        assert_recv_pending!();
        barrier_test_env.flush_all_events().await;

        // 4. Take the sender of the old upstream.
        collect_upstream_tx!([old]);

        // 5. A chunk from the old upstream passes through.
        send!([old], Message::Chunk(StreamChunk::default()).into());
        recv!().unwrap().as_chunk().unwrap(); // We should be able to receive the chunk.
        assert_recv_pending!();

        // 6. Deliver the barrier from the old upstream. It is not emitted yet.
        //    NOTE(review): presumably the receiver also waits for the barrier on the new
        //    upstream's input — confirm in `DispatchBarrierBuffer`.
        send!([old], Message::Barrier(b1.clone().into_dispatcher()).into());
        assert_recv_pending!(); // We should not receive the barrier, as new is not the upstream.

        // 7. By now a new-output request for `new` is pending; take its sender.
        collect_upstream_tx!([new]);

        // 8. Once the new upstream delivers the barrier too, it is emitted.
        send!([new], Message::Barrier(b1.clone().into_dispatcher()).into());
        recv!().unwrap().as_barrier().unwrap(); // We should now receive the barrier.

        // 9. Sending a chunk from the removed upstream fails.
        //    This is expected because the receiver no longer holds the old input.
        send_error!([old], Message::Chunk(StreamChunk::default()).into());
        assert_recv_pending!();

        // 10. A chunk from the added upstream passes through.
        send!([new], Message::Chunk(StreamChunk::default()).into());
        recv!().unwrap().as_chunk().unwrap(); // We should be able to receive the chunk.
        assert_recv_pending!();
    }
}
363}