Skip to main content

risingwave_common/hash/consistent_hash/
vnode.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use parse_display::Display;
17
18use crate::array::{Array, ArrayImpl, DataChunk};
19use crate::hash::Crc32HashCode;
20use crate::row::{Row, RowExt};
21use crate::types::{DataType, Datum, DatumRef, ScalarImpl, ScalarRefImpl};
22use crate::util::hash_util::Crc32FastBuilder;
23use crate::util::row_id::compute_vnode_from_row_id;
24
25/// `VirtualNode` (a.k.a. Vnode) is a minimal partition that a set of keys belong to. It is used for
26/// consistent hashing.
27#[repr(transparent)]
28#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Display)]
29#[display("{0}")]
30pub struct VirtualNode(VirtualNodeInner);
31
32/// The internal representation of a virtual node id.
33///
34/// Note: not all bits of the inner representation might be used.
35type VirtualNodeInner = u16;
36
37/// `vnode_count` must be provided to convert a hash code to a virtual node.
38///
39/// Use [`Crc32HashCodeToVnodeExt::to_vnode`] instead.
40impl !From<Crc32HashCode> for VirtualNode {}
41
42#[easy_ext::ext(Crc32HashCodeToVnodeExt)]
43impl Crc32HashCode {
44    /// Converts the hash code to a virtual node, based on the given total count of vnodes.
45    fn to_vnode(self, vnode_count: usize) -> VirtualNode {
46        // Take the least significant bits of the hash code.
47        // TODO: should we use the most significant bits?
48        let inner = (self.value() % vnode_count as u64) as VirtualNodeInner;
49        VirtualNode(inner)
50    }
51}
52
53impl VirtualNode {
54    /// The total count of virtual nodes, for compatibility purposes **ONLY**.
55    ///
56    /// Typical use cases:
57    ///
58    /// - As the default value for the session configuration.
59    /// - As the vnode count for all streaming jobs, fragments, and tables that were created before
60    ///   the variable vnode count support was introduced.
61    /// - As the vnode count for singletons.
62    pub const COUNT_FOR_COMPAT: usize = 1 << 8;
63}
64
65impl VirtualNode {
66    /// The total count of virtual nodes, for testing purposes.
67    pub const COUNT_FOR_TEST: usize = Self::COUNT_FOR_COMPAT;
68    /// The maximum value of the virtual node, for testing purposes.
69    pub const MAX_FOR_TEST: VirtualNode = VirtualNode::from_index(Self::COUNT_FOR_TEST - 1);
70}
71
72impl VirtualNode {
73    /// The maximum count of virtual nodes that fits in [`VirtualNodeInner`].
74    ///
75    /// Note that the most significant bit is not used. This is because we use signed integers (`i16`)
76    /// for the scalar representation, where overflow can be confusing in terms of ordering.
77    // TODO(var-vnode): the only usage is in log-store, shall we update it by storing the vnode as
78    // bytea to enable 2^16 vnodes?
79    pub const MAX_COUNT: usize = 1 << (VirtualNodeInner::BITS - 1);
80    /// The maximum value of the virtual node that can be represented.
81    ///
82    /// Note that this is **NOT** the maximum value of the virtual node, which depends on the configuration.
83    pub const MAX_REPRESENTABLE: VirtualNode = VirtualNode::from_index(Self::MAX_COUNT - 1);
84    /// The size of a virtual node in bytes, in memory or serialized representation.
85    pub const SIZE: usize = std::mem::size_of::<Self>();
86}
87
88/// An iterator over all virtual nodes.
89pub type AllVirtualNodeIter = std::iter::Map<std::ops::Range<usize>, fn(usize) -> VirtualNode>;
90
91impl VirtualNode {
92    /// We may use `VirtualNode` as a datum in a stream, or store it as a column.
93    /// Hence this reifies it as a RW datatype.
94    pub const RW_TYPE: DataType = DataType::Int16;
95    /// The minimum (zero) value of the virtual node.
96    pub const ZERO: VirtualNode = VirtualNode::from_index(0);
97
98    /// Creates a virtual node from the `usize` index.
99    pub const fn from_index(index: usize) -> Self {
100        debug_assert!(index < Self::MAX_COUNT);
101        Self(index as _)
102    }
103
104    /// Returns the `usize` the virtual node used for indexing.
105    pub const fn to_index(self) -> usize {
106        self.0 as _
107    }
108
109    /// Creates a virtual node from the given scalar representation. Used by `VNODE` expression.
110    pub const fn from_scalar(scalar: i16) -> Self {
111        debug_assert!(scalar >= 0);
112        Self(scalar as _)
113    }
114
115    pub fn from_datum_ref(datum: DatumRef<'_>) -> Self {
116        Self::from_scalar(datum.expect("should not be none").into_int16())
117    }
118
119    /// Returns the scalar representation of the virtual node. Used by `VNODE` expression.
120    pub const fn to_scalar(self) -> i16 {
121        self.0 as _
122    }
123
124    pub const fn to_datum(self) -> Datum {
125        Some(ScalarImpl::Int16(self.to_scalar()))
126    }
127
128    /// Creates a virtual node from the given big-endian bytes representation.
129    pub const fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self {
130        let inner = VirtualNodeInner::from_be_bytes(bytes);
131        debug_assert!((inner as usize) < Self::MAX_COUNT);
132        Self(inner)
133    }
134
135    /// Returns the big-endian bytes representation of the virtual node.
136    pub const fn to_be_bytes(self) -> [u8; Self::SIZE] {
137        self.0.to_be_bytes()
138    }
139
140    /// Iterates over all virtual nodes.
141    pub fn all(vnode_count: usize) -> AllVirtualNodeIter {
142        (0..vnode_count).map(Self::from_index)
143    }
144}
145
146impl VirtualNode {
147    // `compute_chunk` is used to calculate the `VirtualNode` for the columns in the
148    // chunk. When only one column is provided and its type is `Serial`, we consider the column to
149    // be the one that contains RowId, and use a special method to skip the calculation of Hash
150    // and directly extract the `VirtualNode` from `RowId`.
151    pub fn compute_chunk(
152        data_chunk: &DataChunk,
153        keys: &[usize],
154        vnode_count: usize,
155    ) -> Vec<VirtualNode> {
156        if let Ok(idx) = Itertools::exactly_one(keys.iter())
157            && let ArrayImpl::Serial(serial_array) = &**data_chunk.column_at(*idx)
158        {
159            return serial_array
160                .iter()
161                .enumerate()
162                .map(|(idx, serial)| {
163                    if let Some(serial) = serial {
164                        compute_vnode_from_row_id(serial.as_row_id(), vnode_count)
165                    } else {
166                        // NOTE: here it will hash the entire row when the `_row_id` is missing,
167                        // which could result in rows from the same chunk being allocated to different chunks.
168                        // This process doesn’t guarantee the order of rows, producing indeterminate results in some cases,
169                        // such as when `distinct on` is used without an `order by`.
170                        let (row, _) = data_chunk.row_at(idx);
171                        row.hash(Crc32FastBuilder).to_vnode(vnode_count)
172                    }
173                })
174                .collect();
175        }
176
177        data_chunk
178            .get_hash_values(keys, Crc32FastBuilder)
179            .into_iter()
180            .map(|hash| hash.to_vnode(vnode_count))
181            .collect()
182    }
183
184    /// Equivalent to [`Self::compute_chunk`] with [`VirtualNode::COUNT_FOR_TEST`] as the vnode count.
185    pub fn compute_chunk_for_test(data_chunk: &DataChunk, keys: &[usize]) -> Vec<VirtualNode> {
186        Self::compute_chunk(data_chunk, keys, Self::COUNT_FOR_TEST)
187    }
188
189    // `compute_row` is used to calculate the `VirtualNode` for the corresponding columns in a
190    // `Row`. Similar to `compute_chunk`, it also contains special handling for serial columns.
191    pub fn compute_row(row: impl Row, indices: &[usize], vnode_count: usize) -> VirtualNode {
192        let project = row.project(indices);
193        if let Ok(Some(ScalarRefImpl::Serial(s))) = Itertools::exactly_one(project.iter()).as_ref()
194        {
195            return compute_vnode_from_row_id(s.as_row_id(), vnode_count);
196        }
197
198        project.hash(Crc32FastBuilder).to_vnode(vnode_count)
199    }
200
201    /// Equivalent to [`Self::compute_row`] with [`VirtualNode::COUNT_FOR_TEST`] as the vnode count.
202    pub fn compute_row_for_test(row: impl Row, indices: &[usize]) -> VirtualNode {
203        Self::compute_row(row, indices, Self::COUNT_FOR_TEST)
204    }
205}
206
#[cfg(test)]
mod tests {
    use super::*;
    use crate::array::DataChunkTestExt;
    use crate::row::OwnedRow;
    use crate::util::row_id::RowIdGenerator;

