risingwave_common/hash/consistent_hash/
mapping.rs1use std::collections::{BTreeMap, BTreeSet, HashMap};
16use std::fmt::{Debug, Display, Formatter};
17use std::hash::Hash;
18use std::ops::Index;
19
20use educe::Educe;
21use itertools::Itertools;
22use risingwave_pb::common::PbWorkerSlotMapping;
23use risingwave_pb::stream_plan::ActorMapping as ActorMappingProto;
24
25use super::bitmap::VnodeBitmapExt;
26use crate::bitmap::{Bitmap, BitmapBuilder};
27use crate::hash::VirtualNode;
28use crate::util::compress::compress_data;
29use crate::util::iter_util::ZipEqDebug;
30
/// Identifier of an actor.
pub type ActorId = u32;

/// An actor identified by the worker it belongs to and an actor index, packed into one `u64`.
///
/// The worker id occupies the high 32 bits and the actor index the low 32 bits.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub struct ActorAlignmentId(u64);

impl ActorAlignmentId {
    /// Returns the worker id stored in the high 32 bits.
    pub fn worker_id(&self) -> u32 {
        (self.0 >> 32) as u32
    }

    /// Returns the actor index stored in the low 32 bits.
    pub fn actor_idx(&self) -> u32 {
        // Truncating cast: intentionally keeps only the low 32 bits.
        self.0 as u32
    }

    /// Packs `worker_id` and `actor_idx` into an id.
    ///
    /// # Panics
    ///
    /// Panics if `actor_idx` does not fit in 32 bits. Without this check the excess bits
    /// would be OR-ed into the worker-id half and silently produce an id for another worker.
    pub fn new(worker_id: u32, actor_idx: usize) -> Self {
        let actor_idx = u32::try_from(actor_idx)
            .unwrap_or_else(|_| panic!("actor index {actor_idx} does not fit in 32 bits"));
        Self((u64::from(worker_id) << 32) | u64::from(actor_idx))
    }

    /// Creates the id of actor index `0` on `worker_id`.
    pub fn new_single(worker_id: u32) -> Self {
        Self::new(worker_id, 0)
    }
}

impl From<ActorAlignmentId> for u64 {
    /// Returns the raw packed representation.
    fn from(id: ActorAlignmentId) -> Self {
        id.0
    }
}

impl From<u64> for ActorAlignmentId {
    /// Reinterprets a raw packed value (worker id in the high half, actor index in the low half).
    fn from(id: u64) -> Self {
        Self(id)
    }
}

impl Display for ActorAlignmentId {
    /// Renders as `[worker_id/actor_idx]`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}/{}]", self.worker_id(), self.actor_idx())
    }
}

impl Debug for ActorAlignmentId {
    /// Same rendering as `Display`; delegating keeps the two formats from drifting apart.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        Display::fmt(self, f)
    }
}

/// A slot on a worker, packed into one `u64`.
///
/// The worker id occupies the high 32 bits and the slot index the low 32 bits.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub struct WorkerSlotId(u64);

impl WorkerSlotId {
    /// Returns the worker id stored in the high 32 bits.
    pub fn worker_id(&self) -> u32 {
        (self.0 >> 32) as u32
    }

    /// Returns the slot index stored in the low 32 bits.
    pub fn slot_idx(&self) -> u32 {
        // Truncating cast: intentionally keeps only the low 32 bits.
        self.0 as u32
    }

    /// Packs `worker_id` and `slot_idx` into an id.
    ///
    /// # Panics
    ///
    /// Panics if `slot_idx` does not fit in 32 bits. Without this check the excess bits
    /// would be OR-ed into the worker-id half and silently produce an id for another worker.
    pub fn new(worker_id: u32, slot_idx: usize) -> Self {
        let slot_idx = u32::try_from(slot_idx)
            .unwrap_or_else(|_| panic!("slot index {slot_idx} does not fit in 32 bits"));
        Self((u64::from(worker_id) << 32) | u64::from(slot_idx))
    }
}

impl From<WorkerSlotId> for u64 {
    /// Returns the raw packed representation.
    fn from(id: WorkerSlotId) -> Self {
        id.0
    }
}

impl From<u64> for WorkerSlotId {
    /// Reinterprets a raw packed value (worker id in the high half, slot index in the low half).
    fn from(id: u64) -> Self {
        Self(id)
    }
}

impl Display for WorkerSlotId {
    /// Renders as `[worker_id:slot_idx]`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}:{}]", self.worker_id(), self.slot_idx())
    }
}

