risingwave_common/hash/consistent_hash/
vnode.rs1use itertools::Itertools;
16use parse_display::Display;
17
18use crate::array::{Array, ArrayImpl, DataChunk};
19use crate::hash::Crc32HashCode;
20use crate::row::{Row, RowExt};
21use crate::types::{DataType, Datum, DatumRef, ScalarImpl, ScalarRefImpl};
22use crate::util::hash_util::Crc32FastBuilder;
23use crate::util::row_id::compute_vnode_from_row_id;
24
25#[repr(transparent)]
28#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Display)]
29#[display("{0}")]
30pub struct VirtualNode(VirtualNodeInner);
31
32type VirtualNodeInner = u16;
36
37impl !From<Crc32HashCode> for VirtualNode {}
41
42#[easy_ext::ext(Crc32HashCodeToVnodeExt)]
43impl Crc32HashCode {
44 fn to_vnode(self, vnode_count: usize) -> VirtualNode {
46 let inner = (self.value() % vnode_count as u64) as VirtualNodeInner;
49 VirtualNode(inner)
50 }
51}
52
53impl VirtualNode {
54 pub const COUNT_FOR_COMPAT: usize = 1 << 8;
63}
64
65impl VirtualNode {
66 pub const COUNT_FOR_TEST: usize = Self::COUNT_FOR_COMPAT;
68 pub const MAX_FOR_TEST: VirtualNode = VirtualNode::from_index(Self::COUNT_FOR_TEST - 1);
70}
71
72impl VirtualNode {
73 pub const MAX_COUNT: usize = 1 << (VirtualNodeInner::BITS - 1);
80 pub const MAX_REPRESENTABLE: VirtualNode = VirtualNode::from_index(Self::MAX_COUNT - 1);
84 pub const SIZE: usize = std::mem::size_of::<Self>();
86}
87
88pub type AllVirtualNodeIter = std::iter::Map<std::ops::Range<usize>, fn(usize) -> VirtualNode>;
90
91impl VirtualNode {
92 pub const RW_TYPE: DataType = DataType::Int16;
95 pub const ZERO: VirtualNode = VirtualNode::from_index(0);
97
98 pub const fn from_index(index: usize) -> Self {
100 debug_assert!(index < Self::MAX_COUNT);
101 Self(index as _)
102 }
103
104 pub const fn to_index(self) -> usize {
106 self.0 as _
107 }
108
109 pub const fn from_scalar(scalar: i16) -> Self {
111 debug_assert!(scalar >= 0);
112 Self(scalar as _)
113 }
114
115 pub fn from_datum_ref(datum: DatumRef<'_>) -> Self {
116 Self::from_scalar(datum.expect("should not be none").into_int16())
117 }
118
119 pub const fn to_scalar(self) -> i16 {
121 self.0 as _
122 }
123
124 pub const fn to_datum(self) -> Datum {
125 Some(ScalarImpl::Int16(self.to_scalar()))
126 }
127
128 pub const fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self {
130 let inner = VirtualNodeInner::from_be_bytes(bytes);
131 debug_assert!((inner as usize) < Self::MAX_COUNT);
132 Self(inner)
133 }
134
135 pub const fn to_be_bytes(self) -> [u8; Self::SIZE] {
137 self.0.to_be_bytes()
138 }
139
140 pub fn all(vnode_count: usize) -> AllVirtualNodeIter {
142 (0..vnode_count).map(Self::from_index)
143 }
144}
145
146impl VirtualNode {
147 pub fn compute_chunk(
152 data_chunk: &DataChunk,
153 keys: &[usize],
154 vnode_count: usize,
155 ) -> Vec<VirtualNode> {
156 if let Ok(idx) = keys.iter().exactly_one()
157 && let ArrayImpl::Serial(serial_array) = &**data_chunk.column_at(*idx)
158 {
159 return serial_array
160 .iter()
161 .enumerate()
162 .map(|(idx, serial)| {
163 if let Some(serial) = serial {
164 compute_vnode_from_row_id(serial.as_row_id(), vnode_count)
165 } else {
166 let (row, _) = data_chunk.row_at(idx);
171 row.hash(Crc32FastBuilder).to_vnode(vnode_count)
172 }
173 })
174 .collect();
175 }
176
177 data_chunk
178 .get_hash_values(keys, Crc32FastBuilder)
179 .into_iter()
180 .map(|hash| hash.to_vnode(vnode_count))
181 .collect()
182 }
183
184 pub fn compute_chunk_for_test(data_chunk: &DataChunk, keys: &[usize]) -> Vec<VirtualNode> {
186 Self::compute_chunk(data_chunk, keys, Self::COUNT_FOR_TEST)
187 }
188
189 pub fn compute_row(row: impl Row, indices: &[usize], vnode_count: usize) -> VirtualNode {
192 let project = row.project(indices);
193 if let Ok(Some(ScalarRefImpl::Serial(s))) = project.iter().exactly_one().as_ref() {
194 return compute_vnode_from_row_id(s.as_row_id(), vnode_count);
195 }
196
197 project.hash(Crc32FastBuilder).to_vnode(vnode_count)
198 }
199
200 pub fn compute_row_for_test(row: impl Row, indices: &[usize]) -> VirtualNode {
202 Self::compute_row(row, indices, Self::COUNT_FOR_TEST)
203 }
204}
205
#[cfg(test)]
mod tests {
    use super::*;
    use crate::array::DataChunkTestExt;
    use crate::row::OwnedRow;
    use crate::util::row_id::RowIdGenerator;

