// risingwave_common/hash/consistent_hash/mapping.rs
use std::collections::{BTreeSet, HashMap};
use std::fmt::{Debug, Display, Formatter};
use std::hash::Hash;
use std::ops::Index;
use educe::Educe;
use itertools::Itertools;
use risingwave_pb::common::PbWorkerSlotMapping;
use risingwave_pb::stream_plan::ActorMapping as ActorMappingProto;
use super::bitmap::VnodeBitmapExt;
use crate::bitmap::{Bitmap, BitmapBuilder};
use crate::hash::VirtualNode;
use crate::util::compress::compress_data;
use crate::util::iter_util::ZipEqDebug;
/// Numeric identifier of an actor; the item type of [`ActorMapping`].
pub type ActorId = u32;
/// Identifier of one slot on a worker, packed into a single `u64`.
///
/// The worker id occupies the upper 32 bits and the slot index the lower 32 bits, so ids
/// order by worker id first and by slot index second.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub struct WorkerSlotId(u64);

impl WorkerSlotId {
    /// Returns the worker id stored in the upper 32 bits.
    pub fn worker_id(&self) -> u32 {
        (self.0 >> 32) as u32
    }

    /// Returns the slot index stored in the lower 32 bits.
    pub fn slot_idx(&self) -> u32 {
        // Truncation is intended here: it discards the worker id in the upper half.
        self.0 as u32
    }

    /// Packs `worker_id` and `slot_idx` into a single id.
    ///
    /// # Panics
    ///
    /// In debug builds, panics if `slot_idx` does not fit in 32 bits. Such a value would
    /// otherwise spill into the upper half and silently corrupt the worker id.
    pub fn new(worker_id: u32, slot_idx: usize) -> Self {
        debug_assert!(
            slot_idx <= u32::MAX as usize,
            "slot index {} does not fit in 32 bits",
            slot_idx
        );
        Self(((worker_id as u64) << 32) | slot_idx as u64)
    }
}
impl From<WorkerSlotId> for u64 {
fn from(id: WorkerSlotId) -> Self {
id.0
}
}
impl From<u64> for WorkerSlotId {
fn from(id: u64) -> Self {
Self(id)
}
}
/// Renders the id as `[worker_id:slot_idx]`.
impl Display for WorkerSlotId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let worker = self.worker_id();
        let slot = self.slot_idx();
        write!(f, "[{}:{}]", worker, slot)
    }
}
/// Debug output uses the same `[worker_id:slot_idx]` form instead of the raw packed integer.
impl Debug for WorkerSlotId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}:{}]", self.worker_id(), self.slot_idx())
    }
}
/// Describes the item type that a [`VnodeMapping`] maps virtual nodes to.
///
/// Implementors act as type-level tags (see the `marker` module): the mapping is generic over
/// the tag and stores values of its associated `Item`.
pub trait VnodeMappingItem {
/// The type of the items that the virtual nodes are mapped to.
type Item: Copy + Ord + Hash + Debug;
}
/// Uncompressed form of a vnode mapping: a plain vector of items, as produced by
/// `VnodeMapping::to_expanded` and accepted (as a slice) by `VnodeMapping::from_expanded`.
pub type ExpandedMapping<T> = Vec<<T as VnodeMappingItem>::Item>;
/// Mapping from virtual nodes to items, stored as a sequence of runs.
///
/// Run `i` covers the virtual nodes after the end of run `i - 1` (or from 0) up to and
/// including `original_indices[i]`, and every vnode in it maps to `data[i]`.
/// NOTE(review): `from_expanded` delegates the encoding to `compress_data`; presumably it
/// merges consecutive equal items into one run — confirm against that helper.
#[derive(Educe)]
#[educe(Debug, Clone, PartialEq, Eq, Hash)]
pub struct VnodeMapping<T: VnodeMappingItem> {
/// Inclusive index of the last virtual node of each run. Lookups binary-search this
/// vector, so it is expected to be in ascending order.
original_indices: Vec<u32>,
/// The item of each run, parallel to `original_indices`.
data: Vec<T::Item>,
}
#[expect(
    clippy::len_without_is_empty,
    reason = "empty vnode mapping makes no sense"
)]
impl<T: VnodeMappingItem> VnodeMapping<T> {
    /// Creates a mapping that spreads `vnode_count` virtual nodes over `items` as evenly as
    /// possible.
    ///
    /// Each item receives one contiguous range of vnodes, in iteration order. When
    /// `vnode_count` is not a multiple of the number of items, the leading items receive one
    /// extra vnode each.
    ///
    /// # Panics
    ///
    /// Panics if `items` is empty or yields more than `vnode_count` items. In debug builds,
    /// also panics if `items` contains duplicates.
    pub fn new_uniform(items: impl ExactSizeIterator<Item = T::Item>, vnode_count: usize) -> Self {
        let item_count = items.len();

        // With more items than vnodes, some items would not own any vnode.
        assert!(item_count <= vnode_count);
        // Reject an empty iterator explicitly rather than failing on the division below with
        // an opaque "attempt to divide by zero".
        assert!(
            item_count > 0,
            "cannot build a uniform vnode mapping without any item"
        );

        let mut original_indices = Vec::with_capacity(item_count);
        let mut data = Vec::with_capacity(item_count);

        let hash_shard_size = vnode_count / item_count;
        let mut one_more_count = vnode_count % item_count;
        let mut init_bound = 0;

        for item in items {
            // The first `vnode_count % item_count` items absorb the remainder.
            let count = if one_more_count > 0 {
                one_more_count -= 1;
                hash_shard_size + 1
            } else {
                hash_shard_size
            };
            init_bound += count;
            // `init_bound` is the exclusive end of this run; store the inclusive end.
            original_indices.push(init_bound as u32 - 1);
            data.push(item);
        }

        // A uniform mapping must not contain the same item twice.
        debug_assert_eq!(data.iter().duplicates().count(), 0);

        Self {
            original_indices,
            data,
        }
    }