impl Debug for WorkerSlotId {
    /// Same rendering as `Display`; delegating keeps the two formats from drifting apart.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        Display::fmt(self, f)
    }
}

/// Selects the item type stored in a [`VnodeMapping`]; implemented by the marker types.
pub trait VnodeMappingItem {
    /// The value each vnode is mapped to.
    type Item: Copy + Ord + Hash + Debug;
}

/// A vnode mapping in expanded form: one item per vnode, indexed by vnode.
pub type ExpandedMapping<T> = Vec<<T as VnodeMappingItem>::Item>;

/// A mapping from every vnode to an item, stored run-length compressed.
///
/// Consecutive vnodes mapped to the same item form a run; only the end of each run and its
/// item are stored.
// NOTE(review): `educe` is used instead of std `derive` presumably so the impls do not
// require the marker type `T` itself to implement these traits (the marker structs carry no
// derives) — confirm.
#[derive(Educe)]
#[educe(Debug, Clone, PartialEq, Eq, Hash)]
pub struct VnodeMapping<T: VnodeMappingItem> {
    /// For each run, the index of its last vnode, in ascending order. The last element plus
    /// one is the number of vnodes in the mapping.
    original_indices: Vec<u32>,
    /// The item of each run; parallel to `original_indices`.
    data: Vec<T::Item>,
}

144#[expect(
145 clippy::len_without_is_empty,
146 reason = "empty vnode mapping makes no sense"
147)]
148impl<T: VnodeMappingItem> VnodeMapping<T> {
149 pub fn new_uniform(items: impl ExactSizeIterator<Item = T::Item>, vnode_count: usize) -> Self {
154 assert!(items.len() <= vnode_count);
157
158 let mut original_indices = Vec::with_capacity(items.len());
159 let mut data = Vec::with_capacity(items.len());
160
161 let hash_shard_size = vnode_count / items.len();
162 let mut one_more_count = vnode_count % items.len();
163 let mut init_bound = 0;
164
165 for item in items {
166 let count = if one_more_count > 0 {
167 one_more_count -= 1;
168 hash_shard_size + 1
169 } else {
170 hash_shard_size
171 };
172 init_bound += count;
173
174 original_indices.push(init_bound as u32 - 1);
175 data.push(item);
176 }
177
178 debug_assert_eq!(data.iter().duplicates().count(), 0);
180
181 Self {
182 original_indices,
183 data,
184 }
185 }
186
187 pub fn new_single(item: T::Item) -> Self {
192 Self::new_uniform(std::iter::once(item), 1)
193 }
194
195 pub fn len(&self) -> usize {
197 self.original_indices
198 .last()
199 .map(|&i| i as usize + 1)
200 .unwrap_or(0)
201 }
202
203 pub fn get(&self, vnode: VirtualNode) -> T::Item {
208 self[vnode]
209 }
210
211 pub fn get_matched(&self, bitmap: &Bitmap) -> Option<T::Item> {
214 bitmap
215 .iter_vnodes()
216 .next() .map(|v| self.get(v))
218 }
219
220 pub fn iter(&self) -> impl Iterator<Item = T::Item> + '_ {
222 self.data
223 .iter()
224 .copied()
225 .zip_eq_debug(
226 std::iter::once(0)
227 .chain(self.original_indices.iter().copied().map(|i| i + 1))
228 .tuple_windows()
229 .map(|(a, b)| (b - a) as usize),
230 )
231 .flat_map(|(item, c)| std::iter::repeat_n(item, c))
232 }
233
234 pub fn iter_with_vnode(&self) -> impl Iterator<Item = (VirtualNode, T::Item)> + '_ {
236 self.iter()
237 .enumerate()
238 .map(|(v, item)| (VirtualNode::from_index(v), item))
239 }
240
241 pub fn iter_unique(&self) -> impl Iterator<Item = T::Item> + '_ {
243 self.data.iter().copied().sorted().dedup()
245 }
246
247 pub fn to_single(&self) -> Option<T::Item> {
249 self.data.iter().copied().dedup().exactly_one().ok()
250 }
251
252 pub fn to_bitmaps(&self) -> HashMap<T::Item, Bitmap> {
255 let vnode_count = self.len();
256 let mut vnode_bitmaps = HashMap::new();
257
258 for (vnode, item) in self.iter_with_vnode() {
259 vnode_bitmaps
260 .entry(item)
261 .or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
262 .set(vnode.to_index(), true);
263 }
264
265 vnode_bitmaps
266 .into_iter()
267 .map(|(item, b)| (item, b.finish()))
268 .collect()
269 }
270
271 pub fn from_bitmaps(bitmaps: &HashMap<T::Item, Bitmap>) -> Self {
274 let vnode_count = bitmaps.values().next().expect("empty bitmaps").len();
275 let mut items = vec![None; vnode_count];
276
277 for (&item, bitmap) in bitmaps {
278 assert_eq!(bitmap.len(), vnode_count);
279 for idx in bitmap.iter_ones() {
280 if let Some(prev) = items[idx].replace(item) {
281 panic!("mapping at index `{idx}` is set to both `{prev:?}` and `{item:?}`");
282 }
283 }
284 }
285
286 let items = items
287 .into_iter()
288 .enumerate()
289 .map(|(i, o)| o.unwrap_or_else(|| panic!("mapping at index `{i}` is not set")))
290 .collect_vec();
291 Self::from_expanded(&items)
292 }
293
294 pub fn from_expanded(items: &[T::Item]) -> Self {
296 let (original_indices, data) = compress_data(items);
297 Self {
298 original_indices,
299 data,
300 }
301 }
302
303 pub fn to_expanded(&self) -> ExpandedMapping<T> {
305 self.iter().collect()
306 }
307
308 pub fn transform<T2, M>(&self, to_map: &M) -> VnodeMapping<T2>
311 where
312 T2: VnodeMappingItem,
313 M: for<'a> Index<&'a T::Item, Output = T2::Item>,
314 {
315 VnodeMapping {
316 original_indices: self.original_indices.clone(),
317 data: self.data.iter().map(|item| to_map[item]).collect(),
318 }
319 }
320}
321
322impl<T: VnodeMappingItem> Index<VirtualNode> for VnodeMapping<T> {
323 type Output = T::Item;
324
325 fn index(&self, vnode: VirtualNode) -> &Self::Output {
326 let index = self
327 .original_indices
328 .partition_point(|&i| i < vnode.to_index() as u32);
329 &self.data[index]
330 }
331}
332
333pub mod marker {
334 use super::*;
335
336 pub struct Actor;
338 impl VnodeMappingItem for Actor {
339 type Item = ActorId;
340 }
341
342 pub struct WorkerSlot;
344 impl VnodeMappingItem for WorkerSlot {
345 type Item = WorkerSlotId;
346 }
347
348 pub struct ActorAlignment;
350 impl VnodeMappingItem for ActorAlignment {
351 type Item = ActorAlignmentId;
352 }
353}
354
/// A vnode mapping whose items are actor ids.
pub type ActorMapping = VnodeMapping<marker::Actor>;
/// The expanded form (one entry per vnode) of an [`ActorMapping`].
pub type ExpandedActorMapping = ExpandedMapping<marker::Actor>;

