risingwave_common/hash/consistent_hash/
vnode.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use parse_display::Display;
17
18use crate::array::{Array, ArrayImpl, DataChunk};
19use crate::hash::Crc32HashCode;
20use crate::row::{Row, RowExt};
21use crate::types::{DataType, Datum, DatumRef, ScalarImpl, ScalarRefImpl};
22use crate::util::hash_util::Crc32FastBuilder;
23use crate::util::row_id::compute_vnode_from_row_id;
24
/// `VirtualNode` (a.k.a. Vnode) is a minimal partition that a set of keys belong to. It is used for
/// consistent hashing.
///
/// This is a `#[repr(transparent)]` newtype around `VirtualNodeInner`. Its `Display` output is
/// the inner integer alone (display format `{0}`).
#[repr(transparent)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Display)]
#[display("{0}")]
pub struct VirtualNode(VirtualNodeInner);
31
/// The internal representation of a virtual node id.
///
/// Note: not all bits of the inner representation might be used. [`VirtualNode::MAX_COUNT`] is
/// derived from this type's bit width minus one.
type VirtualNodeInner = u16;
36
/// `vnode_count` must be provided to convert a hash code to a virtual node.
///
/// This negative impl explicitly opts `VirtualNode` out of `From<Crc32HashCode>`, so a
/// conversion that ignores the vnode count cannot be provided.
///
/// Use [`Crc32HashCodeToVnodeExt::to_vnode`] instead.
impl !From<Crc32HashCode> for VirtualNode {}
41
42#[easy_ext::ext(Crc32HashCodeToVnodeExt)]
43impl Crc32HashCode {
44    /// Converts the hash code to a virtual node, based on the given total count of vnodes.
45    fn to_vnode(self, vnode_count: usize) -> VirtualNode {
46        // Take the least significant bits of the hash code.
47        // TODO: should we use the most significant bits?
48        let inner = (self.value() % vnode_count as u64) as VirtualNodeInner;
49        VirtualNode(inner)
50    }
51}
52
impl VirtualNode {
    /// The total count of virtual nodes, for compatibility purposes **ONLY**.
    ///
    /// The value is `1 << 8`, i.e. 256.
    ///
    /// Typical use cases:
    ///
    /// - As the default value for the session configuration.
    /// - As the vnode count for all streaming jobs, fragments, and tables that were created before
    ///   the variable vnode count support was introduced.
    /// - As the vnode count for singletons.
    pub const COUNT_FOR_COMPAT: usize = 1 << 8;
}
64
65impl VirtualNode {
66    /// The total count of virtual nodes, for testing purposes.
67    pub const COUNT_FOR_TEST: usize = Self::COUNT_FOR_COMPAT;
68    /// The maximum value of the virtual node, for testing purposes.
69    pub const MAX_FOR_TEST: VirtualNode = VirtualNode::from_index(Self::COUNT_FOR_TEST - 1);
70}
71
72impl VirtualNode {
73    /// The maximum count of virtual nodes that fits in [`VirtualNodeInner`].
74    ///
75    /// Note that the most significant bit is not used. This is because we use signed integers (`i16`)
76    /// for the scalar representation, where overflow can be confusing in terms of ordering.
77    // TODO(var-vnode): the only usage is in log-store, shall we update it by storing the vnode as
78    // bytea to enable 2^16 vnodes?
79    pub const MAX_COUNT: usize = 1 << (VirtualNodeInner::BITS - 1);
80    /// The maximum value of the virtual node that can be represented.
81    ///
82    /// Note that this is **NOT** the maximum value of the virtual node, which depends on the configuration.
83    pub const MAX_REPRESENTABLE: VirtualNode = VirtualNode::from_index(Self::MAX_COUNT - 1);
84    /// The size of a virtual node in bytes, in memory or serialized representation.
85    pub const SIZE: usize = std::mem::size_of::<Self>();
86}
87
/// An iterator over all virtual nodes.
///
/// This is the concrete type returned by [`VirtualNode::all`]: a `usize` range mapped through a
/// plain function pointer that turns each index into a [`VirtualNode`].
pub type AllVirtualNodeIter = std::iter::Map<std::ops::Range<usize>, fn(usize) -> VirtualNode>;
90
91impl VirtualNode {
92    /// We may use `VirtualNode` as a datum in a stream, or store it as a column.
93    /// Hence this reifies it as a RW datatype.
94    pub const RW_TYPE: DataType = DataType::Int16;
95    /// The minimum (zero) value of the virtual node.
96    pub const ZERO: VirtualNode = VirtualNode::from_index(0);
97
98    /// Creates a virtual node from the `usize` index.
99    pub const fn from_index(index: usize) -> Self {
100        debug_assert!(index < Self::MAX_COUNT);
101        Self(index as _)
102    }
103
104    /// Returns the `usize` the virtual node used for indexing.
105    pub const fn to_index(self) -> usize {
106        self.0 as _
107    }
108
109    /// Creates a virtual node from the given scalar representation. Used by `VNODE` expression.
110    pub const fn from_scalar(scalar: i16) -> Self {
111        debug_assert!(scalar >= 0);
112        Self(scalar as _)
113    }
114
115    pub fn from_datum(datum: DatumRef<'_>) -> Self {
116        Self::from_scalar(datum.expect("should not be none").into_int16())
117    }
118
119    /// Returns the scalar representation of the virtual node. Used by `VNODE` expression.
120    pub const fn to_scalar(self) -> i16 {
121        self.0 as _
122    }
123
124    pub const fn to_datum(self) -> Datum {
125        Some(ScalarImpl::Int16(self.to_scalar()))
126    }
127
128    /// Creates a virtual node from the given big-endian bytes representation.
129    pub const fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self {
130        let inner = VirtualNodeInner::from_be_bytes(bytes);
131        debug_assert!((inner as usize) < Self::MAX_COUNT);
132        Self(inner)
133    }
134
135    /// Returns the big-endian bytes representation of the virtual node.
136    pub const fn to_be_bytes(self) -> [u8; Self::SIZE] {
137        self.0.to_be_bytes()
138    }
139
140    /// Iterates over all virtual nodes.
141    pub fn all(vnode_count: usize) -> AllVirtualNodeIter {
142        (0..vnode_count).map(Self::from_index)
143    }
144}
145
146impl VirtualNode {
147    // `compute_chunk` is used to calculate the `VirtualNode` for the columns in the
148    // chunk. When only one column is provided and its type is `Serial`, we consider the column to
149    // be the one that contains RowId, and use a special method to skip the calculation of Hash
150    // and directly extract the `VirtualNode` from `RowId`.
151    pub fn compute_chunk(
152        data_chunk: &DataChunk,
153        keys: &[usize],
154        vnode_count: usize,
155    ) -> Vec<VirtualNode> {
156        if let Ok(idx) = keys.iter().exactly_one()
157            && let ArrayImpl::Serial(serial_array) = &**data_chunk.column_at(*idx)
158        {
159            return serial_array
160                .iter()
161                .enumerate()
162                .map(|(idx, serial)| {
163                    if let Some(serial) = serial {
164                        compute_vnode_from_row_id(serial.as_row_id(), vnode_count)
165                    } else {
166                        // NOTE: here it will hash the entire row when the `_row_id` is missing,
167                        // which could result in rows from the same chunk being allocated to different chunks.
168                        // This process doesn’t guarantee the order of rows, producing indeterminate results in some cases,
169                        // such as when `distinct on` is used without an `order by`.
170                        let (row, _) = data_chunk.row_at(idx);
171                        row.hash(Crc32FastBuilder).to_vnode(vnode_count)
172                    }
173                })
174                .collect();
175        }
176
177        data_chunk
178            .get_hash_values(keys, Crc32FastBuilder)
179            .into_iter()
180            .map(|hash| hash.to_vnode(vnode_count))
181            .collect()
182    }
183
184    /// Equivalent to [`Self::compute_chunk`] with [`VirtualNode::COUNT_FOR_TEST`] as the vnode count.
185    pub fn compute_chunk_for_test(data_chunk: &DataChunk, keys: &[usize]) -> Vec<VirtualNode> {
186        Self::compute_chunk(data_chunk, keys, Self::COUNT_FOR_TEST)
187    }
188
189    // `compute_row` is used to calculate the `VirtualNode` for the corresponding columns in a
190    // `Row`. Similar to `compute_chunk`, it also contains special handling for serial columns.
191    pub fn compute_row(row: impl Row, indices: &[usize], vnode_count: usize) -> VirtualNode {
192        let project = row.project(indices);
193        if let Ok(Some(ScalarRefImpl::Serial(s))) = project.iter().exactly_one().as_ref() {
194            return compute_vnode_from_row_id(s.as_row_id(), vnode_count);
195        }
196
197        project.hash(Crc32FastBuilder).to_vnode(vnode_count)
198    }
199
200    /// Equivalent to [`Self::compute_row`] with [`VirtualNode::COUNT_FOR_TEST`] as the vnode count.
201    pub fn compute_row_for_test(row: impl Row, indices: &[usize]) -> VirtualNode {
202        Self::compute_row(row, indices, Self::COUNT_FOR_TEST)
203    }
204}
205
#[cfg(test)]
mod tests {
    use super::*;
    use crate::array::DataChunkTestExt;
    use crate::row::OwnedRow;
    use crate::util::row_id::RowIdGenerator;

