risingwave_common/hash/consistent_hash/mapping.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap};
16use std::fmt::{Debug, Display, Formatter};
17use std::hash::Hash;
18use std::ops::Index;
19
20use educe::Educe;
21use itertools::Itertools;
22use risingwave_pb::common::PbWorkerSlotMapping;
23use risingwave_pb::stream_plan::ActorMapping as ActorMappingProto;
24
25use super::bitmap::VnodeBitmapExt;
26use crate::bitmap::{Bitmap, BitmapBuilder};
27use crate::hash::VirtualNode;
28use crate::util::compress::compress_data;
29use crate::util::iter_util::ZipEqDebug;
30
// TODO: find a better place for this.
/// Identifier of an actor.
///
/// This is a plain type alias of `u32` (not a newtype), so any `u32` is accepted wherever an
/// `ActorId` is expected.
pub type ActorId = u32;
33
/// Identifies an actor for alignment purposes: a worker id paired with an actor index.
///
/// Both halves are packed into a single `u64`: the worker id occupies the high 32 bits and the
/// actor index the low 32 bits. Because of this layout, the derived ordering sorts by worker id
/// first and actor index second. `Display` and `Debug` both render it as `[worker_id/actor_idx]`.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub struct ActorAlignmentId(u64);

impl ActorAlignmentId {
    /// The worker id, stored in the high 32 bits.
    pub fn worker_id(&self) -> u32 {
        (self.0 >> 32) as u32
    }

    /// The actor index, stored in the low 32 bits.
    pub fn actor_idx(&self) -> u32 {
        // Truncating cast is intentional: it keeps exactly the low 32 bits.
        self.0 as u32
    }

    /// Pack `worker_id` and `actor_idx` into an id.
    ///
    /// # Panics
    ///
    /// Panics if `actor_idx` does not fit in 32 bits, rather than letting it spill into (and
    /// corrupt) the worker id bits.
    pub fn new(worker_id: u32, actor_idx: usize) -> Self {
        let actor_idx = u32::try_from(actor_idx)
            .unwrap_or_else(|_| panic!("actor index {actor_idx} does not fit in 32 bits"));
        Self((u64::from(worker_id) << 32) | u64::from(actor_idx))
    }

    /// Id of the only actor (index 0) on `worker_id`.
    pub fn new_single(worker_id: u32) -> Self {
        Self::new(worker_id, 0)
    }
}

impl From<ActorAlignmentId> for u64 {
    /// The packed representation: worker id in the high 32 bits, actor index in the low 32 bits.
    fn from(id: ActorAlignmentId) -> Self {
        id.0
    }
}

impl From<u64> for ActorAlignmentId {
    /// Reinterpret a packed `u64` (as produced by `u64::from`) as an id. Every value is accepted.
    fn from(id: u64) -> Self {
        Self(id)
    }
}

impl Display for ActorAlignmentId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}/{}]", self.worker_id(), self.actor_idx())
    }
}

impl Debug for ActorAlignmentId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Debug output is intentionally the same compact form as `Display`.
        Display::fmt(self, f)
    }
}
78
/// Identifies a slot on a worker: a worker id paired with a slot index.
///
/// Both halves are packed into a single `u64`: the worker id occupies the high 32 bits and the
/// slot index the low 32 bits. Because of this layout, the derived ordering sorts by worker id
/// first and slot index second. `Display` and `Debug` both render it as `[worker_id:slot_idx]`.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub struct WorkerSlotId(u64);

impl WorkerSlotId {
    /// The worker id, stored in the high 32 bits.
    pub fn worker_id(&self) -> u32 {
        (self.0 >> 32) as u32
    }

    /// The slot index, stored in the low 32 bits.
    pub fn slot_idx(&self) -> u32 {
        // Truncating cast is intentional: it keeps exactly the low 32 bits.
        self.0 as u32
    }