/// A vnode mapping whose items are worker slots.
pub type WorkerSlotMapping = VnodeMapping<marker::WorkerSlot>;
/// The expanded form (one entry per vnode) of a [`WorkerSlotMapping`].
pub type ExpandedWorkerSlotMapping = ExpandedMapping<marker::WorkerSlot>;

/// A vnode mapping whose items are actor alignment ids.
pub type ActorAlignmentMapping = VnodeMapping<marker::ActorAlignment>;
/// The expanded form (one entry per vnode) of an [`ActorAlignmentMapping`].
pub type ExpandedActorAlignment = ExpandedMapping<marker::ActorAlignment>;

370impl ActorMapping {
371 pub fn to_worker_slot(&self, actor_to_worker: &HashMap<ActorId, u32>) -> WorkerSlotMapping {
373 let mut worker_actors = HashMap::new();
374 for actor_id in self.iter_unique() {
375 let worker_id = actor_to_worker
376 .get(&actor_id)
377 .cloned()
378 .unwrap_or_else(|| panic!("location for actor {} not found", actor_id));
379
380 worker_actors
381 .entry(worker_id)
382 .or_insert(BTreeSet::new())
383 .insert(actor_id);
384 }
385
386 let mut actor_location = HashMap::new();
387 for (worker, actors) in worker_actors {
388 for (idx, &actor) in actors.iter().enumerate() {
389 actor_location.insert(actor, WorkerSlotId::new(worker, idx));
390 }
391 }
392
393 self.transform(&actor_location)
394 }
395
396 pub fn to_actor_alignment(
398 &self,
399 actor_to_worker: &HashMap<ActorId, u32>,
400 ) -> ActorAlignmentMapping {
401 let mut worker_actors = HashMap::new();
402
403 for (idx, actor_id) in self.iter_unique().enumerate() {
404 let worker_id = actor_to_worker
405 .get(&actor_id)
406 .cloned()
407 .unwrap_or_else(|| panic!("location for actor {} not found", actor_id));
408
409 worker_actors
410 .entry(worker_id)
411 .or_insert(BTreeSet::new())
412 .insert((actor_id, idx));
413 }
414
415 let mut actor_location = HashMap::new();
416 for (worker, idxes) in worker_actors {
417 for (actor, idx) in idxes {
418 actor_location.insert(actor, ActorAlignmentId::new(worker, idx));
419 }
420 }
421
422 self.transform(&actor_location)
423 }
424
425 pub fn from_protobuf(proto: &ActorMappingProto) -> Self {
427 assert_eq!(proto.original_indices.len(), proto.data.len());
428 Self {
429 original_indices: proto.original_indices.clone(),
430 data: proto.data.clone(),
431 }
432 }
433
434 pub fn to_protobuf(&self) -> ActorMappingProto {
436 ActorMappingProto {
437 original_indices: self.original_indices.clone(),
438 data: self.data.clone(),
439 }
440 }
441}
442
443impl WorkerSlotMapping {
444 pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId], vnode_count: usize) -> Self {
446 Self::new_uniform(worker_slot_ids.iter().cloned(), vnode_count)
447 }
448
449 pub fn from_protobuf(proto: &PbWorkerSlotMapping) -> Self {
451 assert_eq!(proto.original_indices.len(), proto.data.len());
452 Self {
453 original_indices: proto.original_indices.clone(),
454 data: proto.data.iter().map(|&id| WorkerSlotId(id)).collect(),
455 }
456 }
457
458 pub fn to_protobuf(&self) -> PbWorkerSlotMapping {
460 PbWorkerSlotMapping {
461 original_indices: self.original_indices.clone(),
462 data: self.data.iter().map(|id| id.0).collect(),
463 }
464 }
465}
466
467impl WorkerSlotMapping {
468 pub fn to_actor(&self, to_map: &HashMap<WorkerSlotId, ActorId>) -> ActorMapping {
470 self.transform(to_map)
471 }
472}
473
474impl ActorAlignmentMapping {
475 pub fn from_assignment(
476 assignment: BTreeMap<u32, BTreeMap<usize, Vec<usize>>>,
477 vnode_size: usize,
478 ) -> Self {
479 let mut all_bitmaps = HashMap::new();
480
481 for (worker_id, actors) in &assignment {
482 for (actor_idx, vnodes) in actors {
483 let mut bitmap_builder = BitmapBuilder::zeroed(vnode_size);
484 vnodes
485 .iter()
486 .for_each(|vnode| bitmap_builder.set(*vnode, true));
487 all_bitmaps.insert(
488 ActorAlignmentId::new(*worker_id, *actor_idx),
489 bitmap_builder.finish(),
490 );
491 }
492 }
493
494 Self::from_bitmaps(&all_bitmaps)
495 }
496}
497
#[cfg(test)]
mod tests {
    use std::iter::repeat_with;

