risingwave_stream/executor/receiver.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use tokio::sync::mpsc;
17
18use super::exchange::input::BoxedActorInput;
19use crate::executor::prelude::*;
20use crate::executor::{DispatchBarrierBuffer, DispatcherMessage};
21use crate::task::{FragmentId, LocalBarrierManager};
22
/// `ReceiverExecutor` forwards the messages of a single upstream actor input to the
/// downstream executors of this actor.
///
/// Messages are read from `input` through a `DispatchBarrierBuffer`, which is also
/// given the barriers from `barrier_rx`. When a barrier carries a merge update for this
/// actor, `input` is replaced by exactly one new upstream input.
pub struct ReceiverExecutor {
    /// Input from upstream. It represents exactly one upstream actor and is swapped out
    /// when a merge update barrier is handled.
    input: BoxedActorInput,

    /// The context of the actor.
    actor_context: ActorContextRef,

    /// Belonged fragment id.
    fragment_id: FragmentId,

    /// Upstream fragment id. Updated when a merge update names a new upstream fragment.
    upstream_fragment_id: FragmentId,

    /// Handed to the `DispatchBarrierBuffer` when the executor starts running.
    local_barrier_manager: LocalBarrierManager,

    /// Metrics. Used to register the per-input metrics of the current upstream fragment.
    metrics: Arc<StreamingMetrics>,