    /// Pack `worker_id` and `slot_idx` into an id.
    ///
    /// # Panics
    ///
    /// Panics if `slot_idx` does not fit in 32 bits, rather than letting it spill into (and
    /// corrupt) the worker id bits.
    pub fn new(worker_id: u32, slot_idx: usize) -> Self {
        let slot_idx = u32::try_from(slot_idx)
            .unwrap_or_else(|_| panic!("slot index {slot_idx} does not fit in 32 bits"));
        Self((u64::from(worker_id) << 32) | u64::from(slot_idx))
    }
}

impl From<WorkerSlotId> for u64 {
    /// The packed representation: worker id in the high 32 bits, slot index in the low 32 bits.
    fn from(id: WorkerSlotId) -> Self {
        id.0
    }
}

impl From<u64> for WorkerSlotId {
    /// Reinterpret a packed `u64` (as produced by `u64::from`) as an id. Every value is accepted.
    fn from(id: u64) -> Self {
        Self(id)
    }
}

impl Display for WorkerSlotId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}:{}]", self.worker_id(), self.slot_idx())
    }
}

impl Debug for WorkerSlotId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Debug output is intentionally the same compact form as `Display`.
        Display::fmt(self, f)
    }
}
119
/// Trait selecting the type of the items (the mapped-to values) stored in a [`VnodeMapping`].
pub trait VnodeMappingItem {
    /// The type of the item.
    ///
    /// Currently, there are three types of items, each with a marker type in the [`marker`]
    /// module: [`ActorId`], [`WorkerSlotId`] and [`ActorAlignmentId`]. A marker type is used as
    /// the generic parameter instead of the item type itself so that mappings whose items share
    /// a representation (e.g. [`ActorId`] is a plain alias of `u32`) are still distinct types.
    type Item: Copy + Ord + Hash + Debug;
}
128
/// Expanded mapping from virtual nodes to items, essentially a vector of items that can be indexed
/// by virtual node index. Its length is the vnode count.
pub type ExpandedMapping<T> = Vec<<T as VnodeMappingItem>::Item>;
132
/// Generic mapping from virtual nodes to items.
///
/// The representation is compressed as described in [`compress_data`], which is optimized for the
/// mapping with a small number of items and good locality: consecutive vnodes mapped to the same
/// item form a "run", and only one entry per run is stored.
// `educe` is presumably used instead of std `derive` because the std derives would also require
// the marker type `T` itself to implement these traits, which the marker types do not.
#[derive(Educe)]
#[educe(Debug, Clone, PartialEq, Eq, Hash)]
pub struct VnodeMapping<T: VnodeMappingItem> {
    // For each run, the index of its last vnode (inclusive). Expected to be ascending; the last
    // element plus one is the total vnode count (see `len`).
    original_indices: Vec<u32>,
    // The item of each run; expected to have the same length as `original_indices`. Entries are
    // not guaranteed to be distinct, not even adjacent ones (e.g. after `transform`).
    data: Vec<T::Item>,
}
143
144#[expect(
145    clippy::len_without_is_empty,
146    reason = "empty vnode mapping makes no sense"
147)]
148impl<T: VnodeMappingItem> VnodeMapping<T> {
149    /// Create a uniform vnode mapping with a **set** of items.
150    ///
151    /// For example, if `items` is `[0, 1, 2]`, and the total vnode count is 10, we'll generate
152    /// mapping like `[0, 0, 0, 0, 1, 1, 1, 2, 2, 2]`.
153    pub fn new_uniform(items: impl ExactSizeIterator<Item = T::Item>, vnode_count: usize) -> Self {
154        // If the number of items is greater than the total vnode count, no vnode will be mapped to
155        // some items and the mapping will be invalid.
156        assert!(items.len() <= vnode_count);
157
158        let mut original_indices = Vec::with_capacity(items.len());
159        let mut data = Vec::with_capacity(items.len());
160
161        let hash_shard_size = vnode_count / items.len();
162        let mut one_more_count = vnode_count % items.len();
163        let mut init_bound = 0;
164
165        for item in items {
166            let count = if one_more_count > 0 {
167                one_more_count -= 1;
168                hash_shard_size + 1
169            } else {
170                hash_shard_size
171            };
172            init_bound += count;
173
174            original_indices.push(init_bound as u32 - 1);
175            data.push(item);
176        }
177
178        // Assert that there's no duplicated items.
179        debug_assert_eq!(data.iter().duplicates().count(), 0);
180
181        Self {
182            original_indices,
183            data,
184        }
185    }
186
187    /// Create a vnode mapping with the single item and length of 1.
188    ///
189    /// Should only be used for singletons. If you want a different vnode count, call
190    /// [`VnodeMapping::new_uniform`] with `std::iter::once(item)` and desired length.
191    pub fn new_single(item: T::Item) -> Self {
192        Self::new_uniform(std::iter::once(item), 1)
193    }
194
195    /// The length (or count) of the vnode in this mapping.
196    pub fn len(&self) -> usize {
197        self.original_indices
198            .last()
199            .map(|&i| i as usize + 1)
200            .unwrap_or(0)
201    }
202
203    /// Get the item mapped to the given `vnode` by binary search.
204    ///
205    /// Note: to achieve better mapping performance, one should convert the mapping to the
206    /// [`ExpandedMapping`] first and directly access the item by index.
207    pub fn get(&self, vnode: VirtualNode) -> T::Item {
208        self[vnode]
209    }
210
211    /// Get the item matched by the virtual nodes indicated by high bits in the given `bitmap`.
212    /// Returns `None` if the no virtual node is set in the bitmap.
213    pub fn get_matched(&self, bitmap: &Bitmap) -> Option<T::Item> {
214        bitmap
215            .iter_vnodes()
216            .next() // only need to check the first one
217            .map(|v| self.get(v))
218    }
219
220    /// Iterate over all items in this mapping, in the order of vnodes.
221    pub fn iter(&self) -> impl Iterator<Item = T::Item> + '_ {
222        self.data
223            .iter()
224            .copied()
225            .zip_eq_debug(
226                std::iter::once(0)
227                    .chain(self.original_indices.iter().copied().map(|i| i + 1))
228                    .tuple_windows()
229                    .map(|(a, b)| (b - a) as usize),
230            )
231            .flat_map(|(item, c)| std::iter::repeat_n(item, c))
232    }
233
234    /// Iterate over all vnode-item pairs in this mapping.
235    pub fn iter_with_vnode(&self) -> impl Iterator<Item = (VirtualNode, T::Item)> + '_ {
236        self.iter()
237            .enumerate()
238            .map(|(v, item)| (VirtualNode::from_index(v), item))
239    }
240
241    /// Iterate over all unique items in this mapping. The order is deterministic.
242    pub fn iter_unique(&self) -> impl Iterator<Item = T::Item> + '_ {
243        // Note: we can't ensure there's no duplicated items in the `data` after some scaling.
244        self.data.iter().copied().sorted().dedup()
245    }
246
247    /// Returns the item if it's the only item in this mapping, otherwise returns `None`.
248    pub fn to_single(&self) -> Option<T::Item> {
249        self.data.iter().copied().dedup().exactly_one().ok()
250    }
251
252    /// Convert this vnode mapping to a mapping from items to bitmaps, where each bitmap represents
253    /// the vnodes mapped to the item.
254    pub fn to_bitmaps(&self) -> HashMap<T::Item, Bitmap> {
255        let vnode_count = self.len();
256        let mut vnode_bitmaps = HashMap::new();
257
258        for (vnode, item) in self.iter_with_vnode() {
259            vnode_bitmaps
260                .entry(item)
261                .or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
262                .set(vnode.to_index(), true);
263        }
264
265        vnode_bitmaps
266            .into_iter()
267            .map(|(item, b)| (item, b.finish()))
268            .collect()
269    }
270
271    /// Create a vnode mapping from the given mapping from items to bitmaps, where each bitmap
272    /// represents the vnodes mapped to the item.
273    pub fn from_bitmaps(bitmaps: &HashMap<T::Item, Bitmap>) -> Self {
274        let vnode_count = bitmaps.values().next().expect("empty bitmaps").len();
275        let mut items = vec![None; vnode_count];
276
277        for (&item, bitmap) in bitmaps {
278            assert_eq!(bitmap.len(), vnode_count);
279            for idx in bitmap.iter_ones() {
280                if let Some(prev) = items[idx].replace(item) {
281                    panic!("mapping at index `{idx}` is set to both `{prev:?}` and `{item:?}`");
282                }
283            }
284        }
285
286        let items = items
287            .into_iter()
288            .enumerate()
289            .map(|(i, o)| o.unwrap_or_else(|| panic!("mapping at index `{i}` is not set")))
290            .collect_vec();
291        Self::from_expanded(&items)
292    }
293
294    /// Create a vnode mapping from the expanded slice of items.
295    pub fn from_expanded(items: &[T::Item]) -> Self {
296        let (original_indices, data) = compress_data(items);
297        Self {
298            original_indices,
299            data,
300        }
301    }
302
303    /// Convert this vnode mapping to a expanded vector of items.
304    pub fn to_expanded(&self) -> ExpandedMapping<T> {
305        self.iter().collect()
306    }
307
308    /// Transform this vnode mapping to another type of vnode mapping, with the given mapping from
309    /// items of this mapping to items of the other mapping.
310    pub fn transform<T2, M>(&self, to_map: &M) -> VnodeMapping<T2>
311    where
312        T2: VnodeMappingItem,
313        M: for<'a> Index<&'a T::Item, Output = T2::Item>,
314    {
315        VnodeMapping {
316            original_indices: self.original_indices.clone(),
317            data: self.data.iter().map(|item| to_map[item]).collect(),
318        }
319    }
320}
321
322impl<T: VnodeMappingItem> Index<VirtualNode> for VnodeMapping<T> {
323    type Output = T::Item;
324
325    fn index(&self, vnode: VirtualNode) -> &Self::Output {
326        let index = self
327            .original_indices
328            .partition_point(|&i| i < vnode.to_index() as u32);
329        &self.data[index]
330    }
331}
332
/// Marker types implementing [`VnodeMappingItem`]; pass one as the type parameter of
/// [`VnodeMapping`] to choose which item type the mapping stores.
pub mod marker {
    use super::*;

