risingwave_stream/executor/
receiver.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::executor::merge::{MergeExecutorInner, SingletonUpstream};
16
17/// `ReceiverExecutor` receives data from a single upstream actor. It's a special case of
18/// `MergeExecutor` with only one upstream.
19pub type ReceiverExecutor = MergeExecutorInner<SingletonUpstream>;
20
21impl ReceiverExecutor {
22    #[cfg(test)]
23    pub fn for_test(
24        actor_id: impl Into<risingwave_pb::id::ActorId>,
25        input: super::exchange::permit::Receiver,
26        local_barrier_manager: crate::task::LocalBarrierManager,
27    ) -> Self {
28        use super::exchange::input::LocalInput;
29        use crate::executor::ActorContext;
30        use crate::executor::exchange::input::ActorInput;
31        use crate::executor::prelude::StreamingMetrics;
32
33        let actor_id = actor_id.into();
34
35        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_id);
36
37        Self::new(
38            ActorContext::for_test(actor_id),
39            514.into(),
40            1919.into(),
41            LocalInput::new(input, 0.into()).boxed_input(),
42            local_barrier_manager,
43            StreamingMetrics::unused().into(),
44            barrier_rx,
45        )
46    }
47}
48
49#[cfg(test)]
50mod tests {
51    use std::collections::HashMap;
52    use std::sync::Arc;
53
54    use futures::{FutureExt, StreamExt, pin_mut};
55    use risingwave_common::array::StreamChunk;
56    use risingwave_common::util::epoch::test_epoch;
57    use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
58
59    use super::*;
60    use crate::executor::exchange::input::new_input;
61    use crate::executor::prelude::StreamingMetrics;
62    use crate::executor::{
63        ActorContext, Barrier, Execute as _, MessageInner as Message, Mutation, UpdateMutation,
64    };
65    use crate::task::NewOutputRequest;
66    use crate::task::barrier_test_utils::LocalBarrierTestEnv;
67    use crate::task::test_utils::helper_make_local_actor;
68
69    #[tokio::test]
70    async fn test_configuration_change() {
71        let actor_id = 233.into();
72        let (old, new) = (114.into(), 514.into()); // old and new upstream actor id
73
74        let barrier_test_env = LocalBarrierTestEnv::for_test().await;
75
76        let metrics = Arc::new(StreamingMetrics::unused());
77
78        // 1. Register info in context.
79
80        // old -> actor_id
81        // new -> actor_id
82
83        let (upstream_fragment_id, fragment_id) = (10.into(), 18.into());
84
85        // 4. Send a configuration change barrier.
86        let merge_updates = maplit::hashmap! {
87            (actor_id, upstream_fragment_id) => MergeUpdate {
88                actor_id,
89                upstream_fragment_id,
90                new_upstream_fragment_id: None,
91                added_upstream_actors: vec![helper_make_local_actor(new)],
92                removed_upstream_actor_id: vec![old],
93            }
94        };
95
96        let b1 = Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Update(
97            UpdateMutation {
98                merges: merge_updates,
99                ..Default::default()
100            },
101        ));
102
103        barrier_test_env.inject_barrier(&b1, [actor_id]);
104        barrier_test_env.flush_all_events().await;
105
106        let actor_ctx = ActorContext::for_test(actor_id);
107
108        let input = new_input(
109            &barrier_test_env.local_barrier_manager,
110            metrics.clone(),
111            actor_id,
112            fragment_id,
113            &helper_make_local_actor(old),
114            upstream_fragment_id,
115            actor_ctx.config.clone(),
116        )
117        .await
118        .unwrap();
119
120        let receiver = ReceiverExecutor::new(
121            actor_ctx.clone(),
122            fragment_id,
123            upstream_fragment_id,
124            input,
125            barrier_test_env.local_barrier_manager.clone(),
126            metrics.clone(),
127            barrier_test_env
128                .local_barrier_manager
129                .subscribe_barrier(actor_id),
130        )
131        .boxed()
132        .execute();
133
134        pin_mut!(receiver);
135
136        let mut txs = HashMap::new();
137        macro_rules! send {
138            ($actors:expr, $msg:expr) => {
139                for actor in $actors {
140                    txs.get(&actor).unwrap().send($msg).await.unwrap();
141                }
142            };
143        }
144        macro_rules! send_error {
145            ($actors:expr, $msg:expr) => {
146                for actor in $actors {
147                    txs.get(&actor).unwrap().send($msg).await.unwrap_err();
148                }
149            };
150        }
151        macro_rules! assert_recv_pending {
152            () => {
153                assert!(
154                    receiver
155                        .next()
156                        .now_or_never()
157                        .flatten()
158                        .transpose()
159                        .unwrap()
160                        .is_none()
161                );
162            };
163        }
164
165        macro_rules! recv {
166            () => {
167                receiver.next().await.transpose().unwrap()
168            };
169        }
170
171        macro_rules! collect_upstream_tx {
172            ($actors:expr) => {
173                for upstream_id in $actors {
174                    let mut output_requests = barrier_test_env
175                        .take_pending_new_output_requests(upstream_id.into())
176                        .await;
177                    assert_eq!(output_requests.len(), 1);
178                    let (downstream_actor_id, request) = output_requests.pop().unwrap();
179                    assert_eq!(downstream_actor_id, actor_id);
180                    let NewOutputRequest::Local(tx) = request else {
181                        unreachable!()
182                    };
183                    txs.insert(upstream_id, tx);
184                }
185            };
186        }
187
188        assert_recv_pending!();
189        barrier_test_env.flush_all_events().await;
190
191        // 2. Take downstream receivers.
192        collect_upstream_tx!([old]);
193
194        // 3. Send a chunk.
195        send!([old], Message::Chunk(StreamChunk::default()).into());
196        recv!().unwrap().as_chunk().unwrap(); // We should be able to receive the chunk.
197        assert_recv_pending!();
198
199        send!([old], Message::Barrier(b1.clone().into_dispatcher()).into());
200        assert_recv_pending!(); // We should not receive the barrier, as new is not the upstream.
201
202        collect_upstream_tx!([new]);
203
204        send!([new], Message::Barrier(b1.clone().into_dispatcher()).into());
205        recv!().unwrap().as_barrier().unwrap(); // We should now receive the barrier.
206
207        // 5. Send a chunk to the removed upstream.
208        send_error!([old], Message::Chunk(StreamChunk::default()).into());
209        assert_recv_pending!();
210
211        // 6. Send a chunk to the added upstream.
212        send!([new], Message::Chunk(StreamChunk::default()).into());
213        recv!().unwrap().as_chunk().unwrap(); // We should be able to receive the chunk.
214        assert_recv_pending!();
215    }
216}