// risingwave_common/vnode_mapping/vnode_placement.rs
use std::collections::{HashMap, HashSet, LinkedList, VecDeque};
use std::ops::BitOrAssign;
use itertools::Itertools;
use num_integer::Integer;
use risingwave_common::hash::WorkerSlotId;
use risingwave_pb::common::WorkerNode;
use crate::bitmap::{Bitmap, BitmapBuilder};
use crate::hash::{VirtualNode, WorkerSlotMapping};
/// Builds a vnode-to-worker-slot mapping over the serving workers, keeping vnodes on the slot
/// named by `hint_worker_slot_mapping` where possible while evening out the load.
///
/// * `hint_worker_slot_mapping` - optional previous mapping used for affinity.
/// * `workers` - candidate workers; only those whose `property` has `is_serving` set take part.
/// * `max_parallelism` - optional upper bound on the number of worker slots used.
/// * `vnode_count` - number of virtual nodes to distribute.
///
/// Returns `None` when no worker slot gets selected, i.e. when there is no serving slot, when
/// `max_parallelism` is `Some(0)`, or when `vnode_count` is zero.
///
/// # Panics
///
/// Panics if a hint is given whose length differs from `vnode_count`.
pub fn place_vnode(
    hint_worker_slot_mapping: Option<&WorkerSlotMapping>,
    workers: &[WorkerNode],
    max_parallelism: Option<usize>,
    vnode_count: usize,
) -> Option<WorkerSlotMapping> {
    /// Per-slot bookkeeping. A negative `balance` is the number of vnodes the slot still has to
    /// receive, a positive one the number it has to give away. `is_temp` marks the placeholder
    /// entry that holds vnodes without a selected owner and never appears in the result.
    #[derive(Debug)]
    struct Balance {
        slot: WorkerSlotId,
        balance: i32,
        builder: BitmapBuilder,
        is_temp: bool,
    }

    /// Records a settled entry in `results`, unless it is the temporary placeholder.
    fn settle(entry: Balance, results: &mut HashMap<WorkerSlotId, Bitmap>) {
        if !entry.is_temp {
            results.insert(entry.slot, entry.builder.finish());
        }
    }

    if let Some(hint) = hint_worker_slot_mapping {
        assert_eq!(hint.len(), vnode_count);
    }

    // One lazy sequence of slot ids per serving worker, workers ordered by id.
    let mut per_worker_slots: LinkedList<_> = workers
        .iter()
        .filter(|worker| worker.property.as_ref().is_some_and(|prop| prop.is_serving))
        .sorted_by_key(|worker| worker.id)
        .map(|worker| (0..worker.parallelism()).map(|idx| WorkerSlotId::new(worker.id, idx)))
        .collect();

    // Never use more slots than exist, than the caller allows, or than there are vnodes.
    let available_slots: usize = per_worker_slots.iter().map(|slots| slots.len()).sum();
    let serving_parallelism = available_slots
        .min(max_parallelism.unwrap_or(usize::MAX))
        .min(vnode_count);

    // Round-robin over the workers: each pass takes one slot from every worker that still has
    // some and unlinks the exhausted ones, so the selection is interleaved across workers.
    let mut selected_slots = Vec::new();
    while !per_worker_slots.is_empty() {
        per_worker_slots
            .extract_if(|slots| match slots.next() {
                Some(slot) => {
                    selected_slots.push(slot);
                    false
                }
                None => true,
            })
            .for_each(drop);
    }
    selected_slots.truncate(serving_parallelism);

    let selected_slots_set: HashSet<WorkerSlotId> = selected_slots.iter().copied().collect();
    if selected_slots_set.is_empty() {
        return None;
    }

    // Every selected slot starts empty and owes its quota: `expected` vnodes each, plus one
    // extra for the first `remain` slots so the quotas add up to `vnode_count`.
    let (expected, remain) = vnode_count.div_rem(&selected_slots.len());
    let mut balances: HashMap<WorkerSlotId, Balance> = selected_slots
        .iter()
        .enumerate()
        .map(|(pos, &slot)| {
            let mut balance = -(expected as i32);
            if pos < remain {
                balance -= 1;
            }
            let entry = Balance {
                slot,
                balance,
                builder: BitmapBuilder::zeroed(vnode_count),
                is_temp: false,
            };
            (slot, entry)
        })
        .collect();

    // Placeholder owner; its slot id is arbitrary because it is recognised via `is_temp`.
    let mut temp_slot = Balance {
        slot: WorkerSlotId::new(0u32, usize::MAX),
        balance: 0,
        builder: BitmapBuilder::zeroed(vnode_count),
        is_temp: true,
    };

    if let Some(hint) = hint_worker_slot_mapping {
        // Keep each vnode where the hint puts it if that slot is selected; otherwise park it
        // on the placeholder for redistribution below.
        for (vnode, worker_slot) in hint.iter_with_vnode() {
            let owner = if selected_slots_set.contains(&worker_slot) {
                balances.get_mut(&worker_slot).unwrap()
            } else {
                &mut temp_slot
            };
            owner.balance += 1;
            owner.builder.set(vnode.to_index(), true);
        }
    } else {
        // Without a hint every vnode starts on the placeholder.
        for vnode in VirtualNode::all(vnode_count) {
            temp_slot.balance += 1;
            temp_slot.builder.set(vnode.to_index(), true);
        }
    }

    // Order all entries by balance, largest surplus first. The sort is stable and then
    // reversed, so entries with equal balance end up in reverse input order.
    let mut ordered: Vec<Balance> = balances
        .into_values()
        .chain(std::iter::once(temp_slot))
        .collect();
    ordered.sort_by_key(|entry| entry.balance);
    ordered.reverse();
    let mut queue = VecDeque::from(ordered);

    // Repeatedly pair the front entry (largest surplus) with the back entry (largest deficit)
    // and move as many vnodes as both sides allow; entries that reach zero are settled.
    let mut results: HashMap<WorkerSlotId, Bitmap> = HashMap::default();
    while queue.len() > 1 {
        let mut src = queue.pop_front().unwrap();
        let mut dst = queue.pop_back().unwrap();
        let n = std::cmp::min(src.balance.abs(), dst.balance.abs());

        let mut left_to_move = n;
        for idx in 0..vnode_count {
            if left_to_move <= 0 {
                break;
            }
            if !src.builder.is_set(idx) {
                continue;
            }
            src.builder.set(idx, false);
            assert!(!dst.builder.is_set(idx));
            dst.builder.set(idx, true);
            left_to_move -= 1;
        }
        src.balance -= n;
        dst.balance += n;

        if src.balance != 0 {
            queue.push_front(src);
        } else {
            settle(src, &mut results);
        }
        if dst.balance != 0 {
            queue.push_back(dst);
        } else {
            settle(dst, &mut results);
        }
    }
    // A lone leftover entry has nobody to trade with, so it must already be balanced.
    if let Some(single) = queue.pop_front() {
        assert_eq!(single.balance, 0);
        settle(single, &mut results);
    }

    // OR each slot's bitmap into an all-zero bitmap keyed by that slot.
    let mut worker_result: HashMap<WorkerSlotId, Bitmap> = HashMap::new();
    for (worker_slot, bitmap) in results {
        worker_result
            .entry(worker_slot)
            .or_insert_with(|| Bitmap::zeros(vnode_count))
            .bitor_assign(&bitmap);
    }

    Some(WorkerSlotMapping::from_bitmaps(&worker_result))
}
#[cfg(test)]
mod tests {
    use risingwave_common::hash::WorkerSlotMapping;
    use risingwave_pb::common::worker_node::Property;
    use risingwave_pb::common::WorkerNode;