    /// A marker type for items of [`ActorId`].
    pub struct Actor;
    impl VnodeMappingItem for Actor {
        type Item = ActorId;
    }

    /// A marker type for items of [`WorkerSlotId`].
    pub struct WorkerSlot;
    impl VnodeMappingItem for WorkerSlot {
        type Item = WorkerSlotId;
    }

    /// A marker type for items of [`ActorAlignmentId`].
    pub struct ActorAlignment;
    impl VnodeMappingItem for ActorAlignment {
        type Item = ActorAlignmentId;
    }
}
354
/// A mapping from [`VirtualNode`] to [`ActorId`].
pub type ActorMapping = VnodeMapping<marker::Actor>;
/// An expanded mapping from [`VirtualNode`] to [`ActorId`]: a `Vec` indexed by vnode index.
pub type ExpandedActorMapping = ExpandedMapping<marker::Actor>;

/// A mapping from [`VirtualNode`] to [`WorkerSlotId`].
pub type WorkerSlotMapping = VnodeMapping<marker::WorkerSlot>;
/// An expanded mapping from [`VirtualNode`] to [`WorkerSlotId`]: a `Vec` indexed by vnode index.
pub type ExpandedWorkerSlotMapping = ExpandedMapping<marker::WorkerSlot>;

/// A mapping from [`VirtualNode`] to [`ActorAlignmentId`].
pub type ActorAlignmentMapping = VnodeMapping<marker::ActorAlignment>;
/// An expanded mapping from [`VirtualNode`] to [`ActorAlignmentId`]: a `Vec` indexed by vnode
/// index. (Unlike its siblings, this alias has no `Mapping` suffix.)
pub type ExpandedActorAlignment = ExpandedMapping<marker::ActorAlignment>;
369
370impl ActorMapping {
371    /// Transform the actor mapping to the worker slot mapping. Note that the parameter is a mapping from actor to worker.
372    pub fn to_worker_slot(&self, actor_to_worker: &HashMap<ActorId, u32>) -> WorkerSlotMapping {
373        let mut worker_actors = HashMap::new();
374        for actor_id in self.iter_unique() {
375            let worker_id = actor_to_worker
376                .get(&actor_id)
377                .cloned()
378                .unwrap_or_else(|| panic!("location for actor {} not found", actor_id));
379
380            worker_actors
381                .entry(worker_id)
382                .or_insert(BTreeSet::new())
383                .insert(actor_id);
384        }
385
386        let mut actor_location = HashMap::new();
387        for (worker, actors) in worker_actors {
388            for (idx, &actor) in actors.iter().enumerate() {
389                actor_location.insert(actor, WorkerSlotId::new(worker, idx));
390            }
391        }
392
393        self.transform(&actor_location)
394    }
395
396    /// Transform the actor mapping to the actor alignment mapping. Note that the parameter is a mapping from actor to worker.
397    pub fn to_actor_alignment(
398        &self,
399        actor_to_worker: &HashMap<ActorId, u32>,
400    ) -> ActorAlignmentMapping {
401        let mut worker_actors = HashMap::new();
402
403        for (idx, actor_id) in self.iter_unique().enumerate() {
404            let worker_id = actor_to_worker
405                .get(&actor_id)
406                .cloned()
407                .unwrap_or_else(|| panic!("location for actor {} not found", actor_id));
408
409            worker_actors
410                .entry(worker_id)
411                .or_insert(BTreeSet::new())
412                .insert((actor_id, idx));
413        }
414
415        let mut actor_location = HashMap::new();
416        for (worker, idxes) in worker_actors {
417            for (actor, idx) in idxes {
418                actor_location.insert(actor, ActorAlignmentId::new(worker, idx));
419            }
420        }
421
422        self.transform(&actor_location)
423    }
424
425    /// Create an actor mapping from the protobuf representation.
426    pub fn from_protobuf(proto: &ActorMappingProto) -> Self {
427        assert_eq!(proto.original_indices.len(), proto.data.len());
428        Self {
429            original_indices: proto.original_indices.clone(),
430            data: proto.data.clone(),
431        }
432    }
433
434    /// Convert this actor mapping to the protobuf representation.
435    pub fn to_protobuf(&self) -> ActorMappingProto {
436        ActorMappingProto {
437            original_indices: self.original_indices.clone(),
438            data: self.data.clone(),
439        }
440    }
441}
442
443impl WorkerSlotMapping {
444    /// Create a uniform worker mapping from the given worker ids
445    pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId], vnode_count: usize) -> Self {
446        Self::new_uniform(worker_slot_ids.iter().cloned(), vnode_count)
447    }
448
449    /// Create a worker mapping from the protobuf representation.
450    pub fn from_protobuf(proto: &PbWorkerSlotMapping) -> Self {
451        assert_eq!(proto.original_indices.len(), proto.data.len());
452        Self {
453            original_indices: proto.original_indices.clone(),
454            data: proto.data.iter().map(|&id| WorkerSlotId(id)).collect(),
455        }
456    }
457
458    /// Convert this worker mapping to the protobuf representation.
459    pub fn to_protobuf(&self) -> PbWorkerSlotMapping {
460        PbWorkerSlotMapping {
461            original_indices: self.original_indices.clone(),
462            data: self.data.iter().map(|id| id.0).collect(),
463        }
464    }
465}
466
467impl WorkerSlotMapping {
468    /// Transform this worker slot mapping to an actor mapping, essentially `transform`.
469    pub fn to_actor(&self, to_map: &HashMap<WorkerSlotId, ActorId>) -> ActorMapping {
470        self.transform(to_map)
471    }
472}
473
474impl ActorAlignmentMapping {
475    pub fn from_assignment(
476        assignment: BTreeMap<u32, BTreeMap<usize, Vec<usize>>>,
477        vnode_size: usize,
478    ) -> Self {
479        let mut all_bitmaps = HashMap::new();
480
481        for (worker_id, actors) in &assignment {
482            for (actor_idx, vnodes) in actors {
483                let mut bitmap_builder = BitmapBuilder::zeroed(vnode_size);
484                vnodes
485                    .iter()
486                    .for_each(|vnode| bitmap_builder.set(*vnode, true));
487                all_bitmaps.insert(
488                    ActorAlignmentId::new(*worker_id, *actor_idx),
489                    bitmap_builder.finish(),
490                );
491            }
492        }
493
494        Self::from_bitmaps(&all_bitmaps)
495    }
496}
497
#[cfg(test)]
mod tests {
    use std::iter::repeat_with;