    /// Creates a mapping with a single virtual node mapped to `item`.
    pub fn new_single(item: T::Item) -> Self {
        Self::new_uniform(std::iter::once(item), 1)
    }

    /// Returns the number of virtual nodes covered by this mapping, derived from the inclusive
    /// end of the last run (0 if there is no run).
    pub fn len(&self) -> usize {
        self.original_indices
            .last()
            .map_or(0, |&last| last as usize + 1)
    }

    /// Returns the item that `vnode` is mapped to.
    ///
    /// # Panics
    ///
    /// Panics if `vnode` lies beyond the last run of this mapping.
    pub fn get(&self, vnode: VirtualNode) -> T::Item {
        self[vnode]
    }

    /// Returns the item mapped to by the first virtual node yielded by `bitmap.iter_vnodes()`,
    /// or `None` if it yields nothing.
    pub fn get_matched(&self, bitmap: &Bitmap) -> Option<T::Item> {
        bitmap
            .iter_vnodes()
            .next() // only the first vnode is inspected
            .map(|v| self.get(v))
    }

    /// Iterates over the item of every virtual node, in vnode order, by expanding each run.
    pub fn iter(&self) -> impl Iterator<Item = T::Item> + '_ {
        self.data
            .iter()
            .copied()
            .zip_eq_debug(
                // Pair up consecutive exclusive end bounds (starting from 0); each difference
                // is the number of vnodes in the corresponding run.
                std::iter::once(0)
                    .chain(self.original_indices.iter().copied().map(|i| i + 1))
                    .tuple_windows()
                    .map(|(a, b)| (b - a) as usize),
            )
            .flat_map(|(item, c)| std::iter::repeat(item).take(c))
    }

    /// Iterates over all `(vnode, item)` pairs, in vnode order.
    pub fn iter_with_vnode(&self) -> impl Iterator<Item = (VirtualNode, T::Item)> + '_ {
        self.iter()
            .enumerate()
            .map(|(v, item)| (VirtualNode::from_index(v), item))
    }

    /// Iterates over the distinct items of this mapping, in ascending order.
    pub fn iter_unique(&self) -> impl Iterator<Item = T::Item> + '_ {
        self.data.iter().copied().sorted().dedup()
    }

    /// Returns the item if every run of this mapping holds the same item, otherwise `None`.
    pub fn to_single(&self) -> Option<T::Item> {
        self.data.iter().copied().dedup().exactly_one().ok()
    }

    /// Converts this mapping into one bitmap per item. Each bitmap has `self.len()` bits and
    /// has a bit set for every vnode mapped to that item.
    pub fn to_bitmaps(&self) -> HashMap<T::Item, Bitmap> {
        let vnode_count = self.len();
        let mut vnode_bitmaps = HashMap::new();

        for (vnode, item) in self.iter_with_vnode() {
            vnode_bitmaps
                .entry(item)
                .or_insert_with(|| BitmapBuilder::zeroed(vnode_count))
                .set(vnode.to_index(), true);
        }

        vnode_bitmaps
            .into_iter()
            .map(|(item, b)| (item, b.finish()))
            .collect()
    }

    /// Builds a mapping from one bitmap per item, the inverse of [`Self::to_bitmaps`].
    ///
    /// # Panics
    ///
    /// Panics if `bitmaps` is empty, if the bitmaps differ in length, if an index is set in
    /// more than one bitmap, or if an index is set in none of them.
    pub fn from_bitmaps(bitmaps: &HashMap<T::Item, Bitmap>) -> Self {
        let vnode_count = bitmaps.values().next().expect("empty bitmaps").len();
        let mut items = vec![None; vnode_count];

        for (&item, bitmap) in bitmaps {
            assert_eq!(bitmap.len(), vnode_count);
            for idx in bitmap.iter_ones() {
                // `replace` hands back the previous owner of this index, if any.
                if let Some(prev) = items[idx].replace(item) {
                    panic!("mapping at index `{idx}` is set to both `{prev:?}` and `{item:?}`");
                }
            }
        }

        let items = items
            .into_iter()
            .enumerate()
            .map(|(i, o)| o.unwrap_or_else(|| panic!("mapping at index `{i}` is not set")))
            .collect_vec();

        Self::from_expanded(&items)
    }

    /// Builds a mapping from an expanded slice holding one item per virtual node.
    pub fn from_expanded(items: &[T::Item]) -> Self {
        let (original_indices, data) = compress_data(items);
        Self {
            original_indices,
            data,
        }
    }

    /// Expands this mapping into a vector holding one item per virtual node.
    pub fn to_expanded(&self) -> ExpandedMapping<T> {
        self.iter().collect()
    }

    /// Converts this mapping into a mapping of another item type by looking up every item in
    /// `to_map`. The run boundaries are kept unchanged.
    ///
    /// # Panics
    ///
    /// Panics if indexing `to_map` panics for an item of this mapping (for example, a
    /// `HashMap` that lacks the key).
    pub fn transform<T2, M>(&self, to_map: &M) -> VnodeMapping<T2>
    where
        T2: VnodeMappingItem,
        M: for<'a> Index<&'a T::Item, Output = T2::Item>,
    {
        VnodeMapping {
            original_indices: self.original_indices.clone(),
            data: self.data.iter().map(|item| to_map[item]).collect(),
        }
    }
}
/// Looks up the item that a virtual node is mapped to.
impl<T: VnodeMappingItem> Index<VirtualNode> for VnodeMapping<T> {
    type Output = T::Item;

