risingwave_common/hash/
key_v2.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::fmt::Debug;
16use std::hash::{Hash, Hasher};
17use std::marker::PhantomData;
18
19use bytes::BufMut;
20use educe::Educe;
21use either::{Either, for_both};
22use itertools::Itertools;
23use risingwave_common_estimate_size::EstimateSize;
24use tinyvec::ArrayVec;
25
26use super::{HeapNullBitmap, NullBitmap, XxHash64HashCode};
27use crate::array::{Array, ArrayBuilder, ArrayBuilderImpl, ArrayResult, DataChunk};
28use crate::hash::{HashKeyDe, HashKeySer};
29use crate::row::OwnedRow;
30use crate::types::{DataType, Datum, ScalarImpl};
31use crate::util::hash_util::XxHash64Builder;
32use crate::util::iter_util::ZipEqFast;
33use crate::{dispatch_array_builder_variants, dispatch_array_variants, dispatch_data_types};
34
35/// The storage where the hash key resides in memory.
36pub trait KeyStorage: 'static {
37    /// The key type that is used to store the hash key.
38    type Key: AsRef<[u8]> + EstimateSize + Clone + Send + Sync + 'static;
39
40    /// The buffer type that is used to build the hash key.
41    type Buffer: Buffer<Sealed = Self::Key>;
42}
43
44/// Associated type for [`KeyStorage`] used to build the hash key.
45pub trait Buffer: BufMut + 'static {
46    /// The sealed key type.
47    type Sealed;
48
49    /// Returns whether this buffer allocates on the heap.
50    fn alloc() -> bool;
51
52    /// Creates a new buffer with the given capacity.
53    fn with_capacity(cap: usize) -> Self;
54
55    /// Seals the buffer and returns the sealed key.
56    fn seal(self) -> Self::Sealed;
57}
58
59/// Key storage that uses a on-stack buffer and key, backed by a byte array with length `N`.
60///
61/// Used when the encoded length of the hash key is known to be less than or equal to `N`.
62pub struct StackStorage<const N: usize>;
63
64impl<const N: usize> KeyStorage for StackStorage<N> {
65    type Buffer = StackBuffer<N>;
66    type Key = [u8; N];
67}
68
69/// The buffer for building a hash key on a fixed-size byte array on the stack.
70pub struct StackBuffer<const N: usize>(ArrayVec<[u8; N]>);
71
72unsafe impl<const N: usize> BufMut for StackBuffer<N> {
73    #[inline]
74    fn remaining_mut(&self) -> usize {
75        self.0.grab_spare_slice().len()
76    }
77
78    #[inline]
79    unsafe fn advance_mut(&mut self, cnt: usize) {
80        self.0.set_len(self.0.len() + cnt);
81    }
82
83    #[inline]
84    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
85        // SAFETY: copied from the implementation of `impl BufMut for &mut [u8]`.
86        // UninitSlice is repr(transparent), so safe to transmute
87        unsafe { &mut *(self.0.grab_spare_slice_mut() as *mut [u8] as *mut _) }
88    }
89
90    #[inline]
91    fn put_slice(&mut self, src: &[u8]) {
92        // Specialize this method to avoid the `while` loop for inconsecutive slices in the default
93        // implementation.
94        self.0.extend_from_slice(src);
95    }
96}
97
98impl<const N: usize> Buffer for StackBuffer<N> {
99    type Sealed = [u8; N];
100
101    fn alloc() -> bool {
102        false
103    }
104
105    fn with_capacity(_cap: usize) -> Self {
106        // Ignore the capacity.
107        Self(ArrayVec::new())
108    }
109
110    fn seal(self) -> Self::Sealed {
111        self.0.into_inner()
112    }
113}
114
115/// Key storage that uses a in-heap buffer and key, backed by a boxed slice.
116///
117/// Used when the encoded length of the hash key might be arbitrarily large.
118pub struct HeapStorage;
119
120impl KeyStorage for HeapStorage {
121    type Buffer = Vec<u8>;
122    type Key = Box<[u8]>; // To avoid unnecessary spare spaces.
123}
124
125impl Buffer for Vec<u8> {
126    type Sealed = Box<[u8]>;
127
128    fn alloc() -> bool {
129        true
130    }
131
132    fn with_capacity(cap: usize) -> Self {
133        Self::with_capacity(cap)
134    }
135
136    fn seal(self) -> Self::Sealed {
137        self.into_boxed_slice()
138    }
139}
140
141/// Serializer for building a hash key from datums on the specified storage.
142struct Serializer<S: KeyStorage, N: NullBitmap> {
143    buffer: S::Buffer,
144    null_bitmap: N,
145    idx: usize,
146    hash_code: XxHash64HashCode,
147}
148
149impl<S: KeyStorage, N: NullBitmap> Serializer<S, N> {
150    fn new(buffer: S::Buffer, hash_code: XxHash64HashCode) -> Self {
151        Self {
152            buffer,
153            null_bitmap: N::empty(),
154            idx: 0,
155            hash_code,
156        }
157    }
158
159    /// Serializes a generic datum into the hash key.
160    fn serialize<'a, D: HashKeySer<'a>>(&mut self, datum: Option<D>) {
161        match datum {
162            Some(scalar) => HashKeySer::serialize_into(scalar, &mut self.buffer),
163            None => self.null_bitmap.set_true(self.idx),
164        }
165        self.idx += 1;
166    }
167
168    /// Finishes serialization and returns the hash key.
169    fn finish(self) -> HashKeyImpl<S, N> {
170        HashKeyImpl {
171            hash_code: self.hash_code,
172            key: self.buffer.seal(),
173            null_bitmap: self.null_bitmap,
174        }
175    }
176}
177
178/// Deserializer for deserializing a hash key into datums.
179struct Deserializer<'a, S: KeyStorage, N: NullBitmap> {
180    key: &'a [u8],
181    null_bitmap: &'a N,
182    idx: usize,
183    _phantom: PhantomData<&'a S::Key>,
184}
185
186impl<'a, S: KeyStorage, N: NullBitmap> Deserializer<'a, S, N> {
187    fn new(key: &'a S::Key, null_bitmap: &'a N) -> Self {
188        Self {
189            key: key.as_ref(),
190            null_bitmap,
191            idx: 0,
192            _phantom: PhantomData,
193        }
194    }
195
196    /// Deserializes a generic datum from the hash key.
197    fn deserialize<D: HashKeyDe>(&mut self, data_type: &DataType) -> Option<D> {
198        let datum = if !self.null_bitmap.contains(self.idx) {
199            Some(HashKeyDe::deserialize(data_type, &mut self.key))
200        } else {
201            None
202        };
203
204        self.idx += 1;
205        datum
206    }
207
208    /// Deserializes a type-erased datum from the hash key.
209    fn deserialize_impl(&mut self, data_type: &DataType) -> Datum {
210        dispatch_data_types!(data_type, [S = Scalar], {
211            self.deserialize::<S>(data_type).map(ScalarImpl::from)
212        })
213    }
214}
215
216/// Trait for different kinds of hash keys.
217///
218/// Current comparison implementation treats `null == null`. This is consistent with postgresql's
219/// group by implementation, but not join. In pg's join implementation, `null != null`, and the join
220/// executor should take care of this.
221pub trait HashKey:
222    EstimateSize + Clone + Debug + Hash + Eq + Sized + Send + Sync + 'static
223{
224    // TODO: rename to `NullBitmap` and note that high bit represents null!
225    type Bitmap: NullBitmap;
226
227    /// Build hash keys from the given `data_chunk` with `column_indices` in a batch.
228    fn build_many(column_indices: &[usize], data_chunk: &DataChunk) -> Vec<Self>;
229
230    /// Deserializes the hash key into a row.
231    fn deserialize(&self, data_types: &[DataType]) -> ArrayResult<OwnedRow>;
232
233    /// Deserializes the hash key into array builders.
234    fn deserialize_to_builders(
235        &self,
236        array_builders: &mut [ArrayBuilderImpl],
237        data_types: &[DataType],
238    ) -> ArrayResult<()>;
239
240    /// Get the null bitmap of the hash key.
241    fn null_bitmap(&self) -> &Self::Bitmap;
242}
243
244/// The implementation of the hash key.
245///
246/// - Precompute the hash code of the key to accelerate the hash table look-up.
247/// - Serialize the key into a compact byte buffer to save the memory usage.
248#[derive(Educe)]
249#[educe(Clone)]
250pub struct HashKeyImpl<S: KeyStorage, N: NullBitmap> {
251    hash_code: XxHash64HashCode,
252    key: S::Key,
253    null_bitmap: N,
254}
255
256impl<S: KeyStorage, N: NullBitmap> Hash for HashKeyImpl<S, N> {
257    fn hash<H: Hasher>(&self, state: &mut H) {
258        // Caveat: this should only be used along with `PrecomputedHashBuilder`.
259        state.write_u64(self.hash_code.value());
260    }
261}
262
263impl<S: KeyStorage, N: NullBitmap> PartialEq for HashKeyImpl<S, N> {
264    fn eq(&self, other: &Self) -> bool {
265        // Compare the hash code first for short-circuit.
266        self.hash_code == other.hash_code
267            && self.key.as_ref() == other.key.as_ref()
268            && self.null_bitmap == other.null_bitmap
269    }
270}
271impl<S: KeyStorage, N: NullBitmap> Eq for HashKeyImpl<S, N> {}
272
273impl<S: KeyStorage, N: NullBitmap> std::fmt::Debug for HashKeyImpl<S, N> {
274    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275        f.debug_struct("HashKey")
276            .field("key", &self.key.as_ref())
277            .finish_non_exhaustive()
278    }
279}
280
281impl<S: KeyStorage, N: NullBitmap> EstimateSize for HashKeyImpl<S, N> {
282    fn estimated_heap_size(&self) -> usize {
283        self.key.estimated_heap_size() + self.null_bitmap.estimated_heap_size()
284    }
285}
286
287impl<S: KeyStorage, N: NullBitmap> HashKey for HashKeyImpl<S, N> {
288    type Bitmap = N;
289
290    fn build_many(column_indices: &[usize], data_chunk: &DataChunk) -> Vec<Self> {
291        let hash_codes = data_chunk.get_hash_values(column_indices, XxHash64Builder);
292
293        let mut serializers = {
294            let buffers = if S::Buffer::alloc() {
295                // Pre-estimate the key size to avoid reallocation as much as possible.
296                let estimated_key_sizes = data_chunk.estimate_hash_key_sizes(column_indices);
297
298                Either::Left(
299                    estimated_key_sizes
300                        .into_iter()
301                        .map(S::Buffer::with_capacity),
302                )
303            } else {
304                Either::Right((0..data_chunk.capacity()).map(|_| S::Buffer::with_capacity(0)))
305            };
306
307            for_both!(buffers, buffers =>
308                hash_codes
309                    .into_iter()
310                    .zip_eq_fast(buffers)
311                    .map(|(hash_code, buffer)| Serializer::new(buffer, hash_code))
312                    .collect_vec()
313            )
314        };
315
316        for &i in column_indices {
317            let array = data_chunk.column_at(i).as_ref();
318
319            // Dispatch types once to accelerate the inner call.
320            dispatch_array_variants!(array, array, {
321                for i in data_chunk.visibility().iter_ones() {
322                    // SAFETY(value_at_unchecked): the idx is always in bound.
323                    unsafe { serializers[i].serialize(array.value_at_unchecked(i)) }
324                }
325            });
326        }
327
328        serializers.into_iter().map(|s| s.finish()).collect()
329    }
330
331    fn deserialize(&self, data_types: &[DataType]) -> ArrayResult<OwnedRow> {
332        let mut deserializer = Deserializer::<S, N>::new(&self.key, &self.null_bitmap);
333        let mut row = Vec::with_capacity(data_types.len());
334
335        for data_type in data_types {
336            let datum = deserializer.deserialize_impl(data_type);
337            row.push(datum);
338        }
339
340        Ok(OwnedRow::new(row))
341    }
342
343    fn deserialize_to_builders(
344        &self,
345        array_builders: &mut [ArrayBuilderImpl],
346        data_types: &[DataType],
347    ) -> ArrayResult<()> {
348        let mut deserializer = Deserializer::<S, N>::new(&self.key, &self.null_bitmap);
349
350        for (data_type, array_builder) in data_types.iter().zip_eq_fast(array_builders.iter_mut()) {
351            // Dispatch types once to accelerate the inner call.
352            dispatch_array_builder_variants!(array_builder, array_builder, {
353                let datum = deserializer.deserialize(data_type);
354                array_builder.append_owned(datum);
355            });
356        }
357
358        Ok(())
359    }
360
361    fn null_bitmap(&self) -> &Self::Bitmap {
362        &self.null_bitmap
363    }
364}
365
366#[easy_ext::ext]
367impl DataChunk {
368    fn estimate_hash_key_sizes(&self, column_indices: &[usize]) -> Vec<usize> {
369        let mut estimated_column_indices = Vec::new();
370        let mut exact_size = 0;
371
372        for &i in column_indices {
373            dispatch_array_variants!(&*self.columns()[i], [S = ScalarRef], {
374                match S::exact_size() {
375                    Some(size) => exact_size += size,
376                    None => estimated_column_indices.push(i),
377                }
378            })
379        }
380        let mut sizes = vec![exact_size; self.capacity()];
381
382        for i in estimated_column_indices {
383            dispatch_array_variants!(&*self.columns()[i], col, {
384                for i in self.visibility().iter_ones() {
385                    // SAFETY(value_at_unchecked): the idx is always in bound.
386                    unsafe {
387                        if let Some(scalar) = col.value_at_unchecked(i) {
388                            sizes[i] += HashKeySer::estimated_size(scalar);
389                        }
390                    }
391                }
392            })
393        }
394
395        sizes
396    }
397}
398
399pub type FixedSizeKey<const N: usize, B> = HashKeyImpl<StackStorage<N>, B>;
400pub type Key8<B = HeapNullBitmap> = FixedSizeKey<1, B>;
401pub type Key16<B = HeapNullBitmap> = FixedSizeKey<2, B>;
402pub type Key32<B = HeapNullBitmap> = FixedSizeKey<4, B>;
403pub type Key64<B = HeapNullBitmap> = FixedSizeKey<8, B>;
404pub type Key128<B = HeapNullBitmap> = FixedSizeKey<16, B>;
405pub type Key256<B = HeapNullBitmap> = FixedSizeKey<32, B>;
406pub type KeySerialized<B = HeapNullBitmap> = SerializedKey<B>;
407
408pub type SerializedKey<B = HeapNullBitmap> = HashKeyImpl<HeapStorage, B>;