risingwave_common/hash/consistent_hash/
vnode.rs1use itertools::Itertools;
16use parse_display::Display;
17
18use crate::array::{Array, ArrayImpl, DataChunk};
19use crate::hash::Crc32HashCode;
20use crate::row::{Row, RowExt};
21use crate::types::{DataType, Datum, DatumRef, ScalarImpl, ScalarRefImpl};
22use crate::util::hash_util::Crc32FastBuilder;
23use crate::util::row_id::compute_vnode_from_row_id;
24
25#[repr(transparent)]
28#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Display)]
29#[display("{0}")]
30pub struct VirtualNode(VirtualNodeInner);
31
32type VirtualNodeInner = u16;
36
37impl !From<Crc32HashCode> for VirtualNode {}
41
42#[easy_ext::ext(Crc32HashCodeToVnodeExt)]
43impl Crc32HashCode {
44 fn to_vnode(self, vnode_count: usize) -> VirtualNode {
46 let inner = (self.value() % vnode_count as u64) as VirtualNodeInner;
49 VirtualNode(inner)
50 }
51}
52
53impl VirtualNode {
54 pub const COUNT_FOR_COMPAT: usize = 1 << 8;
63}
64
65impl VirtualNode {
66 pub const COUNT_FOR_TEST: usize = Self::COUNT_FOR_COMPAT;
68 pub const MAX_FOR_TEST: VirtualNode = VirtualNode::from_index(Self::COUNT_FOR_TEST - 1);
70}
71
72impl VirtualNode {
73 pub const MAX_COUNT: usize = 1 << (VirtualNodeInner::BITS - 1);
80 pub const MAX_REPRESENTABLE: VirtualNode = VirtualNode::from_index(Self::MAX_COUNT - 1);
84 pub const SIZE: usize = std::mem::size_of::<Self>();
86}
87
88pub type AllVirtualNodeIter = std::iter::Map<std::ops::Range<usize>, fn(usize) -> VirtualNode>;
90
91impl VirtualNode {
92 pub const RW_TYPE: DataType = DataType::Int16;
95 pub const ZERO: VirtualNode = VirtualNode::from_index(0);
97
98 pub const fn from_index(index: usize) -> Self {
100 debug_assert!(index < Self::MAX_COUNT);
101 Self(index as _)
102 }
103
104 pub const fn to_index(self) -> usize {
106 self.0 as _
107 }
108
109 pub const fn from_scalar(scalar: i16) -> Self {
111 debug_assert!(scalar >= 0);
112 Self(scalar as _)
113 }
114
115 pub fn from_datum_ref(datum: DatumRef<'_>) -> Self {
116 Self::from_scalar(datum.expect("should not be none").into_int16())
117 }
118
119 pub const fn to_scalar(self) -> i16 {
121 self.0 as _
122 }
123
124 pub const fn to_datum(self) -> Datum {
125 Some(ScalarImpl::Int16(self.to_scalar()))
126 }
127
128 pub const fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self {
130 let inner = VirtualNodeInner::from_be_bytes(bytes);
131 debug_assert!((inner as usize) < Self::MAX_COUNT);
132 Self(inner)
133 }
134
135 pub const fn to_be_bytes(self) -> [u8; Self::SIZE] {
137 self.0.to_be_bytes()
138 }
139
140 pub fn all(vnode_count: usize) -> AllVirtualNodeIter {
142 (0..vnode_count).map(Self::from_index)
143 }
144}
145
146impl VirtualNode {
147 pub fn compute_chunk(
152 data_chunk: &DataChunk,
153 keys: &[usize],
154 vnode_count: usize,
155 ) -> Vec<VirtualNode> {
156 if let Ok(idx) = Itertools::exactly_one(keys.iter())
157 && let ArrayImpl::Serial(serial_array) = &**data_chunk.column_at(*idx)
158 {
159 return serial_array
160 .iter()
161 .enumerate()
162 .map(|(idx, serial)| {
163 if let Some(serial) = serial {
164 compute_vnode_from_row_id(serial.as_row_id(), vnode_count)
165 } else {
166 let (row, _) = data_chunk.row_at(idx);
171 row.hash(Crc32FastBuilder).to_vnode(vnode_count)
172 }
173 })
174 .collect();
175 }
176
177 data_chunk
178 .get_hash_values(keys, Crc32FastBuilder)
179 .into_iter()
180 .map(|hash| hash.to_vnode(vnode_count))
181 .collect()
182 }
183
184 pub fn compute_chunk_for_test(data_chunk: &DataChunk, keys: &[usize]) -> Vec<VirtualNode> {
186 Self::compute_chunk(data_chunk, keys, Self::COUNT_FOR_TEST)
187 }
188
189 pub fn compute_row(row: impl Row, indices: &[usize], vnode_count: usize) -> VirtualNode {
192 let project = row.project(indices);
193 if let Ok(Some(ScalarRefImpl::Serial(s))) = Itertools::exactly_one(project.iter()).as_ref()
194 {
195 return compute_vnode_from_row_id(s.as_row_id(), vnode_count);
196 }
197
198 project.hash(Crc32FastBuilder).to_vnode(vnode_count)
199 }
200
201 pub fn compute_row_for_test(row: impl Row, indices: &[usize]) -> VirtualNode {
203 Self::compute_row(row, indices, Self::COUNT_FOR_TEST)
204 }
205}
206
#[cfg(test)]
mod tests {
    use super::*;
    use crate::array::DataChunkTestExt;
    use crate::row::OwnedRow;
    use crate::util::row_id::RowIdGenerator;