    use crate::hash::VirtualNode;

    /// Calls [`super::place_vnode`] with the vnode count fixed to `VirtualNode::COUNT_FOR_TEST`.
    fn place_vnode(
        hint_worker_slot_mapping: Option<&WorkerSlotMapping>,
        workers: &[WorkerNode],
        max_parallelism: Option<usize>,
    ) -> Option<WorkerSlotMapping> {
        let vnode_count = VirtualNode::COUNT_FOR_TEST;
        super::place_vnode(
            hint_worker_slot_mapping,
            workers,
            max_parallelism,
            vnode_count,
        )
    }

    /// Counts the vnodes that both mappings assign to the same worker slot.
    fn count_same_vnode_mapping(lhs: &WorkerSlotMapping, rhs: &WorkerSlotMapping) -> usize {
        assert_eq!(lhs.len(), 256);
        assert_eq!(rhs.len(), 256);
        (0..VirtualNode::COUNT_FOR_TEST)
            .map(VirtualNode::from_index)
            .filter(|&vnode| lhs.get(vnode) == rhs.get(vnode))
            .count()
    }

    /// Walks through growing, capping and shrinking the set of serving workers, checking the
    /// number of distinct slots used and how many vnodes keep their previous slot.
    #[test]
    fn test_place_vnode() {
        assert_eq!(VirtualNode::COUNT_FOR_TEST, 256);

        let serving_property = Property {
            is_unschedulable: false,
            is_serving: true,
            is_streaming: false,
            internal_rpc_host_addr: "".to_string(),
        };
        let serving_worker = |id, parallelism| WorkerNode {
            id,
            parallelism,
            property: Some(serving_property.clone()),
            ..Default::default()
        };
        let worker_1 = serving_worker(1, 1);
        let worker_2 = serving_worker(2, 50);
        let worker_3 = serving_worker(3, 60);

        let only_first = [worker_1.clone()];
        let first_and_second = [worker_1.clone(), worker_2.clone()];
        let all_three = [worker_1.clone(), worker_2, worker_3.clone()];
        let first_and_third = [worker_1, worker_3.clone()];
        let only_third = [worker_3];

        // A parallelism cap of zero selects no slot, so no mapping can be produced.
        assert!(
            place_vnode(None, &only_first, Some(0)).is_none(),
            "max_parallelism should >= 0"
        );

        // One worker with a single slot and no hint.
        let one_slot_mapping = place_vnode(None, &only_first, None).unwrap();
        assert_eq!(one_slot_mapping.iter_unique().count(), 1);

        // Add a 50-slot worker: 51 slots in total.
        let two_worker_mapping =
            place_vnode(Some(&one_slot_mapping), &first_and_second, None).unwrap();
        assert_eq!(two_worker_mapping.iter_unique().count(), 51);
        let score = count_same_vnode_mapping(&one_slot_mapping, &two_worker_mapping);
        assert!(score >= 5);

        // Add a 60-slot worker: 111 slots in total.
        let three_worker_mapping =
            place_vnode(Some(&two_worker_mapping), &all_three, None).unwrap();
        assert_eq!(three_worker_mapping.iter_unique().count(), 111);
        let score = count_same_vnode_mapping(&three_worker_mapping, &two_worker_mapping);
        assert!(score >= (2 + 50 * 2));

        // Same workers, capped at 50 slots.
        let capped_mapping =
            place_vnode(Some(&three_worker_mapping), &all_three, Some(50)).unwrap();
        assert_eq!(capped_mapping.iter_unique().count(), 50);
        let score = count_same_vnode_mapping(&capped_mapping, &three_worker_mapping);
        assert!(score >= 50 * 2);

        // Lift the cap again.
        let uncapped_mapping = place_vnode(Some(&capped_mapping), &all_three, None).unwrap();
        assert_eq!(uncapped_mapping.iter_unique().count(), 111);
        let score = count_same_vnode_mapping(&uncapped_mapping, &capped_mapping);
        assert!(score >= 50 * 2);

        // Drop the 50-slot worker: 61 slots remain.
        let without_second_mapping =
            place_vnode(Some(&uncapped_mapping), &first_and_third, None).unwrap();
        assert_eq!(without_second_mapping.iter_unique().count(), 61);
        let score = count_same_vnode_mapping(&without_second_mapping, &uncapped_mapping);
        assert!(score >= 61 * 2);

        // No workers at all yields no mapping, with or without a usable hint.
        assert!(place_vnode(Some(&without_second_mapping), &[], None).is_none());

        // Only the 60-slot worker is left.
        let third_only_mapping =
            place_vnode(Some(&without_second_mapping), &only_third, None).unwrap();
        assert_eq!(third_only_mapping.iter_unique().count(), 60);
        assert!(place_vnode(Some(&third_only_mapping), &[], None).is_none());
    }
}