    use rand::Rng;

    use super::*;

    struct Test;
    impl VnodeMappingItem for Test {
        type Item = u32;
    }

    // A second marker with the same item type, to check that `transform` works across
    // distinct mapping types.
    struct Test2;
    impl VnodeMappingItem for Test2 {
        type Item = u32;
    }

    type TestMapping = VnodeMapping<Test>;
    type Test2Mapping = VnodeMapping<Test2>;

    /// Item counts used by the uniform and random mapping generators.
    const COUNTS: &[usize] = &[1, 3, 12, 42, VirtualNode::COUNT_FOR_TEST];

    /// One uniform mapping over `COUNT_FOR_TEST` vnodes per entry of `COUNTS`.
    fn uniforms() -> impl Iterator<Item = TestMapping> {
        COUNTS
            .iter()
            .map(|&count| TestMapping::new_uniform(0..count as u32, VirtualNode::COUNT_FOR_TEST))
    }

    /// One random mapping over `COUNT_FOR_TEST` vnodes per entry of `COUNTS`, with items drawn
    /// from `0..count` (not every item is necessarily used).
    fn randoms() -> impl Iterator<Item = TestMapping> {
        COUNTS.iter().map(|&count| {
            let raw = repeat_with(|| rand::rng().random_range(0..count as u32))
                .take(VirtualNode::COUNT_FOR_TEST)
                .collect_vec();
            TestMapping::from_expanded(&raw)
        })
    }

