risingwave_common/hash/consistent_hash/
mapping.rs1use std::collections::{BTreeMap, BTreeSet, HashMap};
16use std::fmt::{Debug, Display, Formatter};
17use std::hash::Hash;
18use std::ops::Index;
19
20use educe::Educe;
21use itertools::Itertools;
22use risingwave_pb::common::PbWorkerSlotMapping;
23use risingwave_pb::stream_plan::ActorMapping as ActorMappingProto;
24
25use super::bitmap::VnodeBitmapExt;
26use crate::bitmap::{Bitmap, BitmapBuilder};
27use crate::hash::VirtualNode;
28pub use crate::id::ActorId;
29use crate::id::WorkerId;
30use crate::util::compress::compress_data;
31use crate::util::iter_util::ZipEqDebug;
32
33#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
34pub struct ActorAlignmentId(u64);
35
36impl ActorAlignmentId {
37 pub fn worker_id(&self) -> WorkerId {
38 ((self.0 >> 32) as u32).into()
39 }
40
41 pub fn actor_idx(&self) -> u32 {
42 self.0 as u32
43 }
44
45 pub fn new(worker_id: WorkerId, actor_idx: usize) -> Self {
46 Self((worker_id.as_raw_id() as u64) << 32 | actor_idx as u64)
47 }
48
49 pub fn new_single(worker_id: WorkerId) -> Self {
50 Self::new(worker_id, 0)
51 }
52}
53
54impl From<ActorAlignmentId> for u64 {
55 fn from(id: ActorAlignmentId) -> Self {
56 id.0
57 }
58}
59
60impl From<u64> for ActorAlignmentId {
61 fn from(id: u64) -> Self {
62 Self(id)
63 }
64}
65
66impl Display for ActorAlignmentId {
67 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
68 f.write_fmt(format_args!("[{}/{}]", self.worker_id(), self.actor_idx()))
69 }
70}
71
72impl Debug for ActorAlignmentId {
73 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
74 f.write_fmt(format_args!("[{}/{}]", self.worker_id(), self.actor_idx()))
75 }
76}
77
78#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
79pub struct WorkerSlotId(u64);
80
81impl WorkerSlotId {
82 pub fn worker_id(&self) -> WorkerId {
83 WorkerId::new((self.0 >> 32) as u32)
84 }
85
86 pub fn slot_idx(&self) -> u32 {
87 self.0 as u32
88 }
89
90 pub fn new(worker_id: WorkerId, slot_idx: usize) -> Self {
91 Self((worker_id.as_raw_id() as u64) << 32 | slot_idx as u64)
92 }
93}
94
95impl From<WorkerSlotId> for u64 {
96 fn from(id: WorkerSlotId) -> Self {
97 id.0
98 }
99}
100
101impl From<u64> for WorkerSlotId {
102 fn from(id: u64) -> Self {
103 Self(id)
104 }
105}
106
107impl Display for WorkerSlotId {
108 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
109 f.write_fmt(format_args!("[{}:{}]", self.worker_id(), self.slot_idx()))
110 }
111}
112
113impl Debug for WorkerSlotId {
114 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
115 f.write_fmt(format_args!("[{}:{}]", self.worker_id(), self.slot_idx()))
116 }
117}
118
/// Selects the item type stored in a [`VnodeMapping`].
///
/// The implementors in this file are unit marker structs (see the `marker` module); only the
/// associated [`VnodeMappingItem::Item`] type matters.
pub trait VnodeMappingItem {
    /// The value each virtual node is mapped to.
    type Item: Debug + Hash + Ord + Copy;
}

