risingwave_common/hash/consistent_hash/
mapping.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap};
16use std::fmt::{Debug, Display, Formatter};
17use std::hash::Hash;
18use std::ops::Index;
19
20use educe::Educe;
21use itertools::Itertools;
22use risingwave_pb::common::PbWorkerSlotMapping;
23use risingwave_pb::stream_plan::ActorMapping as ActorMappingProto;
24
25use super::bitmap::VnodeBitmapExt;
26use crate::bitmap::{Bitmap, BitmapBuilder};
27use crate::hash::VirtualNode;
28pub use crate::id::ActorId;
29use crate::id::WorkerId;
30use crate::util::compress::compress_data;
31use crate::util::iter_util::ZipEqDebug;
32
/// Identifier of an actor alignment, packed into a single `u64`.
///
/// The upper 32 bits hold the raw worker id and the lower 32 bits hold the actor index; see
/// [`ActorAlignmentId::new`] for the exact packing.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ActorAlignmentId(u64);
35
36impl ActorAlignmentId {
37    pub fn worker_id(&self) -> WorkerId {
38        ((self.0 >> 32) as u32).into()
39    }
40
41    pub fn actor_idx(&self) -> u32 {
42        self.0 as u32
43    }
44
45    pub fn new(worker_id: WorkerId, actor_idx: usize) -> Self {
46        Self((worker_id.as_raw_id() as u64) << 32 | actor_idx as u64)
47    }
48
49    pub fn new_single(worker_id: WorkerId) -> Self {
50        Self::new(worker_id, 0)
51    }
52}
53
54impl From<ActorAlignmentId> for u64 {
55    fn from(id: ActorAlignmentId) -> Self {
56        id.0
57    }
58}
59
60impl From<u64> for ActorAlignmentId {
61    fn from(id: u64) -> Self {
62        Self(id)
63    }
64}
65
66impl Display for ActorAlignmentId {
67    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68        f.write_fmt(format_args!("[{}/{}]", self.worker_id(), self.actor_idx()))
69    }
70}
71
72impl Debug for ActorAlignmentId {
73    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
74        f.write_fmt(format_args!("[{}/{}]", self.worker_id(), self.actor_idx()))
75    }
76}
77
/// Identifier of a slot on a worker, packed into a single `u64`.
///
/// The upper 32 bits hold the raw worker id and the lower 32 bits hold the slot index; see
/// [`WorkerSlotId::new`] for the exact packing.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct WorkerSlotId(u64);
80
81impl WorkerSlotId {
82    pub fn worker_id(&self) -> WorkerId {
83        WorkerId::new((self.0 >> 32) as u32)
84    }
85
86    pub fn slot_idx(&self) -> u32 {
87        self.0 as u32
88    }
89
90    pub fn new(worker_id: WorkerId, slot_idx: usize) -> Self {
91        Self((worker_id.as_raw_id() as u64) << 32 | slot_idx as u64)
92    }
93}
94
95impl From<WorkerSlotId> for u64 {
96    fn from(id: WorkerSlotId) -> Self {
97        id.0
98    }
99}
100
101impl From<u64> for WorkerSlotId {
102    fn from(id: u64) -> Self {
103        Self(id)
104    }
105}
106
107impl Display for WorkerSlotId {
108    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
109        f.write_fmt(format_args!("[{}:{}]", self.worker_id(), self.slot_idx()))
110    }
111}
112
113impl Debug for WorkerSlotId {
114    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
115        f.write_fmt(format_args!("[{}:{}]", self.worker_id(), self.slot_idx()))
116    }
117}
118
/// Trait for marker types that select the item type stored in a [`VnodeMapping`].
///
/// A [`VnodeMapping`] is generic over an implementor of this trait and stores values of the
/// associated [`VnodeMappingItem::Item`] type.
pub trait VnodeMappingItem {
    /// The type of the item.
    ///
    /// Currently, there are three types of items: [`ActorId`], [`WorkerSlotId`] and
    /// [`ActorAlignmentId`]. They are not used directly as the generic parameter; the marker
    /// types in [`marker`] are used instead.
    // NOTE(review): this comment used to say the item types "are the same type aliases".
    // `WorkerSlotId` and `ActorAlignmentId` are distinct newtypes in this file, so that rationale
    // may be outdated -- confirm whether the marker indirection is still required.
    type Item: Copy + Ord + Hash + Debug;
}
127
/// Expanded mapping from virtual nodes to items: a plain vector holding one item per virtual
/// node, so the item of a virtual node is found at the position of the vnode's index.
pub type ExpandedMapping<T> = Vec<<T as VnodeMappingItem>::Item>;
131
/// Generic mapping from virtual nodes to items.
///
/// The representation is compressed as described in [`compress_data`], which is optimized for the
/// mapping with a small number of items and good locality.
#[derive(Educe)]
#[educe(Debug, Clone, PartialEq, Eq, Hash)]
pub struct VnodeMapping<T: VnodeMappingItem> {
    /// Inclusive index of the last virtual node of each run, in ascending order. The last entry
    /// plus one is the total vnode count of the mapping.
    original_indices: Vec<u32>,
    /// The item of each run: `data[i]` is the item for the run ending at `original_indices[i]`.
    data: Vec<T::Item>,
}
142
143#[expect(
144    clippy::len_without_is_empty,
145    reason = "empty vnode mapping makes no sense"
146)]
147impl<T: VnodeMappingItem> VnodeMapping<T> {
148    /// Create a uniform vnode mapping with a **set** of items.
149    ///
150    /// For example, if `items` is `[0, 1, 2]`, and the total vnode count is 10, we'll generate
151    /// mapping like `[0, 0, 0, 0, 1, 1, 1, 2, 2, 2]`.
152    pub fn new_uniform(items: impl ExactSizeIterator<Item = T::Item>, vnode_count: usize) -> Self {
153        // If the number of items is greater than the total vnode count, no vnode will be mapped to
154        // some items and the mapping will be invalid.
155        assert!(items.len() <= vnode_count);
156
157        let mut original_indices = Vec::with_capacity(items.len());
158        let mut data = Vec::with_capacity(items.len());
159
160        let hash_shard_size = vnode_count / items.len();
161        let mut one_more_count = vnode_count % items.len();
162        let mut init_bound = 0;
163
164        for item in items {
165            let count = if one_more_count > 0 {
166                one_more_count -= 1;
167                hash_shard_size + 1
168            } else {
169                hash_shard_size
170            };
171            init_bound += count;
172
173            original_indices.push(init_bound as u32 - 1);
174            data.push(item);
175        }
176
177        // Assert that there's no duplicated items.
178        debug_assert_eq!(data.iter().duplicates().count(), 0);
179
180        Self {
181            original_indices,
182            data,
183        }
184    }
185
186    /// Create a vnode mapping with the single item and length of 1.
187    ///
188    /// Should only be used for singletons. If you want a different vnode count, call
189    /// [`VnodeMapping::new_uniform`] with `std::iter::once(item)` and desired length.
190    pub fn new_single(item: T::Item) -> Self {
191        Self::new_uniform(std::iter::once(item), 1)
192    }
193
194    /// The length (or count) of the vnode in this mapping.
195    pub fn len(&self) -> usize {
196        self.original_indices
197            .last()
198            .map(|&i| i as usize + 1)
199            .unwrap_or(0)
200    }
201
202    /// Get the item mapped to the given `vnode` by binary search.
203    ///
204    /// Note: to achieve better mapping performance, one should convert the mapping to the
205    /// [`ExpandedMapping`] first and directly access the item by index.
206    pub fn get(&self, vnode: VirtualNode) -> T::Item {
207        self[vnode]
208    }
209
210    /// Get the item matched by the virtual nodes indicated by high bits in the given `bitmap`.
211    /// Returns `None` if the no virtual node is set in the bitmap.
212    pub fn get_matched(&self, bitmap: &Bitmap) -> Option<T::Item> {
213        bitmap
214            .iter_vnodes()
215            .next() // only need to check the first one
216            .map(|v| self.get(v))
217    }
218
219    /// Iterate over all items in this mapping, in the order of vnodes.
220    pub fn iter(&self) -> impl Iterator<Item = T::Item> + '_ {
221        self.data
222            .iter()
223            .copied()
224            .zip_eq_debug(
225                std::iter::once(0)
226                    .chain(self.original_indices.iter().copied().map(|i| i + 1))
227                    .tuple_windows()
228                    .map(|(a, b)| (b - a) as usize),
229            )
230            .flat_map(|(item, c)| std::iter::repeat_n(item, c))
231    }
232
233    /// Iterate over all vnode-item pairs in this mapping.
234    pub fn iter_with_vnode(&self) -> impl Iterator<Item = (VirtualNode, T::Item)> + '_ {
235        self.iter()
236            .enumerate()
237            .map(|(v, item)| (VirtualNode::from_index(v), item))
238    }
239
240    /// Iterate over all unique items in this mapping. The order is deterministic.
241    pub fn iter_unique(&self) -> impl Iterator<Item = T::Item> + '_ {
242        // Note: we can't ensure there's no duplicated items in the `data` after some scaling.
243        self.data.iter().copied().sorted().dedup()
244    }
245
246    /// Returns the item if it's the only item in this mapping, otherwise returns `None`.
247    pub fn to_single(&self) -> Option<T::Item> {
248        self.data.iter().copied().dedup().exactly_one().ok()
249    }
250
251    /// Convert this vnode mapping to a mapping from items to bitmaps, where each bitmap represents
252    /// the vnodes mapped to the item.
253    pub fn to_bitmaps(&self) -> HashMap<T::Item, Bitmap> {
254        let vnode_count = self.len();
255        let mut vnode_bitmaps = HashMap::new();
256
257        for (vnode, item) in self.iter_with_vnode() {
258            vnode_bitmaps
259                .entry(item)
260                .or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
261                .set(vnode.to_index(), true);
262        }
263
264        vnode_bitmaps
265            .into_iter()
266            .map(|(item, b)| (item, b.finish()))
267            .collect()
268    }
269
270    /// Create a vnode mapping from the given mapping from items to bitmaps, where each bitmap
271    /// represents the vnodes mapped to the item.
272    pub fn from_bitmaps(bitmaps: &HashMap<T::Item, Bitmap>) -> Self {
273        let vnode_count = bitmaps.values().next().expect("empty bitmaps").len();
274        let mut items = vec![None; vnode_count];
275
276        for (&item, bitmap) in bitmaps {
277            assert_eq!(bitmap.len(), vnode_count);
278            for idx in bitmap.iter_ones() {
279                if let Some(prev) = items[idx].replace(item) {
280                    panic!("mapping at index `{idx}` is set to both `{prev:?}` and `{item:?}`");
281                }
282            }
283        }
284
285        let items = items
286            .into_iter()
287            .enumerate()
288            .map(|(i, o)| o.unwrap_or_else(|| panic!("mapping at index `{i}` is not set")))
289            .collect_vec();
290        Self::from_expanded(&items)
291    }
292
293    /// Create a vnode mapping from the expanded slice of items.
294    pub fn from_expanded(items: &[T::Item]) -> Self {
295        let (original_indices, data) = compress_data(items);
296        Self {
297            original_indices,
298            data,
299        }
300    }
301
302    /// Convert this vnode mapping to a expanded vector of items.
303    pub fn to_expanded(&self) -> ExpandedMapping<T> {
304        self.iter().collect()
305    }
306
307    /// Transform this vnode mapping to another type of vnode mapping, with the given mapping from
308    /// items of this mapping to items of the other mapping.
309    pub fn transform<T2, M>(&self, to_map: &M) -> VnodeMapping<T2>
310    where
311        T2: VnodeMappingItem,
312        M: for<'a> Index<&'a T::Item, Output = T2::Item>,
313    {
314        VnodeMapping {
315            original_indices: self.original_indices.clone(),
316            data: self.data.iter().map(|item| to_map[item]).collect(),
317        }
318    }
319}
320
321impl<T: VnodeMappingItem> Index<VirtualNode> for VnodeMapping<T> {
322    type Output = T::Item;
323
324    fn index(&self, vnode: VirtualNode) -> &Self::Output {
325        let index = self
326            .original_indices
327            .partition_point(|&i| i < vnode.to_index() as u32);
328        &self.data[index]
329    }
330}
331
/// Marker types implementing [`VnodeMappingItem`]. They are used as the generic parameter of
/// [`VnodeMapping`] to select the item type stored in the mapping.
pub mod marker {
    use super::*;