    use rand::Rng;

    use super::*;

    /// Test item type backed by plain `u32`s.
    struct Test;
    impl VnodeMappingItem for Test {
        type Item = u32;
    }

    /// A second, distinct item type, used to check `transform` across item types.
    struct Test2;
    impl VnodeMappingItem for Test2 {
        type Item = u32;
    }

    type TestMapping = VnodeMapping<Test>;
    type Test2Mapping = VnodeMapping<Test2>;

    /// Numbers of distinct items to exercise, from a single item up to one item per vnode.
    const COUNTS: &[usize] = &[1, 3, 12, 42, VirtualNode::COUNT_FOR_TEST];

    /// Uniform mappings over `0..count` for every count in `COUNTS`.
    fn uniforms() -> impl Iterator<Item = TestMapping> {
        COUNTS
            .iter()
            .map(|&count| TestMapping::new_uniform(0..count as u32, VirtualNode::COUNT_FOR_TEST))
    }

    /// Random mappings drawing each vnode's item from `0..count`, for every count in `COUNTS`.
    fn randoms() -> impl Iterator<Item = TestMapping> {
        COUNTS.iter().map(|&count| {
            let raw = repeat_with(|| rand::rng().random_range(0..count as u32))
                .take(VirtualNode::COUNT_FOR_TEST)
                .collect_vec();
            TestMapping::from_expanded(&raw)
        })
    }

    fn mappings() -> impl Iterator<Item = TestMapping> {
        uniforms().chain(randoms())
    }