/// A mapping in expanded form: one item per virtual node, in vnode order.
pub type ExpandedMapping<T> = Vec<<T as VnodeMappingItem>::Item>;
131
132#[derive(Educe)]
137#[educe(Debug, Clone, PartialEq, Eq, Hash)]
138pub struct VnodeMapping<T: VnodeMappingItem> {
139 original_indices: Vec<u32>,
140 data: Vec<T::Item>,
141}
142
143#[expect(
144 clippy::len_without_is_empty,
145 reason = "empty vnode mapping makes no sense"
146)]
147impl<T: VnodeMappingItem> VnodeMapping<T> {
148 pub fn new_uniform(items: impl ExactSizeIterator<Item = T::Item>, vnode_count: usize) -> Self {
153 assert!(items.len() <= vnode_count);
156
157 let mut original_indices = Vec::with_capacity(items.len());
158 let mut data = Vec::with_capacity(items.len());
159
160 let hash_shard_size = vnode_count / items.len();
161 let mut one_more_count = vnode_count % items.len();
162 let mut init_bound = 0;
163
164 for item in items {
165 let count = if one_more_count > 0 {
166 one_more_count -= 1;
167 hash_shard_size + 1
168 } else {
169 hash_shard_size
170 };
171 init_bound += count;
172
173 original_indices.push(init_bound as u32 - 1);
174 data.push(item);
175 }
176
177 debug_assert_eq!(data.iter().duplicates().count(), 0);
179
180 Self {
181 original_indices,
182 data,
183 }
184 }
185
186 pub fn new_single(item: T::Item) -> Self {
191 Self::new_uniform(std::iter::once(item), 1)
192 }
193
194 pub fn len(&self) -> usize {
196 self.original_indices
197 .last()
198 .map(|&i| i as usize + 1)
199 .unwrap_or(0)
200 }
201
202 pub fn get(&self, vnode: VirtualNode) -> T::Item {
207 self[vnode]
208 }
209
210 pub fn get_matched(&self, bitmap: &Bitmap) -> Option<T::Item> {
213 bitmap
214 .iter_vnodes()
215 .next() .map(|v| self.get(v))
217 }
218
219 pub fn iter(&self) -> impl Iterator<Item = T::Item> + '_ {
221 self.data
222 .iter()
223 .copied()
224 .zip_eq_debug(
225 std::iter::once(0)
226 .chain(self.original_indices.iter().copied().map(|i| i + 1))
227 .tuple_windows()
228 .map(|(a, b)| (b - a) as usize),
229 )
230 .flat_map(|(item, c)| std::iter::repeat_n(item, c))
231 }
232
233 pub fn iter_with_vnode(&self) -> impl Iterator<Item = (VirtualNode, T::Item)> + '_ {
235 self.iter()
236 .enumerate()
237 .map(|(v, item)| (VirtualNode::from_index(v), item))
238 }
239
240 pub fn iter_unique(&self) -> impl Iterator<Item = T::Item> + '_ {
242 self.data.iter().copied().sorted().dedup()
244 }
245
246 pub fn to_single(&self) -> Option<T::Item> {
248 Itertools::exactly_one(self.data.iter().copied().dedup()).ok()
250 }
251
252 pub fn to_bitmaps(&self) -> HashMap<T::Item, Bitmap> {
255 let vnode_count = self.len();
256 let mut vnode_bitmaps = HashMap::new();
257
258 for (vnode, item) in self.iter_with_vnode() {
259 vnode_bitmaps
260 .entry(item)
261 .or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
262 .set(vnode.to_index(), true);
263 }
264
265 vnode_bitmaps
266 .into_iter()
267 .map(|(item, b)| (item, b.finish()))
268 .collect()
269 }
270
271 pub fn from_bitmaps(bitmaps: &HashMap<T::Item, Bitmap>) -> Self {
274 let vnode_count = bitmaps.values().next().expect("empty bitmaps").len();
275 let mut items = vec![None; vnode_count];
276
277 for (&item, bitmap) in bitmaps {
278 assert_eq!(bitmap.len(), vnode_count);
279 for idx in bitmap.iter_ones() {
280 if let Some(prev) = items[idx].replace(item) {
281 panic!("mapping at index `{idx}` is set to both `{prev:?}` and `{item:?}`");
282 }
283 }
284 }
285
286 let items = items
287 .into_iter()
288 .enumerate()
289 .map(|(i, o)| o.unwrap_or_else(|| panic!("mapping at index `{i}` is not set")))
290 .collect_vec();
291 Self::from_expanded(&items)
292 }
293
294 pub fn from_expanded(items: &[T::Item]) -> Self {
296 let (original_indices, data) = compress_data(items);
297 Self {
298 original_indices,
299 data,
300 }
301 }
302
303 pub fn to_expanded(&self) -> ExpandedMapping<T> {
305 self.iter().collect()
306 }
307
308 pub fn transform<T2, M>(&self, to_map: &M) -> VnodeMapping<T2>
311 where
312 T2: VnodeMappingItem,
313 M: for<'a> Index<&'a T::Item, Output = T2::Item>,
314 {
315 VnodeMapping {
316 original_indices: self.original_indices.clone(),
317 data: self.data.iter().map(|item| to_map[item]).collect(),
318 }
319 }
320}
321
322impl<T: VnodeMappingItem> Index<VirtualNode> for VnodeMapping<T> {
323 type Output = T::Item;
324
325 fn index(&self, vnode: VirtualNode) -> &Self::Output {
326 let index = self
327 .original_indices
328 .partition_point(|&i| i < vnode.to_index() as u32);
329 &self.data[index]
330 }
331}
332
333pub mod marker {
334 use super::*;
335
336 pub struct Actor;
338 impl VnodeMappingItem for Actor {
339 type Item = ActorId;
340 }
341
342 pub struct WorkerSlot;
344 impl VnodeMappingItem for WorkerSlot {
345 type Item = WorkerSlotId;
346 }
347
348 pub struct ActorAlignment;
350 impl VnodeMappingItem for ActorAlignment {
351 type Item = ActorAlignmentId;
352 }
353}
354
355pub type ActorMapping = VnodeMapping<marker::Actor>;
357pub type ExpandedActorMapping = ExpandedMapping<marker::Actor>;
359
360pub type WorkerSlotMapping = VnodeMapping<marker::WorkerSlot>;
362pub type ExpandedWorkerSlotMapping = ExpandedMapping<marker::WorkerSlot>;
364
365pub type ActorAlignmentMapping = VnodeMapping<marker::ActorAlignment>;
367pub type ExpandedActorAlignment = ExpandedMapping<marker::ActorAlignment>;
369
370impl ActorMapping {
371 pub fn to_worker_slot(
373 &self,
374 actor_to_worker: &HashMap<ActorId, WorkerId>,
375 ) -> WorkerSlotMapping {
376 let mut worker_actors = HashMap::new();
377 for actor_id in self.iter_unique() {
378 let worker_id = actor_to_worker
379 .get(&actor_id)
380 .cloned()
381 .unwrap_or_else(|| panic!("location for actor {} not found", actor_id));
382
383 worker_actors
384 .entry(worker_id)
385 .or_insert(BTreeSet::new())
386 .insert(actor_id);
387 }
388
389 let mut actor_location = HashMap::new();
390 for (worker, actors) in worker_actors {
391 for (idx, &actor) in actors.iter().enumerate() {
392 actor_location.insert(actor, WorkerSlotId::new(worker, idx));
393 }
394 }
395
396 self.transform(&actor_location)
397 }
398
399 pub fn to_actor_alignment(
401 &self,
402 actor_to_worker: &HashMap<ActorId, WorkerId>,
403 ) -> ActorAlignmentMapping {
404 let mut worker_actors = HashMap::new();
405
406 for (idx, actor_id) in self.iter_unique().enumerate() {
407 let worker_id = actor_to_worker
408 .get(&actor_id)
409 .cloned()
410 .unwrap_or_else(|| panic!("location for actor {} not found", actor_id));
411
412 worker_actors
413 .entry(worker_id)
414 .or_insert(BTreeSet::new())
415 .insert((actor_id, idx));
416 }
417
418 let mut actor_location = HashMap::new();
419 for (worker, idxes) in worker_actors {
420 for (actor, idx) in idxes {
421 actor_location.insert(actor, ActorAlignmentId::new(worker, idx));
422 }
423 }
424
425 self.transform(&actor_location)
426 }
427
428 pub fn from_protobuf(proto: &ActorMappingProto) -> Self {
430 assert_eq!(proto.original_indices.len(), proto.data.len());
431 Self {
432 original_indices: proto.original_indices.clone(),
433 data: proto.data.clone(),
434 }
435 }
436
437 pub fn to_protobuf(&self) -> ActorMappingProto {
439 ActorMappingProto {
440 original_indices: self.original_indices.clone(),
441 data: self.data.clone(),
442 }
443 }
444}
445
446impl WorkerSlotMapping {
447 pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId], vnode_count: usize) -> Self {
449 Self::new_uniform(worker_slot_ids.iter().cloned(), vnode_count)
450 }
451
452 pub fn from_protobuf(proto: &PbWorkerSlotMapping) -> Self {
454 assert_eq!(proto.original_indices.len(), proto.data.len());
455 Self {
456 original_indices: proto.original_indices.clone(),
457 data: proto.data.iter().map(|&id| WorkerSlotId(id)).collect(),
458 }
459 }
460
461 pub fn to_protobuf(&self) -> PbWorkerSlotMapping {
463 PbWorkerSlotMapping {
464 original_indices: self.original_indices.clone(),
465 data: self.data.iter().map(|id| id.0).collect(),
466 }
467 }
468}
469
470impl WorkerSlotMapping {
471 pub fn to_actor(&self, to_map: &HashMap<WorkerSlotId, ActorId>) -> ActorMapping {
473 self.transform(to_map)
474 }
475}
476
477impl ActorAlignmentMapping {
478 pub fn from_assignment(
479 assignment: BTreeMap<WorkerId, BTreeMap<usize, Vec<usize>>>,
480 vnode_size: usize,
481 ) -> Self {
482 let mut all_bitmaps = HashMap::new();
483
484 for (worker_id, actors) in &assignment {
485 for (actor_idx, vnodes) in actors {
486 let mut bitmap_builder = BitmapBuilder::zeroed(vnode_size);
487 vnodes
488 .iter()
489 .for_each(|vnode| bitmap_builder.set(*vnode, true));
490 all_bitmaps.insert(
491 ActorAlignmentId::new(*worker_id, *actor_idx),
492 bitmap_builder.finish(),
493 );
494 }
495 }
496
497 Self::from_bitmaps(&all_bitmaps)
498 }
499}
500
#[cfg(test)]
mod tests {
    use std::iter::repeat_with;

