risingwave_common/hash/consistent_hash/
mapping.rs1use std::collections::{BTreeMap, BTreeSet, HashMap};
16use std::fmt::{Debug, Display, Formatter};
17use std::hash::Hash;
18use std::ops::Index;
19
20use educe::Educe;
21use itertools::Itertools;
22use risingwave_pb::common::PbWorkerSlotMapping;
23use risingwave_pb::stream_plan::ActorMapping as ActorMappingProto;
24
25use super::bitmap::VnodeBitmapExt;
26use crate::bitmap::{Bitmap, BitmapBuilder};
27use crate::hash::VirtualNode;
28pub use crate::id::ActorId;
29use crate::id::WorkerId;
30use crate::util::compress::compress_data;
31use crate::util::iter_util::ZipEqDebug;
32
33#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
34pub struct ActorAlignmentId(u64);
35
36impl ActorAlignmentId {
37 pub fn worker_id(&self) -> WorkerId {
38 ((self.0 >> 32) as u32).into()
39 }
40
41 pub fn actor_idx(&self) -> u32 {
42 self.0 as u32
43 }
44
45 pub fn new(worker_id: WorkerId, actor_idx: usize) -> Self {
46 Self((worker_id.as_raw_id() as u64) << 32 | actor_idx as u64)
47 }
48
49 pub fn new_single(worker_id: WorkerId) -> Self {
50 Self::new(worker_id, 0)
51 }
52}
53
54impl From<ActorAlignmentId> for u64 {
55 fn from(id: ActorAlignmentId) -> Self {
56 id.0
57 }
58}
59
60impl From<u64> for ActorAlignmentId {
61 fn from(id: u64) -> Self {
62 Self(id)
63 }
64}
65
66impl Display for ActorAlignmentId {
67 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68 f.write_fmt(format_args!("[{}/{}]", self.worker_id(), self.actor_idx()))
69 }
70}
71
72impl Debug for ActorAlignmentId {
73 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
74 f.write_fmt(format_args!("[{}/{}]", self.worker_id(), self.actor_idx()))
75 }
76}
77
78#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
79pub struct WorkerSlotId(u64);
80
81impl WorkerSlotId {
82 pub fn worker_id(&self) -> WorkerId {
83 WorkerId::new((self.0 >> 32) as u32)
84 }
85
86 pub fn slot_idx(&self) -> u32 {
87 self.0 as u32
88 }
89
90 pub fn new(worker_id: WorkerId, slot_idx: usize) -> Self {
91 Self((worker_id.as_raw_id() as u64) << 32 | slot_idx as u64)
92 }
93}
94
95impl From<WorkerSlotId> for u64 {
96 fn from(id: WorkerSlotId) -> Self {
97 id.0
98 }
99}
100
101impl From<u64> for WorkerSlotId {
102 fn from(id: u64) -> Self {
103 Self(id)
104 }
105}
106
107impl Display for WorkerSlotId {
108 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
109 f.write_fmt(format_args!("[{}:{}]", self.worker_id(), self.slot_idx()))
110 }
111}
112
113impl Debug for WorkerSlotId {
114 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
115 f.write_fmt(format_args!("[{}:{}]", self.worker_id(), self.slot_idx()))
116 }
117}
118
/// Marker trait that selects the item type stored in a [`VnodeMapping`].
pub trait VnodeMappingItem {
    /// The value each virtual node maps to.
    ///
    /// It must be cheap to copy, orderable, hashable and debug-printable, because the mapping
    /// copies items out, sorts them, and uses them as hash-map keys.
    type Item: Copy + Ord + Hash + Debug;
}
127
128pub type ExpandedMapping<T> = Vec<<T as VnodeMappingItem>::Item>;
131
132#[derive(Educe)]
137#[educe(Debug, Clone, PartialEq, Eq, Hash)]
138pub struct VnodeMapping<T: VnodeMappingItem> {
139 original_indices: Vec<u32>,
140 data: Vec<T::Item>,
141}
142
143#[expect(
144 clippy::len_without_is_empty,
145 reason = "empty vnode mapping makes no sense"
146)]
147impl<T: VnodeMappingItem> VnodeMapping<T> {
148 pub fn new_uniform(items: impl ExactSizeIterator<Item = T::Item>, vnode_count: usize) -> Self {
153 assert!(items.len() <= vnode_count);
156
157 let mut original_indices = Vec::with_capacity(items.len());
158 let mut data = Vec::with_capacity(items.len());
159
160 let hash_shard_size = vnode_count / items.len();
161 let mut one_more_count = vnode_count % items.len();
162 let mut init_bound = 0;
163
164 for item in items {
165 let count = if one_more_count > 0 {
166 one_more_count -= 1;
167 hash_shard_size + 1
168 } else {
169 hash_shard_size
170 };
171 init_bound += count;
172
173 original_indices.push(init_bound as u32 - 1);
174 data.push(item);
175 }
176
177 debug_assert_eq!(data.iter().duplicates().count(), 0);
179
180 Self {
181 original_indices,
182 data,
183 }
184 }
185
186 pub fn new_single(item: T::Item) -> Self {
191 Self::new_uniform(std::iter::once(item), 1)
192 }
193
194 pub fn len(&self) -> usize {
196 self.original_indices
197 .last()
198 .map(|&i| i as usize + 1)
199 .unwrap_or(0)
200 }
201
202 pub fn get(&self, vnode: VirtualNode) -> T::Item {
207 self[vnode]
208 }
209
210 pub fn get_matched(&self, bitmap: &Bitmap) -> Option<T::Item> {
213 bitmap
214 .iter_vnodes()
215 .next() .map(|v| self.get(v))
217 }
218
219 pub fn iter(&self) -> impl Iterator<Item = T::Item> + '_ {
221 self.data
222 .iter()
223 .copied()
224 .zip_eq_debug(
225 std::iter::once(0)
226 .chain(self.original_indices.iter().copied().map(|i| i + 1))
227 .tuple_windows()
228 .map(|(a, b)| (b - a) as usize),
229 )
230 .flat_map(|(item, c)| std::iter::repeat_n(item, c))
231 }
232
233 pub fn iter_with_vnode(&self) -> impl Iterator<Item = (VirtualNode, T::Item)> + '_ {
235 self.iter()
236 .enumerate()
237 .map(|(v, item)| (VirtualNode::from_index(v), item))
238 }
239
240 pub fn iter_unique(&self) -> impl Iterator<Item = T::Item> + '_ {
242 self.data.iter().copied().sorted().dedup()
244 }
245
246 pub fn to_single(&self) -> Option<T::Item> {
248 self.data.iter().copied().dedup().exactly_one().ok()
249 }
250
251 pub fn to_bitmaps(&self) -> HashMap<T::Item, Bitmap> {
254 let vnode_count = self.len();
255 let mut vnode_bitmaps = HashMap::new();
256
257 for (vnode, item) in self.iter_with_vnode() {
258 vnode_bitmaps
259 .entry(item)
260 .or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
261 .set(vnode.to_index(), true);
262 }
263
264 vnode_bitmaps
265 .into_iter()
266 .map(|(item, b)| (item, b.finish()))
267 .collect()
268 }
269
270 pub fn from_bitmaps(bitmaps: &HashMap<T::Item, Bitmap>) -> Self {
273 let vnode_count = bitmaps.values().next().expect("empty bitmaps").len();
274 let mut items = vec![None; vnode_count];
275
276 for (&item, bitmap) in bitmaps {
277 assert_eq!(bitmap.len(), vnode_count);
278 for idx in bitmap.iter_ones() {
279 if let Some(prev) = items[idx].replace(item) {
280 panic!("mapping at index `{idx}` is set to both `{prev:?}` and `{item:?}`");
281 }
282 }
283 }
284
285 let items = items
286 .into_iter()
287 .enumerate()
288 .map(|(i, o)| o.unwrap_or_else(|| panic!("mapping at index `{i}` is not set")))
289 .collect_vec();
290 Self::from_expanded(&items)
291 }
292
293 pub fn from_expanded(items: &[T::Item]) -> Self {
295 let (original_indices, data) = compress_data(items);
296 Self {
297 original_indices,
298 data,
299 }
300 }
301
302 pub fn to_expanded(&self) -> ExpandedMapping<T> {
304 self.iter().collect()
305 }
306
307 pub fn transform<T2, M>(&self, to_map: &M) -> VnodeMapping<T2>
310 where
311 T2: VnodeMappingItem,
312 M: for<'a> Index<&'a T::Item, Output = T2::Item>,
313 {
314 VnodeMapping {
315 original_indices: self.original_indices.clone(),
316 data: self.data.iter().map(|item| to_map[item]).collect(),
317 }
318 }
319}
320
321impl<T: VnodeMappingItem> Index<VirtualNode> for VnodeMapping<T> {
322 type Output = T::Item;
323
324 fn index(&self, vnode: VirtualNode) -> &Self::Output {
325 let index = self
326 .original_indices
327 .partition_point(|&i| i < vnode.to_index() as u32);
328 &self.data[index]
329 }
330}
331
332pub mod marker {
333 use super::*;
334
335 pub struct Actor;
337 impl VnodeMappingItem for Actor {
338 type Item = ActorId;
339 }
340
341 pub struct WorkerSlot;
343 impl VnodeMappingItem for WorkerSlot {
344 type Item = WorkerSlotId;
345 }
346
347 pub struct ActorAlignment;
349 impl VnodeMappingItem for ActorAlignment {
350 type Item = ActorAlignmentId;
351 }
352}
353
354pub type ActorMapping = VnodeMapping<marker::Actor>;
356pub type ExpandedActorMapping = ExpandedMapping<marker::Actor>;
358
359pub type WorkerSlotMapping = VnodeMapping<marker::WorkerSlot>;
361pub type ExpandedWorkerSlotMapping = ExpandedMapping<marker::WorkerSlot>;
363
364pub type ActorAlignmentMapping = VnodeMapping<marker::ActorAlignment>;
366pub type ExpandedActorAlignment = ExpandedMapping<marker::ActorAlignment>;
368
369impl ActorMapping {
370 pub fn to_worker_slot(
372 &self,
373 actor_to_worker: &HashMap<ActorId, WorkerId>,
374 ) -> WorkerSlotMapping {
375 let mut worker_actors = HashMap::new();
376 for actor_id in self.iter_unique() {
377 let worker_id = actor_to_worker
378 .get(&actor_id)
379 .cloned()
380 .unwrap_or_else(|| panic!("location for actor {} not found", actor_id));
381
382 worker_actors
383 .entry(worker_id)
384 .or_insert(BTreeSet::new())
385 .insert(actor_id);
386 }
387
388 let mut actor_location = HashMap::new();
389 for (worker, actors) in worker_actors {
390 for (idx, &actor) in actors.iter().enumerate() {
391 actor_location.insert(actor, WorkerSlotId::new(worker, idx));
392 }
393 }
394
395 self.transform(&actor_location)
396 }
397
398 pub fn to_actor_alignment(
400 &self,
401 actor_to_worker: &HashMap<ActorId, WorkerId>,
402 ) -> ActorAlignmentMapping {
403 let mut worker_actors = HashMap::new();
404
405 for (idx, actor_id) in self.iter_unique().enumerate() {
406 let worker_id = actor_to_worker
407 .get(&actor_id)
408 .cloned()
409 .unwrap_or_else(|| panic!("location for actor {} not found", actor_id));
410
411 worker_actors
412 .entry(worker_id)
413 .or_insert(BTreeSet::new())
414 .insert((actor_id, idx));
415 }
416
417 let mut actor_location = HashMap::new();
418 for (worker, idxes) in worker_actors {
419 for (actor, idx) in idxes {
420 actor_location.insert(actor, ActorAlignmentId::new(worker, idx));
421 }
422 }
423
424 self.transform(&actor_location)
425 }
426
427 pub fn from_protobuf(proto: &ActorMappingProto) -> Self {
429 assert_eq!(proto.original_indices.len(), proto.data.len());
430 Self {
431 original_indices: proto.original_indices.clone(),
432 data: proto.data.clone(),
433 }
434 }
435
436 pub fn to_protobuf(&self) -> ActorMappingProto {
438 ActorMappingProto {
439 original_indices: self.original_indices.clone(),
440 data: self.data.clone(),
441 }
442 }
443}
444
445impl WorkerSlotMapping {
446 pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId], vnode_count: usize) -> Self {
448 Self::new_uniform(worker_slot_ids.iter().cloned(), vnode_count)
449 }
450
451 pub fn from_protobuf(proto: &PbWorkerSlotMapping) -> Self {
453 assert_eq!(proto.original_indices.len(), proto.data.len());
454 Self {
455 original_indices: proto.original_indices.clone(),
456 data: proto.data.iter().map(|&id| WorkerSlotId(id)).collect(),
457 }
458 }
459
460 pub fn to_protobuf(&self) -> PbWorkerSlotMapping {
462 PbWorkerSlotMapping {
463 original_indices: self.original_indices.clone(),
464 data: self.data.iter().map(|id| id.0).collect(),
465 }
466 }
467}
468
469impl WorkerSlotMapping {
470 pub fn to_actor(&self, to_map: &HashMap<WorkerSlotId, ActorId>) -> ActorMapping {
472 self.transform(to_map)
473 }
474}
475
476impl ActorAlignmentMapping {
477 pub fn from_assignment(
478 assignment: BTreeMap<WorkerId, BTreeMap<usize, Vec<usize>>>,
479 vnode_size: usize,
480 ) -> Self {
481 let mut all_bitmaps = HashMap::new();
482
483 for (worker_id, actors) in &assignment {
484 for (actor_idx, vnodes) in actors {
485 let mut bitmap_builder = BitmapBuilder::zeroed(vnode_size);
486 vnodes
487 .iter()
488 .for_each(|vnode| bitmap_builder.set(*vnode, true));
489 all_bitmaps.insert(
490 ActorAlignmentId::new(*worker_id, *actor_idx),
491 bitmap_builder.finish(),
492 );
493 }
494 }
495
496 Self::from_bitmaps(&all_bitmaps)
497 }
498}
499
#[cfg(test)]
mod tests {
    use std::iter::repeat_with;