    /// Row ids generated for vnode 100 map back to vnode 100 when computed per chunk.
    #[test]
    fn test_serial_key_chunk() {
        let mut row_id_gen =
            RowIdGenerator::new([VirtualNode::from_index(100)], VirtualNode::COUNT_FOR_TEST);
        let pretty = format!(
            "SRL I
             {} 1
             {} 2",
            row_id_gen.next(),
            row_id_gen.next(),
        );
        let chunk = DataChunk::from_pretty(&pretty);

        let expected = vec![VirtualNode::from_index(100); 2];
        assert_eq!(VirtualNode::compute_chunk_for_test(&chunk, &[0]), expected);
    }

    /// A row id generated for vnode 100 maps back to vnode 100 when computed per row.
    #[test]
    fn test_serial_key_row() {
        let mut row_id_gen =
            RowIdGenerator::new([VirtualNode::from_index(100)], VirtualNode::COUNT_FOR_TEST);
        let serial = ScalarImpl::Serial(row_id_gen.next().into());
        let row = OwnedRow::new(vec![Some(serial), Some(ScalarImpl::Int64(12345))]);

        assert_eq!(
            VirtualNode::compute_row_for_test(&row, &[0]),
            VirtualNode::from_index(100)
        );
    }

    /// Row ids from a generator owning vnodes 100 and 200 map back to their originating vnode.
    #[test]
    fn test_serial_key_chunk_multiple_vnodes() {
        let mut row_id_gen = RowIdGenerator::new(
            [VirtualNode::from_index(100), VirtualNode::from_index(200)],
            VirtualNode::COUNT_FOR_TEST,
        );
        let pretty = format!(
            "SRL I
             {} 1
             {} 2
             {} 3
             {} 4",
            row_id_gen.next(),
            row_id_gen.next(),
            row_id_gen.next(),
            row_id_gen.next(),
        );
        let chunk = DataChunk::from_pretty(&pretty);

        // The expected vnodes alternate between the generator's two vnodes.
        let expected = [100, 200, 100, 200].map(VirtualNode::from_index);
        assert_eq!(VirtualNode::compute_chunk_for_test(&chunk, &[0]), expected);
    }
}