    use rand::Rng;

    use super::*;

    /// Marker with plain `u32` items; the source type of the mappings under test.
    struct Test;
    impl VnodeMappingItem for Test {
        type Item = u32;
    }

    /// A second `u32` marker, so `transform` can be exercised between two mapping types.
    struct Test2;
    impl VnodeMappingItem for Test2 {
        type Item = u32;
    }

    type TestMapping = VnodeMapping<Test>;
    type Test2Mapping = VnodeMapping<Test2>;

    /// Numbers of distinct items to build mappings from, up to one item per vnode.
    const COUNTS: &[usize] = &[1, 3, 12, 42, VirtualNode::COUNT_FOR_TEST];

    /// One uniform mapping over `0..count` for every entry of `COUNTS`.
    fn uniforms() -> impl Iterator<Item = TestMapping> {
        COUNTS.iter().map(|&count| {
            let items = 0..count as u32;
            TestMapping::new_uniform(items, VirtualNode::COUNT_FOR_TEST)
        })
    }

    /// One mapping per entry of `COUNTS`, with every vnode mapped to a random item from
    /// `0..count`.
    fn randoms() -> impl Iterator<Item = TestMapping> {
        COUNTS.iter().map(|&count| {
            let expanded: Vec<u32> = repeat_with(|| rand::rng().random_range(0..count as u32))
                .take(VirtualNode::COUNT_FOR_TEST)
                .collect();
            TestMapping::from_expanded(&expanded)
        })
    }