    /// A marker type for items of [`ActorId`].
    pub struct Actor;
    impl VnodeMappingItem for Actor {
        type Item = ActorId;
    }

    /// A marker type for items of [`WorkerSlotId`].
    pub struct WorkerSlot;
    impl VnodeMappingItem for WorkerSlot {
        type Item = WorkerSlotId;
    }

    /// A marker type for items of [`ActorAlignmentId`].
    pub struct ActorAlignment;
    impl VnodeMappingItem for ActorAlignment {
        type Item = ActorAlignmentId;
    }
}
353
/// A mapping from [`VirtualNode`] to [`ActorId`].
pub type ActorMapping = VnodeMapping<marker::Actor>;
/// An expanded mapping from [`VirtualNode`] to [`ActorId`], holding one entry per vnode.
pub type ExpandedActorMapping = ExpandedMapping<marker::Actor>;

/// A mapping from [`VirtualNode`] to [`WorkerSlotId`].
pub type WorkerSlotMapping = VnodeMapping<marker::WorkerSlot>;
/// An expanded mapping from [`VirtualNode`] to [`WorkerSlotId`], holding one entry per vnode.
pub type ExpandedWorkerSlotMapping = ExpandedMapping<marker::WorkerSlot>;

/// A mapping from [`VirtualNode`] to [`ActorAlignmentId`].
pub type ActorAlignmentMapping = VnodeMapping<marker::ActorAlignment>;
/// An expanded mapping from [`VirtualNode`] to [`ActorAlignmentId`], holding one entry per vnode.
pub type ExpandedActorAlignment = ExpandedMapping<marker::ActorAlignment>;
368
369impl ActorMapping {
370    /// Transform the actor mapping to the worker slot mapping. Note that the parameter is a mapping from actor to worker.
371    pub fn to_worker_slot(
372        &self,
373        actor_to_worker: &HashMap<ActorId, WorkerId>,
374    ) -> WorkerSlotMapping {
375        let mut worker_actors = HashMap::new();
376        for actor_id in self.iter_unique() {
377            let worker_id = actor_to_worker
378                .get(&actor_id)
379                .cloned()
380                .unwrap_or_else(|| panic!("location for actor {} not found", actor_id));
381
382            worker_actors
383                .entry(worker_id)
384                .or_insert(BTreeSet::new())
385                .insert(actor_id);
386        }
387
388        let mut actor_location = HashMap::new();
389        for (worker, actors) in worker_actors {
390            for (idx, &actor) in actors.iter().enumerate() {
391                actor_location.insert(actor, WorkerSlotId::new(worker, idx));
392            }
393        }
394
395        self.transform(&actor_location)
396    }
397
398    /// Transform the actor mapping to the actor alignment mapping. Note that the parameter is a mapping from actor to worker.
399    pub fn to_actor_alignment(
400        &self,
401        actor_to_worker: &HashMap<ActorId, WorkerId>,
402    ) -> ActorAlignmentMapping {
403        let mut worker_actors = HashMap::new();
404
405        for (idx, actor_id) in self.iter_unique().enumerate() {
406            let worker_id = actor_to_worker
407                .get(&actor_id)
408                .cloned()
409                .unwrap_or_else(|| panic!("location for actor {} not found", actor_id));
410
411            worker_actors
412                .entry(worker_id)
413                .or_insert(BTreeSet::new())
414                .insert((actor_id, idx));
415        }
416
417        let mut actor_location = HashMap::new();
418        for (worker, idxes) in worker_actors {
419            for (actor, idx) in idxes {
420                actor_location.insert(actor, ActorAlignmentId::new(worker, idx));
421            }
422        }
423
424        self.transform(&actor_location)
425    }
426
427    /// Create an actor mapping from the protobuf representation.
428    pub fn from_protobuf(proto: &ActorMappingProto) -> Self {
429        assert_eq!(proto.original_indices.len(), proto.data.len());
430        Self {
431            original_indices: proto.original_indices.clone(),
432            data: proto.data.clone(),
433        }
434    }
435
436    /// Convert this actor mapping to the protobuf representation.
437    pub fn to_protobuf(&self) -> ActorMappingProto {
438        ActorMappingProto {
439            original_indices: self.original_indices.clone(),
440            data: self.data.clone(),
441        }
442    }
443}
444
445impl WorkerSlotMapping {
446    /// Create a uniform worker mapping from the given worker ids
447    pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId], vnode_count: usize) -> Self {
448        Self::new_uniform(worker_slot_ids.iter().cloned(), vnode_count)
449    }
450
451    /// Create a worker mapping from the protobuf representation.
452    pub fn from_protobuf(proto: &PbWorkerSlotMapping) -> Self {
453        assert_eq!(proto.original_indices.len(), proto.data.len());
454        Self {
455            original_indices: proto.original_indices.clone(),
456            data: proto.data.iter().map(|&id| WorkerSlotId(id)).collect(),
457        }
458    }
459
460    /// Convert this worker mapping to the protobuf representation.
461    pub fn to_protobuf(&self) -> PbWorkerSlotMapping {
462        PbWorkerSlotMapping {
463            original_indices: self.original_indices.clone(),
464            data: self.data.iter().map(|id| id.0).collect(),
465        }
466    }
467}
468
469impl WorkerSlotMapping {
470    /// Transform this worker slot mapping to an actor mapping, essentially `transform`.
471    pub fn to_actor(&self, to_map: &HashMap<WorkerSlotId, ActorId>) -> ActorMapping {
472        self.transform(to_map)
473    }
474}
475
476impl ActorAlignmentMapping {
477    pub fn from_assignment(
478        assignment: BTreeMap<WorkerId, BTreeMap<usize, Vec<usize>>>,
479        vnode_size: usize,
480    ) -> Self {
481        let mut all_bitmaps = HashMap::new();
482
483        for (worker_id, actors) in &assignment {
484            for (actor_idx, vnodes) in actors {
485                let mut bitmap_builder = BitmapBuilder::zeroed(vnode_size);
486                vnodes
487                    .iter()
488                    .for_each(|vnode| bitmap_builder.set(*vnode, true));
489                all_bitmaps.insert(
490                    ActorAlignmentId::new(*worker_id, *actor_idx),
491                    bitmap_builder.finish(),
492                );
493            }
494        }
495
496        Self::from_bitmaps(&all_bitmaps)
497    }
498}
499
#[cfg(test)]
mod tests {
    use std::iter::repeat_with;