    /// A single `Serial` key column maps each row to the vnode encoded in its row id.
    #[test]
    fn test_serial_key_chunk() {
        let mut r#gen =
            RowIdGenerator::new([VirtualNode::from_index(100)], VirtualNode::COUNT_FOR_TEST);
        let chunk = format!(
            "SRL I
             {} 1
             {} 2",
            r#gen.next(),
            r#gen.next(),
        );

        let chunk = DataChunk::from_pretty(chunk.as_str());
        let vnodes = VirtualNode::compute_chunk_for_test(&chunk, &[0]);

        assert_eq!(
            vnodes.as_slice(),
            &[VirtualNode::from_index(100), VirtualNode::from_index(100)]
        );
    }

    /// The row path agrees with the chunk path for a single `Serial` key.
    #[test]
    fn test_serial_key_row() {
        let mut r#gen =
            RowIdGenerator::new([VirtualNode::from_index(100)], VirtualNode::COUNT_FOR_TEST);
        let row = OwnedRow::new(vec![
            Some(ScalarImpl::Serial(r#gen.next().into())),
            Some(ScalarImpl::Int64(12345)),
        ]);

        let vnode = VirtualNode::compute_row_for_test(&row, &[0]);

        assert_eq!(vnode, VirtualNode::from_index(100));
    }

    /// Row ids generated for several vnodes are mapped back to those vnodes.
    #[test]
    fn test_serial_key_chunk_multiple_vnodes() {
        let mut r#gen = RowIdGenerator::new(
            [100, 200].map(VirtualNode::from_index),
            VirtualNode::COUNT_FOR_TEST,
        );
        let chunk = format!(
            "SRL I
             {} 1
             {} 2
             {} 3
             {} 4",
            r#gen.next(),
            r#gen.next(),
            r#gen.next(),
            r#gen.next(),
        );

        let chunk = DataChunk::from_pretty(chunk.as_str());
        let vnodes = VirtualNode::compute_chunk_for_test(&chunk, &[0]);

        assert_eq!(
            vnodes.as_slice(),
            &[
                VirtualNode::from_index(100),
                VirtualNode::from_index(200),
                VirtualNode::from_index(100),
                VirtualNode::from_index(200),
            ]
        );
    }

    /// `all()` yields every vnode in order, and the index, scalar and big-endian byte
    /// conversions each round-trip.
    #[test]
    fn test_conversion_round_trip() {
        let vnodes = VirtualNode::all(VirtualNode::COUNT_FOR_TEST).collect::<Vec<_>>();
        assert_eq!(vnodes.len(), VirtualNode::COUNT_FOR_TEST);
        assert_eq!(vnodes.first(), Some(&VirtualNode::ZERO));
        assert_eq!(vnodes.last(), Some(&VirtualNode::MAX_FOR_TEST));

        for (index, vnode) in vnodes.into_iter().enumerate() {
            assert_eq!(vnode.to_index(), index);
            assert_eq!(VirtualNode::from_index(vnode.to_index()), vnode);
            assert_eq!(VirtualNode::from_scalar(vnode.to_scalar()), vnode);
            assert_eq!(VirtualNode::from_be_bytes(vnode.to_be_bytes()), vnode);
        }
    }

    /// The largest representable vnode survives the `i16` and byte conversions.
    #[test]
    fn test_max_representable_round_trip() {
        let max = VirtualNode::MAX_REPRESENTABLE;
        assert_eq!(max.to_index(), VirtualNode::MAX_COUNT - 1);
        assert!(max.to_scalar() >= 0);
        assert_eq!(VirtualNode::from_scalar(max.to_scalar()), max);
        assert_eq!(VirtualNode::from_be_bytes(max.to_be_bytes()), max);
    }
}