    fn mappings() -> impl Iterator<Item = TestMapping> {
        uniforms().chain(randoms())
    }

    #[test]
    fn test_uniform() {
        for vnode_mapping in uniforms() {
            assert_eq!(vnode_mapping.len(), VirtualNode::COUNT_FOR_TEST);
            let item_count = vnode_mapping.iter_unique().count();

            let mut check: HashMap<u32, Vec<_>> = HashMap::new();
            for (vnode, item) in vnode_mapping.iter_with_vnode() {
                check.entry(item).or_default().push(vnode);
            }

            assert_eq!(check.len(), item_count);

            let (min, max) = check
                .values()
                .map(|indexes| indexes.len())
                .minmax()
                .into_option()
                .unwrap();

            assert!(max - min <= 1);
        }
    }

    /// The exact layout documented on `new_uniform`: earlier items absorb the remainder.
    #[test]
    fn test_uniform_layout() {
        let mapping = TestMapping::new_uniform(0..3u32, 10);
        assert_eq!(mapping.len(), 10);
        assert_eq!(mapping.to_expanded(), vec![0, 0, 0, 0, 1, 1, 1, 2, 2, 2]);
    }

    #[test]
    #[should_panic]
    fn test_uniform_rejects_empty_items() {
        let _ = TestMapping::new_uniform(std::iter::empty(), 10);
    }