    /// A chunk keyed by a single serial column maps every row to the vnode the row ids were
    /// generated for.
    #[test]
    fn test_serial_key_chunk() {
        let expected = VirtualNode::from_index(100);
        let mut id_gen = RowIdGenerator::new([expected], VirtualNode::COUNT_FOR_TEST);
        let pretty = format!(
            "SRL I
             {} 1
             {} 2",
            id_gen.next(),
            id_gen.next(),
        );

        let data_chunk = DataChunk::from_pretty(pretty.as_str());
        let vnodes = VirtualNode::compute_chunk_for_test(&data_chunk, &[0]);

        assert_eq!(vnodes, [expected; 2]);
    }

    /// A row keyed by its serial column maps to the vnode the row id was generated for.
    #[test]
    fn test_serial_key_row() {
        let expected = VirtualNode::from_index(100);
        let mut id_gen = RowIdGenerator::new([expected], VirtualNode::COUNT_FOR_TEST);
        let serial = ScalarImpl::Serial(id_gen.next().into());
        let row = OwnedRow::new(vec![Some(serial), Some(ScalarImpl::Int64(12345))]);

        let vnode = VirtualNode::compute_row_for_test(&row, &[0]);

        assert_eq!(vnode, expected);
    }

    /// Row ids generated for two vnodes map back to those vnodes, row by row.
    #[test]
    fn test_serial_key_chunk_multiple_vnodes() {
        let mut id_gen = RowIdGenerator::new(
            [100, 200].map(VirtualNode::from_index),
            VirtualNode::COUNT_FOR_TEST,
        );
        let pretty = format!(
            "SRL I
             {} 1
             {} 2
             {} 3
             {} 4",
            id_gen.next(),
            id_gen.next(),
            id_gen.next(),
            id_gen.next(),
        );

        let data_chunk = DataChunk::from_pretty(pretty.as_str());
        let vnodes = VirtualNode::compute_chunk_for_test(&data_chunk, &[0]);

        let expected = [100, 200, 100, 200].map(VirtualNode::from_index);
        assert_eq!(vnodes, expected);
    }
}