    #[test]
    fn test_uniform() {
        for vnode_mapping in uniforms() {
            assert_eq!(vnode_mapping.len(), VirtualNode::COUNT_FOR_TEST);
            let item_count = vnode_mapping.iter_unique().count();

            let mut check: HashMap<u32, Vec<_>> = HashMap::new();
            for (vnode, item) in vnode_mapping.iter_with_vnode() {
                check.entry(item).or_default().push(vnode);
            }

            assert_eq!(check.len(), item_count);

            let (min, max) = check
                .values()
                .map(|indexes| indexes.len())
                .minmax()
                .into_option()
                .unwrap();

            // Uniform distribution: vnode counts per item differ by at most one.
            assert!(max - min <= 1);
        }
    }

    #[test]
    #[should_panic]
    fn test_uniform_rejects_empty_items() {
        let _ = TestMapping::new_uniform(0..0u32, VirtualNode::COUNT_FOR_TEST);
    }

    #[test]
    fn test_single() {
        let single = TestMapping::new_single(7);
        assert_eq!(single.len(), 1);
        assert_eq!(single.to_single(), Some(7));

        let all_same = TestMapping::from_expanded(&[5u32; VirtualNode::COUNT_FOR_TEST]);
        assert_eq!(all_same.to_single(), Some(5));

        let spread = TestMapping::new_uniform(0..3u32, VirtualNode::COUNT_FOR_TEST);
        assert_eq!(spread.to_single(), None);
    }

    #[test]
    fn test_iter_with_get() {
        for vnode_mapping in mappings() {
            for (vnode, item) in vnode_mapping.iter_with_vnode() {
                assert_eq!(vnode_mapping.get(vnode), item);
            }
        }
    }

    #[test]
    fn test_from_to_bitmaps() {
        for vnode_mapping in mappings() {
            let bitmaps = vnode_mapping.to_bitmaps();
            let new_vnode_mapping = TestMapping::from_bitmaps(&bitmaps);

            assert_eq!(vnode_mapping, new_vnode_mapping);
        }
    }

    #[test]
    fn test_transform() {
        for vnode_mapping in mappings() {
            let transform_map: HashMap<_, _> = vnode_mapping
                .iter_unique()
                .map(|item| (item, item + 1))
                .collect();
            let vnode_mapping_2: Test2Mapping = vnode_mapping.transform(&transform_map);

            for (item, item_2) in vnode_mapping.iter().zip_eq_debug(vnode_mapping_2.iter()) {
                assert_eq!(item + 1, item_2);
            }

            let transform_back_map: HashMap<_, _> =
                transform_map.into_iter().map(|(k, v)| (v, k)).collect();
            let new_vnode_mapping: TestMapping = vnode_mapping_2.transform(&transform_back_map);

            assert_eq!(vnode_mapping, new_vnode_mapping);
        }
    }

    #[test]
    fn test_id_encoding() {
        let actor = ActorAlignmentId::new(3, 5);
        assert_eq!((actor.worker_id(), actor.actor_idx()), (3, 5));
        assert_eq!(ActorAlignmentId::from(u64::from(actor)), actor);
        assert_eq!(actor.to_string(), "[3/5]");

        let slot = WorkerSlotId::new(3, 5);
        assert_eq!((slot.worker_id(), slot.slot_idx()), (3, 5));
        assert_eq!(WorkerSlotId::from(u64::from(slot)), slot);
        assert_eq!(slot.to_string(), "[3:5]");
    }

    #[test]
    fn test_to_worker_slot() {
        let mapping = ActorMapping::from_expanded(&[10, 10, 11, 12]);
        let actor_to_worker = HashMap::from([(10, 1), (11, 1), (12, 2)]);
        let worker_slots = mapping.to_worker_slot(&actor_to_worker);

        // Actors on the same worker get consecutive slots in actor-id order.
        assert_eq!(
            worker_slots.to_expanded(),
            vec![
                WorkerSlotId::new(1, 0),
                WorkerSlotId::new(1, 0),
                WorkerSlotId::new(1, 1),
                WorkerSlotId::new(2, 0),
            ]
        );
    }

    #[test]
    fn test_from_assignment() {
        let assignment = BTreeMap::from([
            (1, BTreeMap::from([(0, vec![0, 1]), (1, vec![2])])),
            (2, BTreeMap::from([(2, vec![3])])),
        ]);
        let mapping = ActorAlignmentMapping::from_assignment(assignment, 4);

        assert_eq!(mapping.len(), 4);
        assert_eq!(
            mapping.to_expanded(),
            vec![
                ActorAlignmentId::new(1, 0),
                ActorAlignmentId::new(1, 0),
                ActorAlignmentId::new(1, 1),
                ActorAlignmentId::new(2, 2),
            ]
        );
    }
}