risingwave_meta/stream/
test_scale.rs1#[cfg(test)]
16mod tests {
17 use std::collections::{BTreeSet, HashMap};
18
19 use itertools::Itertools;
20 use maplit::btreeset;
21 use risingwave_common::bitmap::Bitmap;
22 use risingwave_common::hash::{ActorMapping, VirtualNode};
23
24 use crate::model::ActorId;
25 use crate::stream::CustomActorInfo;
26 use crate::stream::scale::rebalance_actor_vnode;
27
28 fn simulated_parallelism(min: Option<usize>, max: Option<usize>) -> Vec<usize> {
29 let mut raw = vec![1, 3, 12, 42, VirtualNode::COUNT_FOR_TEST];
30 if let Some(min) = min {
31 raw.retain(|n| *n > min);
32 raw.push(min);
33 }
34 if let Some(max) = max {
35 raw.retain(|n| *n < max);
36 raw.push(max);
37 }
38 raw
39 }
40
41 fn build_fake_actors(actor_ids: Vec<ActorId>) -> Vec<CustomActorInfo> {
42 let actor_bitmaps =
43 ActorMapping::new_uniform(actor_ids.clone().into_iter(), VirtualNode::COUNT_FOR_TEST)
44 .to_bitmaps();
45 actor_ids
46 .iter()
47 .map(|actor_id| CustomActorInfo {
48 actor_id: *actor_id,
49 vnode_bitmap: actor_bitmaps.get(actor_id).cloned(),
50 ..Default::default()
51 })
52 .collect()
53 }
54
55 fn check_affinity_for_scale_in(bitmap: &Bitmap, actor: &CustomActorInfo) {
56 let prev_bitmap = actor.vnode_bitmap.as_ref().unwrap();
57
58 for idx in 0..VirtualNode::COUNT_FOR_TEST {
59 if prev_bitmap.is_set(idx) {
60 assert!(bitmap.is_set(idx));
61 }
62 }
63 }
64
65 fn check_bitmaps<T>(bitmaps: &HashMap<T, Bitmap>) {
66 let mut target = (0..VirtualNode::COUNT_FOR_TEST)
67 .map(|_| false)
68 .collect_vec();
69
70 for bitmap in bitmaps.values() {
71 for (idx, pos) in target.iter_mut().enumerate() {
72 if bitmap.is_set(idx) {
73 assert!(!*pos);
75 *pos = true;
76 }
77 }
78 }
79
80 for (idx, b) in target.iter().enumerate() {
81 assert!(*b, "vnode {} should be set", idx);
82 }
83
84 let vnodes = bitmaps.values().map(|bitmap| bitmap.count_ones());
85 let (min, max) = vnodes.minmax().into_option().unwrap();
86
87 assert!((max - min) <= 1, "min {} max {}", min, max);
88 }
89
/// Checks that `ActorMapping::new_uniform` distributes vnodes evenly.
///
/// For every simulated parallelism the mapping must have
/// `VirtualNode::COUNT_FOR_TEST` entries, reference exactly `parallelism`
/// distinct actors, and give any two actors vnode counts that differ by at
/// most one.
#[test]
fn test_build_actor_mapping() {
    for parallelism in simulated_parallelism(None, None) {
        let ids = (0..parallelism as ActorId).collect_vec();
        let actor_mapping =
            ActorMapping::new_uniform(ids.into_iter(), VirtualNode::COUNT_FOR_TEST);
        assert_eq!(actor_mapping.len(), VirtualNode::COUNT_FOR_TEST);

        // Group the vnodes by the actor they are mapped to.
        let mut vnodes_by_actor: HashMap<u32, Vec<_>> = HashMap::new();
        actor_mapping
            .iter_with_vnode()
            .for_each(|(vnode, actor_id)| {
                vnodes_by_actor.entry(actor_id).or_default().push(vnode);
            });
        assert_eq!(vnodes_by_actor.len(), parallelism);

        let group_sizes = vnodes_by_actor.values().map(|vnodes| vnodes.len());
        let (min, max) = group_sizes.minmax().into_option().unwrap();
        assert!(max - min <= 1);
    }
}
116
117 fn generate_actor_mapping(parallelism: usize) -> (ActorMapping, HashMap<ActorId, Bitmap>) {
118 let actor_ids = (0..parallelism).map(|i| i as ActorId).collect_vec();
119 let actors = build_fake_actors(actor_ids);
120
121 let bitmaps: HashMap<_, _> = actors
122 .into_iter()
123 .map(|actor| {
124 (
125 actor.actor_id as ActorId,
126 actor.vnode_bitmap.unwrap().clone(),
127 )
128 })
129 .collect();
130
131 (ActorMapping::from_bitmaps(&bitmaps), bitmaps)
132 }
133
/// Checks that a mapping built by `ActorMapping::from_bitmaps` agrees with the
/// bitmaps it was built from.
///
/// For every simulated parallelism the bitmaps must pass `check_bitmaps`, and
/// every vnode set in an actor's bitmap must map back to that actor.
#[test]
fn test_actor_mapping_from_bitmaps() {
    for parallelism in simulated_parallelism(None, None) {
        let (actor_mapping, bitmaps) = generate_actor_mapping(parallelism);
        check_bitmaps(&bitmaps);

        for (owner, bitmap) in &bitmaps {
            for (vnode, mapped_actor) in actor_mapping.iter_with_vnode() {
                // Only vnodes owned by this actor are relevant.
                if !bitmap.is_set(vnode.to_index()) {
                    continue;
                }
                assert_eq!(mapped_actor, *owner);
            }
        }
    }
}
149
/// Rebalancing with nothing to remove and nothing to add must return one
/// bitmap per existing actor.
#[test]
fn test_rebalance_empty() {
    let actor_ids: Vec<ActorId> = vec![0, 1, 2];
    let actors = build_fake_actors(actor_ids);

    let no_removals = BTreeSet::new();
    let no_additions = BTreeSet::new();
    let rebalanced = rebalance_actor_vnode(&actors, &no_removals, &no_additions);

    assert_eq!(rebalanced.len(), actors.len());
}
158
/// Exercises `rebalance_actor_vnode` when actors are only removed.
///
/// Two scenarios run for every simulated parallelism (at least 3):
/// removing just actor `0`, and removing every actor except actor `0`.
#[test]
fn test_rebalance_scale_in() {
    for parallelism in simulated_parallelism(Some(3), None) {
        let actors = build_fake_actors((0..parallelism as ActorId).collect_vec());
        let nothing_added = BTreeSet::new();

        // Scenario 1: only actor 0 goes away; actor 1 must keep all the
        // vnodes it owned before.
        {
            let actors_to_remove = btreeset! {0};
            let result = rebalance_actor_vnode(&actors, &actors_to_remove, &nothing_added);

            assert_eq!(result.len(), actors.len() - actors_to_remove.len());
            check_bitmaps(&result);

            let survivor = result.get(&(1 as ActorId)).unwrap();
            check_affinity_for_scale_in(survivor, &actors[1]);
        }

        // Scenario 2: everything but actor 0 goes away; the single remaining
        // bitmap must have every vnode set.
        {
            let actors_to_remove: BTreeSet<ActorId> = (1..parallelism as ActorId).collect();
            let result = rebalance_actor_vnode(&actors, &actors_to_remove, &nothing_added);

            assert_eq!(result.len(), 1);
            check_bitmaps(&result);

            let (_, bitmap) = result.iter().exactly_one().unwrap();
            assert!(bitmap.all());
        }
    }
}
181
/// Exercises `rebalance_actor_vnode` when actors are only added.
///
/// For every simulated parallelism between 3 and
/// `VirtualNode::COUNT_FOR_TEST - 1`, two scenarios run: adding a single new
/// actor, and adding every id from `parallelism` up to (excluding)
/// `VirtualNode::COUNT_FOR_TEST`.
#[test]
fn test_rebalance_scale_out() {
    for parallelism in simulated_parallelism(Some(3), Some(VirtualNode::COUNT_FOR_TEST - 1)) {
        let first_new_id = parallelism as ActorId;

        let scenarios: [BTreeSet<ActorId>; 2] = [
            btreeset! {first_new_id},
            (first_new_id..VirtualNode::COUNT_FOR_TEST as ActorId).collect(),
        ];

        for actors_to_add in scenarios {
            // Each scenario starts from a freshly built set of actors.
            let actors = build_fake_actors((0..parallelism as ActorId).collect_vec());
            let result = rebalance_actor_vnode(&actors, &BTreeSet::new(), &actors_to_add);

            assert_eq!(result.len(), actors.len() + actors_to_add.len());
            check_bitmaps(&result);
        }
    }
}
203
/// Exercises `rebalance_actor_vnode` when as many actors are added as removed.
///
/// First, each actor in turn is replaced by one new actor; every actor that
/// was not replaced must end up with exactly the bitmap it had before. Then
/// the first `migration_count` actors are replaced by the same number of new
/// actors, for every `migration_count` in `1..parallelism`.
#[test]
fn test_rebalance_migration() {
    for parallelism in simulated_parallelism(Some(3), None) {
        let actors = build_fake_actors((0..parallelism as ActorId).collect_vec());

        // Replace a single actor at a time.
        for idx in 0..parallelism {
            let replaced = idx as ActorId;
            let actors_to_remove = btreeset! {replaced};
            let actors_to_add = btreeset! {parallelism as ActorId};
            let result = rebalance_actor_vnode(&actors, &actors_to_remove, &actors_to_add);

            let expected_len = actors.len() - actors_to_remove.len() + actors_to_add.len();
            assert_eq!(result.len(), expected_len);
            check_bitmaps(&result);

            // Actors that were not replaced must be left untouched.
            for actor in actors.iter().filter(|actor| actor.actor_id != replaced) {
                let target_bitmap = result.get(&actor.actor_id).unwrap();
                let prev_bitmap = actor.vnode_bitmap.as_ref().unwrap();
                assert!(prev_bitmap.eq(target_bitmap));
            }
        }

        let actors = build_fake_actors((0..parallelism as ActorId).collect_vec());

        // Replace the first `migration_count` actors in one go.
        for migration_count in 1..parallelism {
            let first_new_id = parallelism as ActorId;
            let end_new_id = (parallelism + migration_count) as ActorId;

            let actors_to_remove: BTreeSet<ActorId> = (0..migration_count as ActorId).collect();
            let actors_to_add: BTreeSet<ActorId> = (first_new_id..end_new_id).collect();
            let result = rebalance_actor_vnode(&actors, &actors_to_remove, &actors_to_add);

            let expected_len = actors.len() - actors_to_remove.len() + actors_to_add.len();
            assert_eq!(result.len(), expected_len);
            check_bitmaps(&result);
        }
    }
}
248
/// Exercises `rebalance_actor_vnode` when the numbers of removed and added
/// actors differ.
///
/// For every simulated parallelism (at least 3) two scenarios run: remove one
/// actor and add two, then remove two actors and add one. In the second
/// scenario actor `2` must keep all the vnodes it owned before.
#[test]
fn test_rebalance_scale() {
    for parallelism in simulated_parallelism(Some(3), None) {
        let actors = build_fake_actors((0..parallelism as ActorId).collect_vec());
        let next_id = parallelism as ActorId;

        // Runs one rebalance and applies the checks shared by both scenarios.
        let rebalance = |actors_to_remove: BTreeSet<ActorId>, actors_to_add: BTreeSet<ActorId>| {
            let result = rebalance_actor_vnode(&actors, &actors_to_remove, &actors_to_add);
            assert_eq!(
                result.len(),
                actors.len() - actors_to_remove.len() + actors_to_add.len()
            );
            check_bitmaps(&result);
            result
        };

        // Net growth: one actor out, two in.
        rebalance(btreeset! {0}, btreeset! {next_id, next_id + 1});

        // Net shrink: two actors out, one in.
        let result = rebalance(btreeset! {0, 1}, btreeset! {next_id});
        check_affinity_for_scale_in(result.get(&(2 as ActorId)).unwrap(), &actors[2]);
    }
}
279
/// Rebalances an almost fully scaled-out fragment: actors
/// `0..VirtualNode::COUNT_FOR_TEST - 1` exist, two of them are removed and one
/// new actor is added.
#[test]
fn test_rebalance_scale_real() {
    // The first id not used by an existing actor. Deriving it from the
    // constant (instead of hard-coding 255) keeps the new actor distinct from
    // the existing ones even if `COUNT_FOR_TEST` changes.
    let new_actor_id = (VirtualNode::COUNT_FOR_TEST - 1) as ActorId;

    let actors = build_fake_actors((0..new_actor_id).collect_vec());
    let actors_to_remove = btreeset! {0, 1};
    let actors_to_add = btreeset! {new_actor_id};
    let result = rebalance_actor_vnode(&actors, &actors_to_remove, &actors_to_add);

    // Same size invariant the other rebalance tests in this module assert.
    assert_eq!(
        result.len(),
        actors.len() - actors_to_remove.len() + actors_to_add.len()
    );
    check_bitmaps(&result);
}
290}