    /// Barriers for this actor (the test constructor obtains it via
    /// `LocalBarrierManager::subscribe_barrier`); consumed by the `DispatchBarrierBuffer`.
    barrier_rx: mpsc::UnboundedReceiver<Barrier>,
}
46
47impl std::fmt::Debug for ReceiverExecutor {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        f.debug_struct("ReceiverExecutor").finish()
50    }
51}
52
53impl ReceiverExecutor {
54    #[allow(clippy::too_many_arguments)]
55    pub fn new(
56        ctx: ActorContextRef,
57        fragment_id: FragmentId,
58        upstream_fragment_id: FragmentId,
59        input: BoxedActorInput,
60        local_barrier_manager: LocalBarrierManager,
61        metrics: Arc<StreamingMetrics>,
62        barrier_rx: mpsc::UnboundedReceiver<Barrier>,
63    ) -> Self {
64        Self {
65            input,
66            actor_context: ctx,
67            upstream_fragment_id,
68            local_barrier_manager,
69            metrics,
70            fragment_id,
71            barrier_rx,
72        }
73    }
74
75    #[cfg(test)]
76    pub fn for_test(
77        actor_id: impl Into<ActorId>,
78        input: super::exchange::permit::Receiver,
79        local_barrier_manager: crate::task::LocalBarrierManager,
80    ) -> Self {
81        let actor_id = actor_id.into();
82        use super::exchange::input::LocalInput;
83        use crate::executor::exchange::input::ActorInput;
84
85        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_id);
86
87        Self::new(
88            ActorContext::for_test(actor_id),
89            514.into(),
90            1919.into(),
91            LocalInput::new(input, 0.into()).boxed_input(),
92            local_barrier_manager,
93            StreamingMetrics::unused().into(),
94            barrier_rx,
95        )
96    }
97}
98
99impl Execute for ReceiverExecutor {
100    fn execute(mut self: Box<Self>) -> BoxedMessageStream {
101        let actor_id = self.actor_context.id;
102
103        let mut metrics = self.metrics.new_actor_input_metrics(
104            actor_id,
105            self.fragment_id,
106            self.upstream_fragment_id,
107        );
108
109        let stream = #[try_stream]
110        async move {
111            let mut barrier_buffer = DispatchBarrierBuffer::new(
112                self.barrier_rx,
113                actor_id,
114                self.upstream_fragment_id,
115                self.local_barrier_manager,
116                self.metrics.clone(),
117                self.fragment_id,
118            );
119            loop {
120                let msg = barrier_buffer
121                    .await_next_message(&mut self.input, &metrics)
122                    .await?;
123                let msg = match msg {
124                    DispatcherMessage::Watermark(watermark) => Message::Watermark(watermark),
125                    DispatcherMessage::Chunk(chunk) => {
126                        metrics.actor_in_record_cnt.inc_by(chunk.cardinality() as _);
127                        Message::Chunk(chunk)
128                    }
129                    DispatcherMessage::Barrier(barrier) => {
130                        tracing::debug!(
131                            target: "events::stream::barrier::path",
132                            actor_id = %actor_id,
133                            "receiver receives barrier from path: {:?}",
134                            barrier.passed_actors
135                        );
136                        let (mut barrier, new_inputs) =
137                            barrier_buffer.pop_barrier_with_inputs(barrier).await?;
138                        barrier.passed_actors.push(actor_id);
139
140                        if let Some(update) = barrier
141                            .as_update_merge(self.actor_context.id, self.upstream_fragment_id)
142                        {
143                            let new_upstream_fragment_id = update
144                                .new_upstream_fragment_id
145                                .unwrap_or(self.upstream_fragment_id);
146                            let removed_upstream_actor_id: Vec<_> =
147                                if update.new_upstream_fragment_id.is_some() {
148                                    vec![self.input.id()]
149                                } else {
150                                    update.removed_upstream_actor_id.clone()
151                                };
152
153                            assert_eq!(
154                                removed_upstream_actor_id,
155                                vec![self.input.id()],
156                                "the removed upstream actor should be the same as the current input"
157                            );
158                            let new_upstream = new_inputs
159                                .expect("should always have new inputs when handling update merge")
160                                .into_iter()
161                                .exactly_one()
162                                .expect("receiver should have exactly one new upstream");
163
164                            // Replace the input.
165                            self.input = new_upstream;
166
167                            self.upstream_fragment_id = new_upstream_fragment_id;
168                            metrics = self.metrics.new_actor_input_metrics(
169                                actor_id,
170                                self.fragment_id,
171                                self.upstream_fragment_id,
172                            );
173                        }
174
175                        let is_stop = barrier.is_stop(actor_id);
176                        let msg = Message::Barrier(barrier);
177                        if is_stop {
178                            yield msg;
179                            break;
180                        }
181
182                        msg
183                    }
184                };
185
186                yield msg;
187            }
188        };
189
190        stream.boxed()
191    }
192}
193
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::{FutureExt, pin_mut};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_pb::stream_plan::update_mutation::MergeUpdate;

    use super::*;
    use crate::executor::exchange::input::new_input;
    use crate::executor::{MessageInner as Message, UpdateMutation};
    use crate::task::NewOutputRequest;
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;
    use crate::task::test_utils::helper_make_local_actor;

    /// Checks that an update barrier with a merge update swaps the receiver's single
    /// upstream input from `old` to `new` within the same upstream fragment. Chunks
    /// from `old` are forwarded before the swap, the barrier is emitted only after `new`
    /// delivers it too, `old` is disconnected afterwards, and chunks from `new` flow.
    #[tokio::test]
    async fn test_configuration_change() {
        let actor_id = 233.into();
        let (old, new) = (114.into(), 514.into()); // old and new upstream actor id

        let barrier_test_env = LocalBarrierTestEnv::for_test().await;

        let metrics = Arc::new(StreamingMetrics::unused());

        // Topology under test:
        // old -> actor_id   (initial upstream)
        // new -> actor_id   (upstream after the merge update below)

        let (upstream_fragment_id, fragment_id) = (10.into(), 18.into());

        // 1. Build barrier `b1` carrying a merge update that, for `actor_id`, removes
        //    upstream `old` and adds `new` (same upstream fragment), then inject it.
        let merge_updates = maplit::hashmap! {
            (actor_id, upstream_fragment_id) => MergeUpdate {
                actor_id,
                upstream_fragment_id,
                new_upstream_fragment_id: None,
                added_upstream_actors: vec![helper_make_local_actor(new)],
                removed_upstream_actor_id: vec![old],
            }
        };

        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
            UpdateMutation {
                dispatchers: Default::default(),
                merges: merge_updates,
                vnode_bitmaps: Default::default(),
                dropped_actors: Default::default(),
                actor_splits: Default::default(),
                actor_new_dispatchers: Default::default(),
                actor_cdc_table_snapshot_splits: Default::default(),
                sink_add_columns: Default::default(),
            },
        ));

        barrier_test_env.inject_barrier(&b1, [actor_id]);
        barrier_test_env.flush_all_events().await;

        // 2. Create the receiver with `old` as its only upstream input.
        let input = new_input(
            &barrier_test_env.local_barrier_manager,
            metrics.clone(),
            actor_id,
            fragment_id,
            &helper_make_local_actor(old),
            upstream_fragment_id,
        )
        .await
        .unwrap();

        let receiver = ReceiverExecutor::new(
            ActorContext::for_test(actor_id),
            fragment_id,
            upstream_fragment_id,
            input,
            barrier_test_env.local_barrier_manager.clone(),
            metrics.clone(),
            barrier_test_env
                .local_barrier_manager
                .subscribe_barrier(actor_id),
        )
        .boxed()
        .execute();

        pin_mut!(receiver);

        // Senders of the upstream actors, keyed by upstream actor id.
        let mut txs = HashMap::new();
        // Send `$msg` from each upstream in `$actors`; the send must succeed.
        macro_rules! send {
            ($actors:expr, $msg:expr) => {
                for actor in $actors {
                    txs.get(&actor).unwrap().send($msg).await.unwrap();
                }
            };
        }
        // Send `$msg` from each upstream in `$actors`; the send must fail
        // (the receiving side has been dropped).
        macro_rules! send_error {
            ($actors:expr, $msg:expr) => {
                for actor in $actors {
                    txs.get(&actor).unwrap().send($msg).await.unwrap_err();
                }
            };
        }
        // Assert the receiver has no message ready without waiting.
        macro_rules! assert_recv_pending {
            () => {
                assert!(
                    receiver
                        .next()
                        .now_or_never()
                        .flatten()
                        .transpose()
                        .unwrap()
                        .is_none()
                );
            };
        }

        // Await the next message from the receiver, panicking on an error.
        macro_rules! recv {
            () => {
                receiver.next().await.transpose().unwrap()
            };
        }

        // For each upstream in `$actors`, take the single pending local output request
        // addressed to `actor_id` and store its sender in `txs`.
        macro_rules! collect_upstream_tx {
            ($actors:expr) => {
                for upstream_id in $actors {
                    let mut output_requests = barrier_test_env
                        .take_pending_new_output_requests(upstream_id.into())
                        .await;
                    assert_eq!(output_requests.len(), 1);
                    let (downstream_actor_id, request) = output_requests.pop().unwrap();
                    assert_eq!(downstream_actor_id, actor_id);
                    let NewOutputRequest::Local(tx) = request else {
                        unreachable!()
                    };
                    txs.insert(upstream_id, tx);
                }
            };
        }

        // Nothing has been sent yet, so the receiver must be pending.
        assert_recv_pending!();
        barrier_test_env.flush_all_events().await;

        // 3. Take the sender of the initial upstream `old`.
        collect_upstream_tx!([old]);

        // 4. A chunk from `old` is forwarded.
        send!([old], Message::Chunk(StreamChunk::default()).into());
        recv!().unwrap().as_chunk().unwrap(); // We should be able to receive the chunk.
        assert_recv_pending!();

        // 5. Deliver `b1` from `old`. The barrier is not emitted yet: it only comes out
        //    after the new upstream `new` has been connected and has delivered `b1` too.
        send!([old], Message::Barrier(b1.clone().into_dispatcher()).into());
        assert_recv_pending!();

        // Handling the merge update made the receiver request an output from `new`.
        collect_upstream_tx!([new]);

        // 6. Once `new` also delivers `b1`, the barrier is emitted.
        send!([new], Message::Barrier(b1.clone().into_dispatcher()).into());
        recv!().unwrap().as_barrier().unwrap(); // We should now receive the barrier.

        // 7. The removed upstream `old` has been disconnected, so sending to it fails.
        send_error!([old], Message::Chunk(StreamChunk::default()).into());
        assert_recv_pending!();

        // 8. A chunk from the added upstream `new` is forwarded.
        send!([new], Message::Chunk(StreamChunk::default()).into());
        recv!().unwrap().as_chunk().unwrap(); // We should be able to receive the chunk.
        assert_recv_pending!();
    }
}