    #[test]
    fn test_to_single() {
        assert_eq!(TestMapping::new_single(7).to_single(), Some(7));
        assert_eq!(
            TestMapping::new_uniform(std::iter::once(7), 42).to_single(),
            Some(7)
        );
        assert_eq!(TestMapping::new_uniform(0..2u32, 42).to_single(), None);
    }

    #[test]
    fn test_iter_with_get() {
        for vnode_mapping in mappings() {
            for (vnode, item) in vnode_mapping.iter_with_vnode() {
                assert_eq!(vnode_mapping.get(vnode), item);
            }
        }
    }

    #[test]
    fn test_from_to_bitmaps() {
        for vnode_mapping in mappings() {
            let bitmaps = vnode_mapping.to_bitmaps();
            let new_vnode_mapping = TestMapping::from_bitmaps(&bitmaps);

            assert_eq!(vnode_mapping, new_vnode_mapping);
        }
    }

    #[test]
    fn test_transform() {
        for vnode_mapping in mappings() {
            let transform_map: HashMap<_, _> = vnode_mapping
                .iter_unique()
                .map(|item| (item, item + 1))
                .collect();
            let vnode_mapping_2: Test2Mapping = vnode_mapping.transform(&transform_map);

            for (item, item_2) in vnode_mapping.iter().zip_eq_debug(vnode_mapping_2.iter()) {
                assert_eq!(item + 1, item_2);
            }

            let transform_back_map: HashMap<_, _> =
                transform_map.into_iter().map(|(k, v)| (v, k)).collect();
            let new_vnode_mapping: TestMapping = vnode_mapping_2.transform(&transform_back_map);

            assert_eq!(vnode_mapping, new_vnode_mapping);
        }
    }