    use rand::Rng;

    use super::*;

    /// First marker type for the mappings under test.
    struct Test;
    impl VnodeMappingItem for Test {
        type Item = u32;
    }

    /// Second marker type with the same item type, used as the target of `transform`.
    struct Test2;
    impl VnodeMappingItem for Test2 {
        type Item = u32;
    }

    type TestMapping = VnodeMapping<Test>;
    type Test2Mapping = VnodeMapping<Test2>;

    /// The item counts exercised by the tests.
    const COUNTS: &[usize] = &[1, 3, 12, 42, VirtualNode::COUNT_FOR_TEST];

    /// One uniform mapping per entry of `COUNTS`, over the items `0..count`.
    fn uniforms() -> impl Iterator<Item = TestMapping> {
        COUNTS.iter().copied().map(|item_count| {
            let items = 0..item_count as u32;
            TestMapping::new_uniform(items, VirtualNode::COUNT_FOR_TEST)
        })
    }

    /// One mapping per entry of `COUNTS`, where every vnode gets a random item in `0..count`.
    fn randoms() -> impl Iterator<Item = TestMapping> {
        COUNTS.iter().copied().map(|item_count| {
            let upper = item_count as u32;
            let expanded: Vec<u32> = repeat_with(|| rand::rng().random_range(0..upper))
                .take(VirtualNode::COUNT_FOR_TEST)
                .collect();
            TestMapping::from_expanded(&expanded)
        })
    }