    use rand::Rng;

    use super::*;

    /// Marker for test mappings whose items are plain `u32`s.
    struct Test;
    impl VnodeMappingItem for Test {
        type Item = u32;
    }

    /// Second marker with the same item type, used as the target of `transform`.
    struct Test2;
    impl VnodeMappingItem for Test2 {
        type Item = u32;
    }

    type TestMapping = VnodeMapping<Test>;
    type Test2Mapping = VnodeMapping<Test2>;

    /// Numbers of distinct items exercised by the tests.
    const COUNTS: &[usize] = &[1, 3, 12, 42, VirtualNode::COUNT_FOR_TEST];

    /// Uniform mappings, one for each item count in `COUNTS`.
    fn uniforms() -> impl Iterator<Item = TestMapping> {
        COUNTS.iter().map(|&item_count| {
            TestMapping::new_uniform(0..item_count as u32, VirtualNode::COUNT_FOR_TEST)
        })
    }

    /// Randomly filled mappings, one for each item count in `COUNTS`.
    fn randoms() -> impl Iterator<Item = TestMapping> {
        COUNTS.iter().map(|&item_count| {
            let expanded = repeat_with(|| rand::rng().random_range(0..item_count as u32))
                .take(VirtualNode::COUNT_FOR_TEST)
                .collect_vec();
            TestMapping::from_expanded(&expanded)
        })
    }