    #[test]
    fn test_id_packing() {
        let slot = WorkerSlotId::new(3, 5);
        assert_eq!((slot.worker_id(), slot.slot_idx()), (3, 5));
        assert_eq!(WorkerSlotId::from(u64::from(slot)), slot);
        assert_eq!(slot.to_string(), "[3:5]");

        let alignment = ActorAlignmentId::new(4, 6);
        assert_eq!((alignment.worker_id(), alignment.actor_idx()), (4, 6));
        assert_eq!(ActorAlignmentId::from(u64::from(alignment)), alignment);
        assert_eq!(ActorAlignmentId::new_single(4).actor_idx(), 0);
        assert_eq!(alignment.to_string(), "[4/6]");
    }

    #[test]
    fn test_to_worker_slot_and_alignment() {
        // Actors 10 and 11 live on worker 1, actor 12 on worker 2; each owns two vnodes.
        let mapping = ActorMapping::new_uniform([10u32, 11, 12].into_iter(), 6);
        let actor_to_worker: HashMap<ActorId, u32> =
            [(10, 1), (11, 1), (12, 2)].into_iter().collect();

        // Slot indices restart at 0 on every worker.
        let slots = mapping.to_worker_slot(&actor_to_worker);
        let (s10, s11, s12) = (
            WorkerSlotId::new(1, 0),
            WorkerSlotId::new(1, 1),
            WorkerSlotId::new(2, 0),
        );
        assert_eq!(slots.to_expanded(), vec![s10, s10, s11, s11, s12, s12]);

        // Alignment indices are positions among all actors, in ascending id order.
        let aligned = mapping.to_actor_alignment(&actor_to_worker);
        let (a10, a11, a12) = (
            ActorAlignmentId::new(1, 0),
            ActorAlignmentId::new(1, 1),
            ActorAlignmentId::new(2, 2),
        );
        assert_eq!(aligned.to_expanded(), vec![a10, a10, a11, a11, a12, a12]);
    }
}