    fn index(&self, vnode: VirtualNode) -> &Self::Output {
        let target = vnode.to_index() as u32;
        // Runs are keyed by their inclusive end, so the first run whose end is not below
        // `target` is the one containing the vnode.
        let run = self
            .original_indices
            .partition_point(|&run_end| run_end < target);
        &self.data[run]
    }
}
/// Marker types that select the item type of a [`VnodeMapping`].
pub mod marker {
use super::*;
/// Marker for mappings whose items are [`ActorId`]s.
pub struct Actor;
impl VnodeMappingItem for Actor {
type Item = ActorId;
}
/// Marker for mappings whose items are [`WorkerSlotId`]s.
pub struct WorkerSlot;
impl VnodeMappingItem for WorkerSlot {
type Item = WorkerSlotId;
}
}
/// A mapping from virtual nodes to [`ActorId`]s.
pub type ActorMapping = VnodeMapping<marker::Actor>;
/// The expanded (one entry per virtual node) form of an [`ActorMapping`].
pub type ExpandedActorMapping = ExpandedMapping<marker::Actor>;
/// A mapping from virtual nodes to [`WorkerSlotId`]s.
pub type WorkerSlotMapping = VnodeMapping<marker::WorkerSlot>;
/// The expanded (one entry per virtual node) form of a [`WorkerSlotMapping`].
pub type ExpandedWorkerSlotMapping = ExpandedMapping<marker::WorkerSlot>;
impl ActorMapping {
pub fn to_worker_slot(&self, actor_to_worker: &HashMap<ActorId, u32>) -> WorkerSlotMapping {
let mut worker_actors = HashMap::new();
for actor_id in self.iter_unique() {
let worker_id = actor_to_worker
.get(&actor_id)
.cloned()
.unwrap_or_else(|| panic!("location for actor {} not found", actor_id));
worker_actors
.entry(worker_id)
.or_insert(BTreeSet::new())
.insert(actor_id);
}
let mut actor_location = HashMap::new();
for (worker, actors) in worker_actors {
for (idx, &actor) in actors.iter().enumerate() {
actor_location.insert(actor, WorkerSlotId::new(worker, idx));
}
}
self.transform(&actor_location)
}
pub fn from_protobuf(proto: &ActorMappingProto) -> Self {
assert_eq!(proto.original_indices.len(), proto.data.len());
Self {
original_indices: proto.original_indices.clone(),
data: proto.data.clone(),
}
}
pub fn to_protobuf(&self) -> ActorMappingProto {
ActorMappingProto {
original_indices: self.original_indices.clone(),
data: self.data.clone(),
}
}
}
impl WorkerSlotMapping {
pub fn build_from_ids(worker_slot_ids: &[WorkerSlotId], vnode_count: usize) -> Self {
Self::new_uniform(worker_slot_ids.iter().cloned(), vnode_count)
}
pub fn from_protobuf(proto: &PbWorkerSlotMapping) -> Self {
assert_eq!(proto.original_indices.len(), proto.data.len());
Self {
original_indices: proto.original_indices.clone(),
data: proto.data.iter().map(|&id| WorkerSlotId(id)).collect(),
}
}
pub fn to_protobuf(&self) -> PbWorkerSlotMapping {
PbWorkerSlotMapping {
original_indices: self.original_indices.clone(),
data: self.data.iter().map(|id| id.0).collect(),
}
}
}
impl WorkerSlotMapping {
/// Converts this worker slot mapping into an actor mapping by looking up every slot id in
/// `to_map`; a thin wrapper around `transform`.
pub fn to_actor(&self, to_map: &HashMap<WorkerSlotId, ActorId>) -> ActorMapping {
self.transform(to_map)
}
}
#[cfg(test)]
mod tests {
    use std::iter::repeat_with;