    /// Row ids produced for vnode 100 map back to vnode 100 via the chunk fast path.
    #[test]
    fn test_serial_key_chunk() {
        let mut r#gen =
            RowIdGenerator::new([VirtualNode::from_index(100)], VirtualNode::COUNT_FOR_TEST);
        let chunk = format!(
            "SRL I
             {} 1
             {} 2",
            r#gen.next(),
            r#gen.next(),
        );

        let chunk = DataChunk::from_pretty(chunk.as_str());
        let vnodes = VirtualNode::compute_chunk_for_test(&chunk, &[0]);

        assert_eq!(
            vnodes.as_slice(),
            &[VirtualNode::from_index(100), VirtualNode::from_index(100)]
        );
    }

    /// Row ids produced for vnode 100 map back to vnode 100 via the row fast path.
    #[test]
    fn test_serial_key_row() {
        let mut r#gen =
            RowIdGenerator::new([VirtualNode::from_index(100)], VirtualNode::COUNT_FOR_TEST);
        let row = OwnedRow::new(vec![
            Some(ScalarImpl::Serial(r#gen.next().into())),
            Some(ScalarImpl::Int64(12345)),
        ]);

        let vnode = VirtualNode::compute_row_for_test(&row, &[0]);

        assert_eq!(vnode, VirtualNode::from_index(100));
    }

    /// With two owned vnodes, consecutive generated row ids alternate between them.
    #[test]
    fn test_serial_key_chunk_multiple_vnodes() {
        let mut r#gen = RowIdGenerator::new(
            [100, 200].map(VirtualNode::from_index),
            VirtualNode::COUNT_FOR_TEST,
        );
        let chunk = format!(
            "SRL I
             {} 1
             {} 2
             {} 3
             {} 4",
            r#gen.next(),
            r#gen.next(),
            r#gen.next(),
            r#gen.next(),
        );

        let chunk = DataChunk::from_pretty(chunk.as_str());
        let vnodes = VirtualNode::compute_chunk_for_test(&chunk, &[0]);

        assert_eq!(
            vnodes.as_slice(),
            &[
                VirtualNode::from_index(100),
                VirtualNode::from_index(200),
                VirtualNode::from_index(100),
                VirtualNode::from_index(200),
            ]
        );
    }

    /// For non-serial keys, the chunk path (`get_hash_values`) and the row path (hash of the
    /// projected key) should yield the same vnode for every row.
    #[test]
    fn test_chunk_and_row_vnodes_agree() {
        let chunk = DataChunk::from_pretty(
            "I I
             1 2
             3 4
             5 6",
        );
        let keys = [0, 1];

        let vnodes = VirtualNode::compute_chunk_for_test(&chunk, &keys);
        assert_eq!(vnodes.len(), 3);

        for (row_idx, vnode) in vnodes.into_iter().enumerate() {
            let (row, _) = chunk.row_at(row_idx);
            assert_eq!(VirtualNode::compute_row_for_test(row, &keys), vnode);
        }
    }

    /// Index, scalar and big-endian byte conversions round-trip, and the largest
    /// representable vnode still fits in `i16`.
    #[test]
    fn test_vnode_conversions_roundtrip() {
        assert_eq!(VirtualNode::MAX_REPRESENTABLE.to_scalar(), i16::MAX);
        assert_eq!(
            VirtualNode::MAX_FOR_TEST.to_index(),
            VirtualNode::COUNT_FOR_TEST - 1
        );

        for vnode in VirtualNode::all(VirtualNode::COUNT_FOR_TEST) {
            assert_eq!(VirtualNode::from_index(vnode.to_index()), vnode);
            assert_eq!(VirtualNode::from_scalar(vnode.to_scalar()), vnode);
            assert_eq!(VirtualNode::from_be_bytes(vnode.to_be_bytes()), vnode);
        }
    }
}