    /// All mappings under test: the uniform ones followed by the random ones.
    fn mappings() -> impl Iterator<Item = TestMapping> {
        uniforms().chain(randoms())
    }

    #[test]
    fn test_uniform() {
        for vnode_mapping in uniforms() {
            assert_eq!(vnode_mapping.len(), VirtualNode::COUNT_FOR_TEST);
            let item_count = vnode_mapping.iter_unique().count();

            // Collect the vnodes owned by each item.
            let mut vnodes_by_item: HashMap<u32, Vec<_>> = HashMap::new();
            for (vnode, item) in vnode_mapping.iter_with_vnode() {
                vnodes_by_item.entry(item).or_default().push(vnode);
            }

            assert_eq!(vnodes_by_item.len(), item_count);

            // A uniform mapping gives every item the same number of vnodes, give or take one.
            let (min, max) = vnodes_by_item
                .values()
                .map(|vnodes| vnodes.len())
                .minmax()
                .into_option()
                .unwrap();

            assert!(max - min <= 1);
        }
    }

    #[test]
    fn test_iter_with_get() {
        for vnode_mapping in mappings() {
            // Iteration and point lookup must agree on every vnode.
            for (vnode, item) in vnode_mapping.iter_with_vnode() {
                assert_eq!(vnode_mapping.get(vnode), item);
            }
        }
    }

    #[test]
    fn test_from_to_bitmaps() {
        for vnode_mapping in mappings() {
            // Converting to bitmaps and back must give the same mapping.
            let bitmaps = vnode_mapping.to_bitmaps();
            let new_vnode_mapping = TestMapping::from_bitmaps(&bitmaps);

            assert_eq!(vnode_mapping, new_vnode_mapping);
        }
    }

    #[test]
    fn test_transform() {
        for vnode_mapping in mappings() {
            // Shift every item by one ...
            let transform_map: HashMap<_, _> = vnode_mapping
                .iter_unique()
                .map(|item| (item, item + 1))
                .collect();
            let vnode_mapping_2: Test2Mapping = vnode_mapping.transform(&transform_map);

            for (item, item_2) in vnode_mapping.iter().zip_eq_debug(vnode_mapping_2.iter()) {
                assert_eq!(item + 1, item_2);
            }

            // ... and shift back with the inverted map to recover the original mapping.
            let transform_back_map: HashMap<_, _> =
                transform_map.into_iter().map(|(k, v)| (v, k)).collect();
            let new_vnode_mapping: TestMapping = vnode_mapping_2.transform(&transform_back_map);

            assert_eq!(vnode_mapping, new_vnode_mapping);
        }
    }
}