    use rand::Rng;

    use super::*;

    /// First item tag used by the tests.
    struct Test;
    impl VnodeMappingItem for Test {
        type Item = u32;
    }

    /// Second item tag, used as the target type of `transform`.
    struct Test2;
    impl VnodeMappingItem for Test2 {
        type Item = u32;
    }

    type TestMapping = VnodeMapping<Test>;
    type Test2Mapping = VnodeMapping<Test2>;

    /// Numbers of distinct items exercised by the tests.
    const COUNTS: &[usize] = &[1, 3, 12, 42, VirtualNode::COUNT_FOR_TEST];

    /// Uniform mappings, one per entry of `COUNTS`.
    fn uniforms() -> impl Iterator<Item = TestMapping> {
        COUNTS.iter().map(|&item_count| {
            let items = 0..item_count as u32;
            TestMapping::new_uniform(items, VirtualNode::COUNT_FOR_TEST)
        })
    }

    /// Randomly filled mappings, one per entry of `COUNTS`.
    fn randoms() -> impl Iterator<Item = TestMapping> {
        COUNTS.iter().map(|&item_count| {
            let upper = item_count as u32;
            let expanded: Vec<u32> = repeat_with(|| rand::thread_rng().gen_range(0..upper))
                .take(VirtualNode::COUNT_FOR_TEST)
                .collect();
            TestMapping::from_expanded(&expanded)
        })
    }

    /// All uniform mappings followed by all random ones.
    fn mappings() -> impl Iterator<Item = TestMapping> {
        uniforms().chain(randoms())
    }

    /// A uniform mapping covers all vnodes and gives every item the same number of vnodes,
    /// up to a difference of one.
    #[test]
    fn test_uniform() {
        for mapping in uniforms() {
            assert_eq!(mapping.len(), VirtualNode::COUNT_FOR_TEST);

            let unique_items = mapping.iter_unique().count();

            let mut vnodes_per_item: HashMap<u32, Vec<_>> = HashMap::new();
            for (vnode, item) in mapping.iter_with_vnode() {
                vnodes_per_item.entry(item).or_default().push(vnode);
            }
            assert_eq!(vnodes_per_item.len(), unique_items);

            let (fewest, most) = vnodes_per_item
                .values()
                .map(|vnodes| vnodes.len())
                .minmax()
                .into_option()
                .unwrap();
            assert!(most - fewest <= 1);
        }
    }

    /// Iterating and point lookups agree on every vnode.
    #[test]
    fn test_iter_with_get() {
        for mapping in mappings() {
            mapping.iter_with_vnode().for_each(|(vnode, expected)| {
                assert_eq!(mapping.get(vnode), expected);
            });
        }
    }

    /// Converting to bitmaps and back yields the original mapping.
    #[test]
    fn test_from_to_bitmaps() {
        for original in mappings() {
            let rebuilt = TestMapping::from_bitmaps(&original.to_bitmaps());
            assert_eq!(original, rebuilt);
        }
    }

    /// `transform` applies the lookup to every vnode, and the inverse lookup restores the
    /// original mapping.
    #[test]
    fn test_transform() {
        for original in mappings() {
            let forward: HashMap<_, _> = original
                .iter_unique()
                .map(|item| (item, item + 1))
                .collect();
            let shifted: Test2Mapping = original.transform(&forward);

            for (before, after) in original.iter().zip_eq_debug(shifted.iter()) {
                assert_eq!(before + 1, after);
            }

            let backward: HashMap<_, _> = forward
                .into_iter()
                .map(|(from, to)| (to, from))
                .collect();
            let restored: TestMapping = shifted.transform(&backward);
            assert_eq!(original, restored);
        }
    }
}