// risingwave_stream/executor/receiver.rs
use crate::executor::merge::{MergeExecutorInner, SingletonUpstream};
16
17pub type ReceiverExecutor = MergeExecutorInner<SingletonUpstream>;
20
21impl ReceiverExecutor {
22 #[cfg(test)]
23 pub fn for_test(
24 actor_id: impl Into<risingwave_pb::id::ActorId>,
25 input: super::exchange::permit::Receiver,
26 local_barrier_manager: crate::task::LocalBarrierManager,
27 ) -> Self {
28 use super::exchange::input::LocalInput;
29 use crate::executor::ActorContext;
30 use crate::executor::exchange::input::ActorInput;
31 use crate::executor::prelude::StreamingMetrics;
32
33 let actor_id = actor_id.into();
34
35 let barrier_rx = local_barrier_manager.subscribe_barrier(actor_id);
36
37 Self::new(
38 ActorContext::for_test(actor_id),
39 514.into(),
40 1919.into(),
41 LocalInput::new(input, 0.into()).boxed_input(),
42 local_barrier_manager,
43 StreamingMetrics::unused().into(),
44 barrier_rx,
45 )
46 }
47}
48
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use futures::{FutureExt, StreamExt, pin_mut};
    use risingwave_common::array::StreamChunk;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_pb::stream_plan::update_mutation::MergeUpdate;

    use super::*;
    use crate::executor::exchange::input::new_input;
    use crate::executor::prelude::StreamingMetrics;
    use crate::executor::{
        ActorContext, Barrier, Execute as _, MessageInner as Message, Mutation, UpdateMutation,
    };
    use crate::task::NewOutputRequest;
    use crate::task::barrier_test_utils::LocalBarrierTestEnv;
    use crate::task::test_utils::helper_make_local_actor;

    /// Drives a `ReceiverExecutor` through an `Update` barrier whose merge
    /// update replaces its single upstream actor (`old_upstream`) with another
    /// one (`new_upstream`), checking what the receiver yields at each step.
    #[tokio::test]
    async fn test_configuration_change() {
        let actor_id = 233.into();
        let (old_upstream, new_upstream) = (114.into(), 514.into());
        let barrier_test_env = LocalBarrierTestEnv::for_test().await;

        let streaming_metrics = Arc::new(StreamingMetrics::unused());

        let (upstream_fragment_id, fragment_id) = (10.into(), 18.into());

        // The merge update carried by the barrier: add `new_upstream`, remove
        // `old_upstream`, keep the upstream fragment unchanged.
        let merge_update = MergeUpdate {
            actor_id,
            upstream_fragment_id,
            new_upstream_fragment_id: None,
            added_upstream_actors: vec![helper_make_local_actor(new_upstream)],
            removed_upstream_actor_id: vec![old_upstream],
        };
        let merges = maplit::hashmap! {
            (actor_id, upstream_fragment_id) => merge_update
        };

        let update_barrier = Barrier::new_test_barrier(test_epoch(1)).with_mutation(
            Mutation::Update(UpdateMutation {
                merges,
                ..Default::default()
            }),
        );

        barrier_test_env.inject_barrier(&update_barrier, [actor_id]);
        barrier_test_env.flush_all_events().await;

        let actor_ctx = ActorContext::for_test(actor_id);

        // The receiver starts out connected to the old upstream only.
        let upstream_input = new_input(
            &barrier_test_env.local_barrier_manager,
            streaming_metrics.clone(),
            actor_id,
            fragment_id,
            &helper_make_local_actor(old_upstream),
            upstream_fragment_id,
            actor_ctx.config.clone(),
        )
        .await
        .unwrap();

        let stream = ReceiverExecutor::new(
            actor_ctx.clone(),
            fragment_id,
            upstream_fragment_id,
            upstream_input,
            barrier_test_env.local_barrier_manager.clone(),
            streaming_metrics.clone(),
            barrier_test_env
                .local_barrier_manager
                .subscribe_barrier(actor_id),
        )
        .boxed()
        .execute();

        pin_mut!(stream);

        // Senders towards the receiver, keyed by upstream actor id.
        let mut upstream_txs = HashMap::new();

        // Sends `$msg` through the sender of every listed upstream; each send
        // must succeed.
        macro_rules! send {
            ($actors:expr, $msg:expr) => {
                for upstream in $actors {
                    upstream_txs
                        .get(&upstream)
                        .unwrap()
                        .send($msg)
                        .await
                        .unwrap();
                }
            };
        }

        // Same as `send!`, but every send must fail.
        macro_rules! send_error {
            ($actors:expr, $msg:expr) => {
                for upstream in $actors {
                    upstream_txs
                        .get(&upstream)
                        .unwrap()
                        .send($msg)
                        .await
                        .unwrap_err();
                }
            };
        }

        // Polls the receiver once without waiting and asserts that it yields
        // no message.
        macro_rules! assert_recv_pending {
            () => {{
                let polled = stream
                    .next()
                    .now_or_never()
                    .flatten()
                    .transpose()
                    .unwrap();
                assert!(polled.is_none());
            }};
        }

        // Awaits the next item of the receiver, panicking on an error item.
        macro_rules! recv {
            () => {
                stream.next().await.transpose().unwrap()
            };
        }

        // For each listed upstream, takes its single pending new-output
        // request, checks that it targets `actor_id`, and stores the local
        // sender in `upstream_txs`.
        macro_rules! collect_upstream_tx {
            ($actors:expr) => {
                for upstream_id in $actors {
                    let mut pending = barrier_test_env
                        .take_pending_new_output_requests(upstream_id.into())
                        .await;
                    assert_eq!(pending.len(), 1);
                    let (downstream_actor_id, request) = pending.pop().unwrap();
                    assert_eq!(downstream_actor_id, actor_id);
                    let tx = match request {
                        NewOutputRequest::Local(tx) => tx,
                        _ => unreachable!(),
                    };
                    upstream_txs.insert(upstream_id, tx);
                }
            };
        }

        // Nothing has been sent yet.
        assert_recv_pending!();
        barrier_test_env.flush_all_events().await;

        collect_upstream_tx!([old_upstream]);

        // A chunk from the old upstream is passed through.
        send!([old_upstream], Message::Chunk(StreamChunk::default()).into());
        recv!().unwrap().as_chunk().unwrap();
        assert_recv_pending!();

        // The barrier from the old upstream alone is not yielded yet.
        send!(
            [old_upstream],
            Message::Barrier(update_barrier.clone().into_dispatcher()).into()
        );
        assert_recv_pending!();
        collect_upstream_tx!([new_upstream]);

        // Once the new upstream sends the barrier too, it is yielded, and
        // sending on the old upstream's channel now fails.
        send!(
            [new_upstream],
            Message::Barrier(update_barrier.clone().into_dispatcher()).into()
        );
        recv!().unwrap().as_barrier().unwrap();
        send_error!([old_upstream], Message::Chunk(StreamChunk::default()).into());
        assert_recv_pending!();

        // Chunks from the new upstream are passed through.
        send!([new_upstream], Message::Chunk(StreamChunk::default()).into());
        recv!().unwrap().as_chunk().unwrap();
        assert_recv_pending!();
    }
}