    /// All uniform mappings followed by all random mappings.
    fn mappings() -> impl Iterator<Item = TestMapping> {
        uniforms().chain(randoms())
    }

    /// A uniform mapping covers all vnodes, and the vnode counts of any two items differ by at
    /// most one.
    #[test]
    fn test_uniform() {
        for mapping in uniforms() {
            assert_eq!(mapping.len(), VirtualNode::COUNT_FOR_TEST);
            let item_count = mapping.iter_unique().count();

            let mut vnodes_by_item: HashMap<u32, Vec<_>> = HashMap::new();
            for (vnode, item) in mapping.iter_with_vnode() {
                vnodes_by_item.entry(item).or_default().push(vnode);
            }

            assert_eq!(vnodes_by_item.len(), item_count);

            let sizes = vnodes_by_item.values().map(Vec::len);
            let (min, max) = sizes.minmax().into_option().unwrap();

            assert!(max - min <= 1);
        }
    }

    /// `get` agrees with the items yielded by `iter_with_vnode`.
    #[test]
    fn test_iter_with_get() {
        for mapping in mappings() {
            mapping
                .iter_with_vnode()
                .for_each(|(vnode, expected)| assert_eq!(mapping.get(vnode), expected));
        }
    }

    /// Converting to bitmaps and back yields an equal mapping.
    #[test]
    fn test_from_to_bitmaps() {
        for original in mappings() {
            let round_tripped = TestMapping::from_bitmaps(&original.to_bitmaps());

            assert_eq!(original, round_tripped);
        }
    }

    /// `transform` translates every item, and transforming back restores the original mapping.
    #[test]
    fn test_transform() {
        for mapping in mappings() {
            let forward: HashMap<u32, u32> = mapping
                .iter_unique()
                .map(|item| (item, item + 1))
                .collect();
            let shifted: Test2Mapping = mapping.transform(&forward);

            for (before, after) in mapping.iter().zip_eq_debug(shifted.iter()) {
                assert_eq!(before + 1, after);
            }

            let backward: HashMap<u32, u32> = forward.iter().map(|(&k, &v)| (v, k)).collect();
            let restored: TestMapping = shifted.transform(&backward);

            assert_eq!(mapping, restored);
        }
    }
}