    /// All uniform mappings followed by all random ones.
    fn mappings() -> impl Iterator<Item = TestMapping> {
        uniforms().chain(randoms())
    }

    #[test]
    fn test_uniform() {
        for mapping in uniforms() {
            assert_eq!(mapping.len(), VirtualNode::COUNT_FOR_TEST);
            let distinct_items = mapping.iter_unique().count();

            // Group the vnodes by the item they are mapped to.
            let mut vnodes_by_item: HashMap<u32, Vec<_>> = HashMap::new();
            for (vnode, item) in mapping.iter_with_vnode() {
                vnodes_by_item.entry(item).or_default().push(vnode);
            }
            assert_eq!(vnodes_by_item.len(), distinct_items);

            // "Uniform" means the group sizes differ by at most one.
            let (smallest, largest) = vnodes_by_item
                .values()
                .map(Vec::len)
                .minmax()
                .into_option()
                .unwrap();
            assert!(largest - smallest <= 1);
        }
    }

    #[test]
    fn test_iter_with_get() {
        for mapping in mappings() {
            for (vnode, expected) in mapping.iter_with_vnode() {
                assert_eq!(mapping.get(vnode), expected);
            }
        }
    }

    #[test]
    fn test_from_to_bitmaps() {
        for mapping in mappings() {
            let round_tripped = TestMapping::from_bitmaps(&mapping.to_bitmaps());
            assert_eq!(mapping, round_tripped);
        }
    }

    #[test]
    fn test_transform() {
        for mapping in mappings() {
            let forward: HashMap<u32, u32> = mapping
                .iter_unique()
                .map(|item| (item, item + 1))
                .collect();
            let shifted: Test2Mapping = mapping.transform(&forward);

            for (item, shifted_item) in mapping.iter().zip_eq_debug(shifted.iter()) {
                assert_eq!(item + 1, shifted_item);
            }

            let backward: HashMap<u32, u32> =
                forward.into_iter().map(|(k, v)| (v, k)).collect();
            let restored: TestMapping = shifted.transform(&backward);

            assert_eq!(mapping, restored);
        }
    }
}