1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
1415use itertools::Itertools;
16use parse_display::Display;
1718use crate::array::{Array, ArrayImpl, DataChunk};
19use crate::hash::Crc32HashCode;
20use crate::row::{Row, RowExt};
21use crate::types::{DataType, Datum, DatumRef, ScalarImpl, ScalarRefImpl};
22use crate::util::hash_util::Crc32FastBuilder;
23use crate::util::row_id::compute_vnode_from_row_id;
2425/// `VirtualNode` (a.k.a. Vnode) is a minimal partition that a set of keys belong to. It is used for
26/// consistent hashing.
27#[repr(transparent)]
28#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Display)]
29#[display("{0}")]
30pub struct VirtualNode(VirtualNodeInner);
/// Underlying integer type that stores the id of a virtual node.
///
/// Note: valid ids do not necessarily make use of every bit of this type.
type VirtualNodeInner = u16;
3637/// `vnode_count` must be provided to convert a hash code to a virtual node.
38///
39/// Use [`Crc32HashCodeToVnodeExt::to_vnode`] instead.
40impl !From<Crc32HashCode> for VirtualNode {}
4142#[easy_ext::ext(Crc32HashCodeToVnodeExt)]
43impl Crc32HashCode {
44/// Converts the hash code to a virtual node, based on the given total count of vnodes.
45fn to_vnode(self, vnode_count: usize) -> VirtualNode {
46// Take the least significant bits of the hash code.
47 // TODO: should we use the most significant bits?
48let inner = (self.value() % vnode_count as u64) as VirtualNodeInner;
49 VirtualNode(inner)
50 }
51}
5253impl VirtualNode {
54/// The total count of virtual nodes, for compatibility purposes **ONLY**.
55 ///
56 /// Typical use cases:
57 ///
58 /// - As the default value for the session configuration.
59 /// - As the vnode count for all streaming jobs, fragments, and tables that were created before
60 /// the variable vnode count support was introduced.
61 /// - As the vnode count for singletons.
62pub const COUNT_FOR_COMPAT: usize = 1 << 8;
63}
6465impl VirtualNode {
66/// The total count of virtual nodes, for testing purposes.
67pub const COUNT_FOR_TEST: usize = Self::COUNT_FOR_COMPAT;
68/// The maximum value of the virtual node, for testing purposes.
69pub const MAX_FOR_TEST: VirtualNode = VirtualNode::from_index(Self::COUNT_FOR_TEST - 1);
70}
7172impl VirtualNode {
73/// The maximum count of virtual nodes that fits in [`VirtualNodeInner`].
74 ///
75 /// Note that the most significant bit is not used. This is because we use signed integers (`i16`)
76 /// for the scalar representation, where overflow can be confusing in terms of ordering.
77// TODO(var-vnode): the only usage is in log-store, shall we update it by storing the vnode as
78 // bytea to enable 2^16 vnodes?
79pub const MAX_COUNT: usize = 1 << (VirtualNodeInner::BITS - 1);
80/// The maximum value of the virtual node that can be represented.
81 ///
82 /// Note that this is **NOT** the maximum value of the virtual node, which depends on the configuration.
83pub const MAX_REPRESENTABLE: VirtualNode = VirtualNode::from_index(Self::MAX_COUNT - 1);
84/// The size of a virtual node in bytes, in memory or serialized representation.
85pub const SIZE: usize = std::mem::size_of::<Self>();
86}
8788/// An iterator over all virtual nodes.
89pub type AllVirtualNodeIter = std::iter::Map<std::ops::Range<usize>, fn(usize) -> VirtualNode>;
9091impl VirtualNode {
92/// We may use `VirtualNode` as a datum in a stream, or store it as a column.
93 /// Hence this reifies it as a RW datatype.
94pub const RW_TYPE: DataType = DataType::Int16;
95/// The minimum (zero) value of the virtual node.
96pub const ZERO: VirtualNode = VirtualNode::from_index(0);
9798/// Creates a virtual node from the `usize` index.
99pub const fn from_index(index: usize) -> Self {
100debug_assert!(index < Self::MAX_COUNT);
101Self(index as _)
102 }
103104/// Returns the `usize` the virtual node used for indexing.
105pub const fn to_index(self) -> usize {
106self.0 as _
107}
108109/// Creates a virtual node from the given scalar representation. Used by `VNODE` expression.
110pub const fn from_scalar(scalar: i16) -> Self {
111debug_assert!(scalar >= 0);
112Self(scalar as _)
113 }
114115pub fn from_datum(datum: DatumRef<'_>) -> Self {
116Self::from_scalar(datum.expect("should not be none").into_int16())
117 }
118119/// Returns the scalar representation of the virtual node. Used by `VNODE` expression.
120pub const fn to_scalar(self) -> i16 {
121self.0 as _
122}
123124pub const fn to_datum(self) -> Datum {
125Some(ScalarImpl::Int16(self.to_scalar()))
126 }
127128/// Creates a virtual node from the given big-endian bytes representation.
129pub const fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self {
130let inner = VirtualNodeInner::from_be_bytes(bytes);
131debug_assert!((inner as usize) < Self::MAX_COUNT);
132Self(inner)
133 }
134135/// Returns the big-endian bytes representation of the virtual node.
136pub const fn to_be_bytes(self) -> [u8; Self::SIZE] {
137self.0.to_be_bytes()
138 }
139140/// Iterates over all virtual nodes.
141pub fn all(vnode_count: usize) -> AllVirtualNodeIter {
142 (0..vnode_count).map(Self::from_index)
143 }
144}
145146impl VirtualNode {
147// `compute_chunk` is used to calculate the `VirtualNode` for the columns in the
148 // chunk. When only one column is provided and its type is `Serial`, we consider the column to
149 // be the one that contains RowId, and use a special method to skip the calculation of Hash
150 // and directly extract the `VirtualNode` from `RowId`.
151pub fn compute_chunk(
152 data_chunk: &DataChunk,
153 keys: &[usize],
154 vnode_count: usize,
155 ) -> Vec<VirtualNode> {
156if let Ok(idx) = keys.iter().exactly_one()
157 && let ArrayImpl::Serial(serial_array) = &**data_chunk.column_at(*idx)
158 {
159return serial_array
160 .iter()
161 .enumerate()
162 .map(|(idx, serial)| {
163if let Some(serial) = serial {
164 compute_vnode_from_row_id(serial.as_row_id(), vnode_count)
165 } else {
166// NOTE: here it will hash the entire row when the `_row_id` is missing,
167 // which could result in rows from the same chunk being allocated to different chunks.
168 // This process doesn’t guarantee the order of rows, producing indeterminate results in some cases,
169 // such as when `distinct on` is used without an `order by`.
170let (row, _) = data_chunk.row_at(idx);
171 row.hash(Crc32FastBuilder).to_vnode(vnode_count)
172 }
173 })
174 .collect();
175 }
176177 data_chunk
178 .get_hash_values(keys, Crc32FastBuilder)
179 .into_iter()
180 .map(|hash| hash.to_vnode(vnode_count))
181 .collect()
182 }
183184/// Equivalent to [`Self::compute_chunk`] with [`VirtualNode::COUNT_FOR_TEST`] as the vnode count.
185pub fn compute_chunk_for_test(data_chunk: &DataChunk, keys: &[usize]) -> Vec<VirtualNode> {
186Self::compute_chunk(data_chunk, keys, Self::COUNT_FOR_TEST)
187 }
188189// `compute_row` is used to calculate the `VirtualNode` for the corresponding columns in a
190 // `Row`. Similar to `compute_chunk`, it also contains special handling for serial columns.
191pub fn compute_row(row: impl Row, indices: &[usize], vnode_count: usize) -> VirtualNode {
192let project = row.project(indices);
193if let Ok(Some(ScalarRefImpl::Serial(s))) = project.iter().exactly_one().as_ref() {
194return compute_vnode_from_row_id(s.as_row_id(), vnode_count);
195 }
196197 project.hash(Crc32FastBuilder).to_vnode(vnode_count)
198 }
199200/// Equivalent to [`Self::compute_row`] with [`VirtualNode::COUNT_FOR_TEST`] as the vnode count.
201pub fn compute_row_for_test(row: impl Row, indices: &[usize]) -> VirtualNode {
202Self::compute_row(row, indices, Self::COUNT_FOR_TEST)
203 }
204}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::array::DataChunkTestExt;
    use crate::row::OwnedRow;
    use crate::util::row_id::RowIdGenerator;

    /// Row ids generated for a single vnode must all map back to that vnode when the serial
    /// column is the only key of the chunk.
    #[test]
    fn test_serial_key_chunk() {
        let mut row_id_gen =
            RowIdGenerator::new([VirtualNode::from_index(100)], VirtualNode::COUNT_FOR_TEST);
        let first = row_id_gen.next();
        let second = row_id_gen.next();
        let pretty = format!(
            "SRL I
             {} 1
             {} 2",
            first, second,
        );

        let chunk = DataChunk::from_pretty(&pretty);
        let vnodes = VirtualNode::compute_chunk_for_test(&chunk, &[0]);

        assert_eq!(vnodes, [VirtualNode::from_index(100); 2]);
    }

    /// A row keyed by its serial column alone must map back to the vnode of the generator.
    #[test]
    fn test_serial_key_row() {
        let mut row_id_gen =
            RowIdGenerator::new([VirtualNode::from_index(100)], VirtualNode::COUNT_FOR_TEST);
        let serial = ScalarImpl::Serial(row_id_gen.next().into());
        let row = OwnedRow::new(vec![Some(serial), Some(ScalarImpl::Int64(12345))]);

        let vnode = VirtualNode::compute_row_for_test(&row, &[0]);

        assert_eq!(vnode, VirtualNode::from_index(100));
    }

    /// With a generator owning two vnodes, the expected vnodes alternate between them in
    /// generation order.
    #[test]
    fn test_serial_key_chunk_multiple_vnodes() {
        let mut row_id_gen = RowIdGenerator::new(
            [100, 200].map(VirtualNode::from_index),
            VirtualNode::COUNT_FOR_TEST,
        );
        let first = row_id_gen.next();
        let second = row_id_gen.next();
        let third = row_id_gen.next();
        let fourth = row_id_gen.next();
        let pretty = format!(
            "SRL I
             {} 1
             {} 2
             {} 3
             {} 4",
            first, second, third, fourth,
        );

        let chunk = DataChunk::from_pretty(&pretty);
        let vnodes = VirtualNode::compute_chunk_for_test(&chunk, &[0]);

        let expected = [100, 200, 100, 200].map(VirtualNode::from_index);
        assert_eq!(vnodes, expected);
    }
}