risingwave_meta/stream/
test_scale.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(test)]
16mod tests {
17    use std::collections::{BTreeSet, HashMap};
18
19    use itertools::Itertools;
20    use maplit::btreeset;
21    use risingwave_common::bitmap::Bitmap;
22    use risingwave_common::hash::{ActorMapping, VirtualNode};
23
24    use crate::model::ActorId;
25    use crate::stream::CustomActorInfo;
26    use crate::stream::scale::rebalance_actor_vnode;
27
28    fn simulated_parallelism(min: Option<usize>, max: Option<usize>) -> Vec<usize> {
29        let mut raw = vec![1, 3, 12, 42, VirtualNode::COUNT_FOR_TEST];
30        if let Some(min) = min {
31            raw.retain(|n| *n > min);
32            raw.push(min);
33        }
34        if let Some(max) = max {
35            raw.retain(|n| *n < max);
36            raw.push(max);
37        }
38        raw
39    }
40
41    fn build_fake_actors(actor_ids: Vec<ActorId>) -> Vec<CustomActorInfo> {
42        let actor_bitmaps =
43            ActorMapping::new_uniform(actor_ids.clone().into_iter(), VirtualNode::COUNT_FOR_TEST)
44                .to_bitmaps();
45        actor_ids
46            .iter()
47            .map(|actor_id| CustomActorInfo {
48                actor_id: *actor_id,
49                vnode_bitmap: actor_bitmaps.get(actor_id).cloned(),
50                ..Default::default()
51            })
52            .collect()
53    }
54
55    fn check_affinity_for_scale_in(bitmap: &Bitmap, actor: &CustomActorInfo) {
56        let prev_bitmap = actor.vnode_bitmap.as_ref().unwrap();
57
58        for idx in 0..VirtualNode::COUNT_FOR_TEST {
59            if prev_bitmap.is_set(idx) {
60                assert!(bitmap.is_set(idx));
61            }
62        }
63    }
64
65    fn check_bitmaps<T>(bitmaps: &HashMap<T, Bitmap>) {
66        let mut target = (0..VirtualNode::COUNT_FOR_TEST)
67            .map(|_| false)
68            .collect_vec();
69
70        for bitmap in bitmaps.values() {
71            for (idx, pos) in target.iter_mut().enumerate() {
72                if bitmap.is_set(idx) {
73                    // *pos should be false
74                    assert!(!*pos);
75                    *pos = true;
76                }
77            }
78        }
79
80        for (idx, b) in target.iter().enumerate() {
81            assert!(*b, "vnode {} should be set", idx);
82        }
83
84        let vnodes = bitmaps.values().map(|bitmap| bitmap.count_ones());
85        let (min, max) = vnodes.minmax().into_option().unwrap();
86
87        assert!((max - min) <= 1, "min {} max {}", min, max);
88    }
89
90    #[test]
91    fn test_build_actor_mapping() {
92        for parallelism in simulated_parallelism(None, None) {
93            let actor_ids = (0..parallelism as ActorId).collect_vec();
94            let actor_mapping =
95                ActorMapping::new_uniform(actor_ids.into_iter(), VirtualNode::COUNT_FOR_TEST);
96
97            assert_eq!(actor_mapping.len(), VirtualNode::COUNT_FOR_TEST);
98
99            let mut check: HashMap<u32, Vec<_>> = HashMap::new();
100            for (vnode, actor_id) in actor_mapping.iter_with_vnode() {
101                check.entry(actor_id).or_default().push(vnode);
102            }
103
104            assert_eq!(check.len(), parallelism);
105
106            let (min, max) = check
107                .values()
108                .map(|indexes| indexes.len())
109                .minmax()
110                .into_option()
111                .unwrap();
112
113            assert!(max - min <= 1);
114        }
115    }
116
117    fn generate_actor_mapping(parallelism: usize) -> (ActorMapping, HashMap<ActorId, Bitmap>) {
118        let actor_ids = (0..parallelism).map(|i| i as ActorId).collect_vec();
119        let actors = build_fake_actors(actor_ids);
120
121        let bitmaps: HashMap<_, _> = actors
122            .into_iter()
123            .map(|actor| {
124                (
125                    actor.actor_id as ActorId,
126                    actor.vnode_bitmap.unwrap().clone(),
127                )
128            })
129            .collect();
130
131        (ActorMapping::from_bitmaps(&bitmaps), bitmaps)
132    }
133
134    #[test]
135    fn test_actor_mapping_from_bitmaps() {
136        for parallelism in simulated_parallelism(None, None) {
137            let (actor_mapping, bitmaps) = generate_actor_mapping(parallelism);
138            check_bitmaps(&bitmaps);
139
140            for (actor_id, bitmap) in &bitmaps {
141                for (vnode, value) in actor_mapping.iter_with_vnode() {
142                    if bitmap.is_set(vnode.to_index()) {
143                        assert_eq!(value, *actor_id);
144                    }
145                }
146            }
147        }
148    }
149
150    #[test]
151    fn test_rebalance_empty() {
152        let actors = build_fake_actors((0..3 as ActorId).collect_vec());
153
154        // empty input
155        let result = rebalance_actor_vnode(&actors, &BTreeSet::new(), &BTreeSet::new());
156        assert_eq!(result.len(), actors.len());
157    }
158
159    #[test]
160    fn test_rebalance_scale_in() {
161        for parallelism in simulated_parallelism(Some(3), None) {
162            let actors = build_fake_actors((0..parallelism as ActorId).collect_vec());
163
164            // remove 1
165            let actors_to_remove = btreeset! {0};
166            let result = rebalance_actor_vnode(&actors, &actors_to_remove, &BTreeSet::new());
167            assert_eq!(result.len(), actors.len() - actors_to_remove.len());
168            check_bitmaps(&result);
169            check_affinity_for_scale_in(result.get(&(1 as ActorId)).unwrap(), &actors[1]);
170
171            // remove n-1
172            let actors_to_remove = (1..parallelism as ActorId).collect();
173            let result = rebalance_actor_vnode(&actors, &actors_to_remove, &BTreeSet::new());
174            assert_eq!(result.len(), 1);
175            check_bitmaps(&result);
176
177            let (_, bitmap) = result.iter().exactly_one().unwrap();
178            assert!(bitmap.all());
179        }
180    }
181
182    #[test]
183    fn test_rebalance_scale_out() {
184        for parallelism in simulated_parallelism(Some(3), Some(VirtualNode::COUNT_FOR_TEST - 1)) {
185            let actors = build_fake_actors((0..parallelism as ActorId).collect_vec());
186
187            // add 1
188            let actors_to_add = btreeset! {parallelism as ActorId};
189            let result = rebalance_actor_vnode(&actors, &BTreeSet::new(), &actors_to_add);
190            assert_eq!(result.len(), actors.len() + actors_to_add.len());
191            check_bitmaps(&result);
192
193            let actors = build_fake_actors((0..parallelism as ActorId).collect_vec());
194
195            // add to VirtualNode::COUNT_FOR_TEST
196            let actors_to_add =
197                (parallelism as ActorId..VirtualNode::COUNT_FOR_TEST as ActorId).collect();
198            let result = rebalance_actor_vnode(&actors, &BTreeSet::new(), &actors_to_add);
199            assert_eq!(result.len(), actors.len() + actors_to_add.len());
200            check_bitmaps(&result);
201        }
202    }
203
204    #[test]
205    fn test_rebalance_migration() {
206        for parallelism in simulated_parallelism(Some(3), None) {
207            let actors = build_fake_actors((0..parallelism as ActorId).collect_vec());
208
209            for idx in 0..parallelism {
210                let actors_to_remove = btreeset! {idx as ActorId};
211                let actors_to_add = btreeset! {parallelism as ActorId};
212                let result = rebalance_actor_vnode(&actors, &actors_to_remove, &actors_to_add);
213
214                assert_eq!(
215                    result.len(),
216                    actors.len() - actors_to_remove.len() + actors_to_add.len()
217                );
218
219                check_bitmaps(&result);
220
221                for actor in &actors {
222                    if actor.actor_id == idx as ActorId {
223                        continue;
224                    }
225
226                    let target_bitmap = result.get(&actor.actor_id).unwrap();
227                    let prev_bitmap = actor.vnode_bitmap.as_ref().unwrap();
228                    assert!(prev_bitmap.eq(target_bitmap));
229                }
230            }
231            let actors = build_fake_actors((0..parallelism as ActorId).collect_vec());
232
233            for migration_count in 1..parallelism {
234                let actors_to_remove = (0..migration_count as ActorId).collect();
235                let actors_to_add =
236                    (parallelism as ActorId..(parallelism + migration_count) as ActorId).collect();
237                let result = rebalance_actor_vnode(&actors, &actors_to_remove, &actors_to_add);
238
239                assert_eq!(
240                    result.len(),
241                    actors.len() - actors_to_remove.len() + actors_to_add.len()
242                );
243
244                check_bitmaps(&result);
245            }
246        }
247    }
248
249    #[test]
250    fn test_rebalance_scale() {
251        for parallelism in simulated_parallelism(Some(3), None) {
252            let actor_ids = (0..parallelism as ActorId).collect_vec();
253            let actors = build_fake_actors(actor_ids);
254
255            let parallelism = parallelism as ActorId;
256            let actors_to_remove = btreeset! {0};
257            let actors_to_add = btreeset! {parallelism, parallelism+1};
258            let result = rebalance_actor_vnode(&actors, &actors_to_remove, &actors_to_add);
259
260            assert_eq!(
261                result.len(),
262                actors.len() - actors_to_remove.len() + actors_to_add.len()
263            );
264            check_bitmaps(&result);
265
266            let actors_to_remove = btreeset! {0, 1};
267            let actors_to_add = btreeset! {parallelism};
268            let result = rebalance_actor_vnode(&actors, &actors_to_remove, &actors_to_add);
269
270            assert_eq!(
271                result.len(),
272                actors.len() - actors_to_remove.len() + actors_to_add.len()
273            );
274            check_bitmaps(&result);
275
276            check_affinity_for_scale_in(result.get(&(2 as ActorId)).unwrap(), &actors[2]);
277        }
278    }
279
280    #[test]
281    fn test_rebalance_scale_real() {
282        let actor_ids = (0..(VirtualNode::COUNT_FOR_TEST - 1) as ActorId).collect_vec();
283        let actors = build_fake_actors(actor_ids);
284        let actors_to_remove = btreeset! {0, 1};
285        let actors_to_add = btreeset! {255};
286        let result = rebalance_actor_vnode(&actors, &actors_to_remove, &actors_to_add);
287
288        check_bitmaps(&result);
289    }
290}