risingwave_common/hash/
key.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module contains implementation for hash key serialization for
16//! hash-agg, hash-join, and perhaps other hash-based operators.
17//!
18//! There may be multiple columns in one row being combined and encoded into
19//! one single hash key.
//! For example, in `SELECT sum(t.a) FROM t GROUP BY t.b, t.c`, the hash keys
//! are encoded from both `t.b` and `t.c`. If `t.b="abc"` and `t.c=1`, the hash key may be
//! encoded in a format such as `("abc", 1)`.
23
24use std::default::Default;
25use std::fmt::Debug;
26use std::hash::{BuildHasher, Hasher};
27use std::marker::PhantomData;
28
29use bytes::{Buf, BufMut};
30use chrono::{Datelike, Timelike};
31use fixedbitset::FixedBitSet;
32use risingwave_common_estimate_size::EstimateSize;
33use smallbitset::Set64;
34use static_assertions::const_assert_eq;
35
36use crate::array::{ListValue, MapValue, StructValue};
37use crate::types::{
38    DataType, Date, Decimal, F32, F64, Int256, Int256Ref, JsonbVal, Scalar, ScalarRef,
39    ScalarRefImpl, Serial, Time, Timestamp, Timestamptz,
40};
41use crate::util::hash_util::{Crc32FastBuilder, XxHash64Builder};
42use crate::util::sort_util::OrderType;
43use crate::util::{memcmp_encoding, value_encoding};
44
/// Maximum number of group keys whose null bits fit into the on-stack bitmap.
///
/// This is determined by the stack based data structure we use,
/// `StackNullBitmap`, which can store 64 bits at most.
pub static MAX_GROUP_KEYS_ON_STACK: usize = 64;
48
/// Null bitmap on heap, backed by a `FixedBitSet`.
///
/// We use this for the **edge case** where there are more than 64 group keys.
/// In that case the group key null bits cannot fit into a `u64` on the stack.
///
/// NOTE(kwannoel): This is not really optimized as it is an edge case.
#[repr(transparent)]
#[derive(Clone, Debug, PartialEq)]
pub struct HeapNullBitmap {
    inner: FixedBitSet,
}
59
60impl HeapNullBitmap {
61    fn with_capacity(n: usize) -> Self {
62        HeapNullBitmap {
63            inner: FixedBitSet::with_capacity(n),
64        }
65    }
66}
67
/// Null bitmap on stack, backed by a `Set64`.
///
/// This is specialized for the common case where there are at most 64 group keys.
#[repr(transparent)]
#[derive(Clone, Debug, PartialEq)]
pub struct StackNullBitmap {
    inner: Set64,
}
75
// Compile-time guard: `StackNullBitmap` must stay exactly as large as a `u64`.
const_assert_eq!(
    std::mem::size_of::<StackNullBitmap>(),
    std::mem::size_of::<u64>()
);

// Compile-time guard: `HeapNullBitmap` must stay exactly four machine words large.
const_assert_eq!(
    std::mem::size_of::<HeapNullBitmap>(),
    std::mem::size_of::<usize>() * 4,
);
85
/// We use a trait for `NullBitmap` so we can parameterize structs on it.
/// This is because `NullBitmap` is used often, and we want it to occupy
/// the minimal stack space.
///
/// ### Example
/// ```rust
/// use risingwave_common::hash::{NullBitmap, StackNullBitmap};
/// struct A<B: NullBitmap> {
///     null_bitmap: B,
/// }
/// ```
///
/// Then `A<StackNullBitmap>` occupies 64 bits (8 bytes),
/// and in cases which require it,
/// `A<HeapNullBitmap>` will occupy 4 * usize bytes (on 64 bit arch that would be 32 bytes,
/// i.e. 256 bits).
pub trait NullBitmap: EstimateSize + Clone + PartialEq + Debug + Send + Sync + 'static {
    /// Creates a bitmap without any bit set.
    fn empty() -> Self;

    /// Returns whether the bitmap is empty, as defined by the backing set of the implementation.
    fn is_empty(&self) -> bool;

    /// Sets the bit at position `idx`.
    fn set_true(&mut self, idx: usize);

    /// Returns whether the bit at position `x` is set.
    fn contains(&self, x: usize) -> bool;

    /// Returns whether every bit set in `self` is also set in `other`.
    fn is_subset(&self, other: &Self) -> bool;

    /// Builds a bitmap from a sequence of booleans; element `i` being `true` sets bit `i`.
    fn from_bool_vec<T: AsRef<[bool]> + IntoIterator<Item = bool>>(value: T) -> Self;
}
114
115impl NullBitmap for StackNullBitmap {
116    fn empty() -> Self {
117        StackNullBitmap {
118            inner: Set64::empty(),
119        }
120    }
121
122    fn is_empty(&self) -> bool {
123        self.inner.is_empty()
124    }
125
126    fn set_true(&mut self, idx: usize) {
127        self.inner.add_inplace(idx);
128    }
129
130    fn contains(&self, x: usize) -> bool {
131        self.inner.contains(x)
132    }
133
134    fn is_subset(&self, other: &Self) -> bool {
135        other.inner.contains_all(self.inner)
136    }
137
138    fn from_bool_vec<T: AsRef<[bool]> + IntoIterator<Item = bool>>(value: T) -> Self {
139        value.into()
140    }
141}
142
143impl NullBitmap for HeapNullBitmap {
144    fn empty() -> Self {
145        HeapNullBitmap {
146            inner: FixedBitSet::new(),
147        }
148    }
149
150    fn is_empty(&self) -> bool {
151        self.inner.is_empty()
152    }
153
154    fn set_true(&mut self, idx: usize) {
155        self.inner.grow(idx + 1);
156        self.inner.insert(idx)
157    }
158
159    fn contains(&self, x: usize) -> bool {
160        self.inner.contains(x)
161    }
162
163    fn is_subset(&self, other: &Self) -> bool {
164        self.inner.is_subset(&other.inner)
165    }
166
167    fn from_bool_vec<T: AsRef<[bool]> + IntoIterator<Item = bool>>(value: T) -> Self {
168        value.into()
169    }
170}
171
impl EstimateSize for StackNullBitmap {
    /// Always reports zero heap bytes for the stack bitmap.
    fn estimated_heap_size(&self) -> usize {
        0
    }
}
177
impl EstimateSize for HeapNullBitmap {
    /// Reports the estimated heap size of the backing `FixedBitSet`.
    fn estimated_heap_size(&self) -> usize {
        self.inner.estimated_heap_size()
    }
}
183
184impl<T: AsRef<[bool]> + IntoIterator<Item = bool>> From<T> for StackNullBitmap {
185    fn from(value: T) -> Self {
186        let mut bitmap = StackNullBitmap::empty();
187        for (idx, is_true) in value.into_iter().enumerate() {
188            if is_true {
189                bitmap.set_true(idx);
190            }
191        }
192        bitmap
193    }
194}
195
196impl<T: AsRef<[bool]> + IntoIterator<Item = bool>> From<T> for HeapNullBitmap {
197    fn from(value: T) -> Self {
198        let mut bitmap = HeapNullBitmap::with_capacity(value.as_ref().len());
199        for (idx, is_true) in value.into_iter().enumerate() {
200            if is_true {
201                bitmap.set_true(idx);
202            }
203        }
204        bitmap
205    }
206}
207
/// A thin wrapper around a `u64` hash result, tagged at the type level with the hasher
/// builder `T` that produced it.
#[derive(Default, Clone, Copy)]
pub struct HashCode<T: 'static + BuildHasher> {
    value: u64,
    _phantom: PhantomData<&'static T>,
}

impl<T: BuildHasher> HashCode<T> {
    /// Returns the wrapped hash value.
    pub fn value(&self) -> u64 {
        let Self { value, .. } = self;
        *value
    }
}

impl<T: BuildHasher> From<u64> for HashCode<T> {
    /// Wraps a raw `u64` hash value.
    fn from(hash_code: u64) -> Self {
        let value = hash_code;
        HashCode {
            value,
            _phantom: PhantomData,
        }
    }
}

impl<T: BuildHasher> PartialEq for HashCode<T> {
    /// Two hash codes are equal iff their wrapped values are equal.
    fn eq(&self, other: &Self) -> bool {
        self.value() == other.value()
    }
}

impl<T: BuildHasher> Debug for HashCode<T> {
    /// Formats as a `HashCode` struct exposing only the `value` field.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("HashCode");
        out.field("value", &self.value);
        out.finish()
    }
}
243
/// Hash code from the `Crc32` hasher (`HashCode` tagged with `Crc32FastBuilder`).
/// Used for hash-shuffle exchange.
pub type Crc32HashCode = HashCode<Crc32FastBuilder>;
/// Hash code from the `XxHash64` hasher (`HashCode` tagged with `XxHash64Builder`).
/// Used for in-memory hash map cache.
pub type XxHash64HashCode = HashCode<XxHash64Builder>;
248
/// A special hasher designed for [`HashKey`]: it remembers the hash code handed to it by
/// `HashKey::hash()` and returns that very value from `finish()`.
///
/// Hash keys are computed in a vectorized fashion ahead of time, so the hasher only has to
/// carry the precomputed result.
///
/// WARN: This should ONLY be used along with [`HashKey`].
///
/// [`HashKey`]: crate::hash::HashKey
#[derive(Default)]
pub struct PrecomputedHasher {
    hash_code: u64,
}

impl Hasher for PrecomputedHasher {
    /// Raw byte writes are not supported; reaching this is a bug in the caller.
    fn write(&mut self, _: &[u8]) {
        unreachable!("must writes from HashKey with write_u64")
    }

    /// Stores the precomputed hash code. Panics if a non-zero code was already stored.
    fn write_u64(&mut self, precomputed: u64) {
        assert_eq!(self.hash_code, 0);
        self.hash_code = precomputed;
    }

    /// Returns the stored hash code (zero if nothing was written).
    fn finish(&self) -> u64 {
        let PrecomputedHasher { hash_code } = self;
        *hash_code
    }
}
277
278#[derive(Default, Clone)]
279pub struct PrecomputedBuildHasher;
280
281impl BuildHasher for PrecomputedBuildHasher {
282    type Hasher = PrecomputedHasher;
283
284    fn build_hasher(&self) -> Self::Hasher {
285        PrecomputedHasher::default()
286    }
287}
288
289/// Extension of scalars to be serialized into hash keys.
290///
291/// The `exact_size` and `estimated_size` methods are used to estimate the size of the serialized
292/// hash key, so that we can pre-allocate the buffer for it.
293/// - override `exact_size` if the serialized size is known for this scalar type;
294/// - override `estimated_size` if the serialized size varies for different values of this scalar
295///   type, but we can estimate it.
296///
297/// NOTE: The hash key encoding algorithm needs to respect the implementation of `Hash` and `Eq` on
298/// scalar types, which is exactly the same behavior of the data types under `GROUP BY` or
299/// `PARTITION BY` in PostgreSQL. For example, `Decimal(1.0)` vs `Decimal(1.00)`, or `Interval(24
300/// hour)` vs `Interval(1 day)` are considered equal here.
301///
302/// This means that...
303/// - delegating to the value encoding can be **incorrect** for some types, as they reflect the
304///   in-memory representation faithfully;
305/// - delegating to the memcmp encoding should be always safe, but they put extra effort to also
306///   preserve the property of `Ord`.
307///
308/// So for some scalar types we have to maintain a separate implementation here. For those that can
309/// be delegated to other encoding algorithms, we can use macros of
310/// `impl_memcmp_encoding_hash_key_serde!` and `impl_value_encoding_hash_key_serde!` here.
311pub trait HashKeySer<'a>: ScalarRef<'a> {
312    /// Serialize the scalar into the given buffer.
313    fn serialize_into(self, buf: impl BufMut);
314
315    /// Returns `Some` if the serialized size is known for this scalar type.
316    fn exact_size() -> Option<usize> {
317        None
318    }
319
320    /// Returns the estimated serialized size for this scalar.
321    fn estimated_size(self) -> usize {
322        Self::exact_size().unwrap_or(1) // use a default size of 1 if not known
323        // this should never happen in practice as we always
324        // implement one of these two methods
325    }
326}
327
/// The deserialization counterpart of [`HashKeySer`].
pub trait HashKeyDe: Scalar {
    /// Reads one scalar of this type from `buf`; `data_type` is the data type of the value
    /// being decoded.
    fn deserialize(data_type: &DataType, buf: impl Buf) -> Self;
}
332
/// Implements [`HashKeySer`] for the `ScalarRefType` of `$owned_ty` and [`HashKeyDe`] for
/// `$owned_ty` itself by delegating to the value encoding.
macro_rules! impl_value_encoding_hash_key_serde {
    ($owned_ty:ty) => {
        // TODO: extra boxing to `ScalarRefImpl` and encoding for `NonNull` tag is
        // unnecessary here. After we resolve them, we can make more types directly delegate
        // to this implementation.
        impl<'a> HashKeySer<'a> for <$owned_ty as Scalar>::ScalarRefType<'a> {
            // Serializes the value as a non-NULL datum.
            fn serialize_into(self, mut buf: impl BufMut) {
                value_encoding::serialize_datum_into(Some(ScalarRefImpl::from(self)), &mut buf);
            }

            // The size varies per value, so ask the value encoding for an estimate.
            fn estimated_size(self) -> usize {
                value_encoding::estimate_serialize_datum_size(Some(ScalarRefImpl::from(self)))
            }
        }

        impl HashKeyDe for $owned_ty {
            // Panics if decoding fails or yields NULL; neither is expected for data that was
            // produced by `serialize_into` above.
            fn deserialize(data_type: &DataType, buf: impl Buf) -> Self {
                let scalar = value_encoding::deserialize_datum(buf, data_type)
                    .expect("in-memory deserialize should never fail")
                    .expect("datum should never be NULL");

                // TODO: extra unboxing from `ScalarRefImpl` is unnecessary here.
                scalar.try_into().unwrap()
            }
        }
    };
}
360
/// Implements [`HashKeySer`] for the `ScalarRefType` of `$owned_ty` and [`HashKeyDe`] for
/// `$owned_ty` itself by delegating to the memcmp encoding in ascending order.
macro_rules! impl_memcmp_encoding_hash_key_serde {
    ($owned_ty:ty) => {
        impl<'a> HashKeySer<'a> for <$owned_ty as Scalar>::ScalarRefType<'a> {
            // Serializes the value as a non-NULL datum through a memcomparable serializer.
            fn serialize_into(self, buf: impl BufMut) {
                let mut serializer = memcomparable::Serializer::new(buf);
                // TODO: extra boxing to `ScalarRefImpl` and encoding for `NonNull` tag is
                // unnecessary here.
                memcmp_encoding::serialize_datum(
                    Some(ScalarRefImpl::from(self)),
                    OrderType::ascending(),
                    &mut serializer,
                )
                .expect("serialize should never fail");
            }

            // TODO: estimate size for memcmp encoding.
            fn estimated_size(self) -> usize {
                1
            }
        }

        impl HashKeyDe for $owned_ty {
            // Must use the same order type as `serialize_into` above. Panics if decoding fails
            // or yields NULL.
            fn deserialize(data_type: &DataType, buf: impl Buf) -> Self {
                let mut deserializer = memcomparable::Deserializer::new(buf);
                let scalar = memcmp_encoding::deserialize_datum(
                    data_type,
                    OrderType::ascending(),
                    &mut deserializer,
                )
                .expect("in-memory deserialize should never fail")
                .expect("datum should never be NULL");

                // TODO: extra unboxing from `ScalarRefImpl` is unnecessary here.
                scalar.try_into().unwrap()
            }
        }
    };
}
399
400impl HashKeySer<'_> for bool {
401    fn serialize_into(self, mut buf: impl BufMut) {
402        buf.put_u8(if self { 1 } else { 0 });
403    }
404
405    fn exact_size() -> Option<usize> {
406        Some(1)
407    }
408}
409
410impl HashKeyDe for bool {
411    fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
412        buf.get_u8() == 1
413    }
414}
415
416impl HashKeySer<'_> for i16 {
417    fn serialize_into(self, mut buf: impl BufMut) {
418        buf.put_i16_ne(self);
419    }
420
421    fn exact_size() -> Option<usize> {
422        Some(2)
423    }
424}
425
426impl HashKeyDe for i16 {
427    fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
428        buf.get_i16_ne()
429    }
430}
431
432impl HashKeySer<'_> for i32 {
433    fn serialize_into(self, mut buf: impl BufMut) {
434        buf.put_i32_ne(self);
435    }
436
437    fn exact_size() -> Option<usize> {
438        Some(4)
439    }
440}
441
442impl HashKeyDe for i32 {
443    fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
444        buf.get_i32_ne()
445    }
446}
447
448impl HashKeySer<'_> for i64 {
449    fn serialize_into(self, mut buf: impl BufMut) {
450        buf.put_i64_ne(self);
451    }
452
453    fn exact_size() -> Option<usize> {
454        Some(8)
455    }
456}
457
458impl HashKeyDe for i64 {
459    fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
460        buf.get_i64_ne()
461    }
462}
463
464impl<'a> HashKeySer<'a> for Int256Ref<'a> {
465    fn serialize_into(self, mut buf: impl BufMut) {
466        let b = self.to_ne_bytes();
467        buf.put_slice(b.as_ref());
468    }
469
470    fn exact_size() -> Option<usize> {
471        Some(32)
472    }
473}
474
475impl HashKeyDe for Int256 {
476    fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
477        let mut value = [0; 32];
478        buf.copy_to_slice(&mut value);
479        Self::from_ne_bytes(value)
480    }
481}
482
483impl HashKeySer<'_> for Serial {
484    fn serialize_into(self, mut buf: impl BufMut) {
485        buf.put_i64_ne(self.as_row_id());
486    }
487
488    fn exact_size() -> Option<usize> {
489        Some(8)
490    }
491}
492
493impl HashKeyDe for Serial {
494    fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
495        buf.get_i64_ne().into()
496    }
497}
498
499impl HashKeySer<'_> for F32 {
500    fn serialize_into(self, mut buf: impl BufMut) {
501        buf.put_f32_ne(self.normalized().0);
502    }
503
504    fn exact_size() -> Option<usize> {
505        Some(4)
506    }
507}
508
509impl HashKeyDe for F32 {
510    fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
511        buf.get_f32_ne().into()
512    }
513}
514
515impl HashKeySer<'_> for F64 {
516    fn serialize_into(self, mut buf: impl BufMut) {
517        buf.put_f64_ne(self.normalized().0);
518    }
519
520    fn exact_size() -> Option<usize> {
521        Some(8)
522    }
523}
524
525impl HashKeyDe for F64 {
526    fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
527        buf.get_f64_ne().into()
528    }
529}
530
531impl HashKeySer<'_> for Decimal {
532    fn serialize_into(self, mut buf: impl BufMut) {
533        let b = Decimal::unordered_serialize(&self.normalize());
534        buf.put_slice(b.as_ref());
535    }
536
537    fn exact_size() -> Option<usize> {
538        Some(16)
539    }
540}
541
542impl HashKeyDe for Decimal {
543    fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
544        let mut value = [0; 16];
545        buf.copy_to_slice(&mut value);
546        Self::unordered_deserialize(value)
547    }
548}
549
550impl HashKeySer<'_> for Date {
551    fn serialize_into(self, mut buf: impl BufMut) {
552        let b = self.0.num_days_from_ce().to_ne_bytes();
553        buf.put_slice(b.as_ref());
554    }
555
556    fn exact_size() -> Option<usize> {
557        Some(4)
558    }
559}
560
561impl HashKeyDe for Date {
562    fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
563        let days = buf.get_i32_ne();
564        Date::with_days_since_ce(days).unwrap()
565    }
566}
567
568impl HashKeySer<'_> for Timestamp {
569    fn serialize_into(self, mut buf: impl BufMut) {
570        buf.put_i64_ne(self.0.and_utc().timestamp());
571        buf.put_u32_ne(self.0.and_utc().timestamp_subsec_nanos());
572    }
573
574    fn exact_size() -> Option<usize> {
575        Some(12)
576    }
577}
578
579impl HashKeyDe for Timestamp {
580    fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
581        let secs = buf.get_i64_ne();
582        let nsecs = buf.get_u32_ne();
583        Timestamp::with_secs_nsecs(secs, nsecs).unwrap()
584    }
585}
586
587impl HashKeySer<'_> for Time {
588    fn serialize_into(self, mut buf: impl BufMut) {
589        buf.put_u32_ne(self.0.num_seconds_from_midnight());
590        buf.put_u32_ne(self.0.nanosecond());
591    }
592
593    fn exact_size() -> Option<usize> {
594        Some(8)
595    }
596}
597
598impl HashKeyDe for Time {
599    fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
600        let secs = buf.get_u32_ne();
601        let nano = buf.get_u32_ne();
602        Time::with_secs_nano(secs, nano).unwrap()
603    }
604}
605
606impl HashKeySer<'_> for Timestamptz {
607    fn serialize_into(self, mut buf: impl BufMut) {
608        buf.put_i64_ne(self.timestamp_micros());
609    }
610
611    fn exact_size() -> Option<usize> {
612        Some(8)
613    }
614}
615
616impl HashKeyDe for Timestamptz {
617    fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
618        Timestamptz::from_micros(buf.get_i64_ne())
619    }
620}
621
// Strings, byte arrays and jsonb values delegate to the value encoding.
impl_value_encoding_hash_key_serde!(Box<str>);
impl_value_encoding_hash_key_serde!(Box<[u8]>);
impl_value_encoding_hash_key_serde!(JsonbVal);

// It's possible there's `Decimal` or `Interval` in these composite types, so we currently always
// use the memcmp encoding for safety.
impl_memcmp_encoding_hash_key_serde!(StructValue);
impl_memcmp_encoding_hash_key_serde!(ListValue);
impl_memcmp_encoding_hash_key_serde!(MapValue);
631
632#[cfg(test)]
633mod tests {
634    use std::collections::HashMap;
635    use std::str::FromStr;
636    use std::sync::Arc;
637
638    use itertools::Itertools;
639
640    use super::*;
641    use crate::array::{
642        ArrayBuilder, ArrayBuilderImpl, ArrayImpl, BoolArray, DataChunk, DataChunkTestExt,
643        DateArray, DecimalArray, F32Array, F64Array, I16Array, I32Array, I32ArrayBuilder, I64Array,
644        TimeArray, TimestampArray, Utf8Array,
645    };
646    use crate::hash::{HashKey, Key16, Key32, Key64, Key128, Key256, KeySerialized};
647    use crate::test_utils::rand_array::seed_rand_array_ref;
648    use crate::types::Datum;
649
    /// A row of datums, used as the reference hash map key that hash keys are compared against.
    #[derive(Hash, PartialEq, Eq)]
    struct Row(Vec<Datum>);
652
653    fn generate_random_data_chunk() -> (DataChunk, Vec<DataType>) {
654        let capacity = 128;
655        let seed = 10244021u64;
656        let columns = vec![
657            seed_rand_array_ref::<BoolArray>(capacity, seed, 0.5),
658            seed_rand_array_ref::<I16Array>(capacity, seed + 1, 0.5),
659            seed_rand_array_ref::<I32Array>(capacity, seed + 2, 0.5),
660            seed_rand_array_ref::<I64Array>(capacity, seed + 3, 0.5),
661            seed_rand_array_ref::<F32Array>(capacity, seed + 4, 0.5),
662            seed_rand_array_ref::<F64Array>(capacity, seed + 5, 0.5),
663            seed_rand_array_ref::<DecimalArray>(capacity, seed + 6, 0.5),
664            seed_rand_array_ref::<Utf8Array>(capacity, seed + 7, 0.5),
665            seed_rand_array_ref::<DateArray>(capacity, seed + 8, 0.5),
666            seed_rand_array_ref::<TimeArray>(capacity, seed + 9, 0.5),
667            seed_rand_array_ref::<TimestampArray>(capacity, seed + 10, 0.5),
668        ];
669        let types = vec![
670            DataType::Boolean,
671            DataType::Int16,
672            DataType::Int32,
673            DataType::Int64,
674            DataType::Float32,
675            DataType::Float64,
676            DataType::Decimal,
677            DataType::Varchar,
678            DataType::Date,
679            DataType::Time,
680            DataType::Timestamp,
681        ];
682
683        (DataChunk::new(columns, capacity), types)
684    }
685
686    fn do_test_serialize<K: HashKey, F>(column_indexes: Vec<usize>, data_gen: F)
687    where
688        F: FnOnce() -> DataChunk,
689    {
690        let data = data_gen();
691
692        let mut actual_row_id_mapping = HashMap::<usize, Vec<usize>>::with_capacity(100);
693        {
694            let mut fast_hash_map =
695                HashMap::<K, Vec<usize>, PrecomputedBuildHasher>::with_capacity_and_hasher(
696                    100,
697                    PrecomputedBuildHasher,
698                );
699            let keys = K::build_many(column_indexes.as_slice(), &data);
700
701            for (row_id, key) in keys.into_iter().enumerate() {
702                let row_ids = fast_hash_map.entry(key).or_default();
703                row_ids.push(row_id);
704            }
705
706            for row_ids in fast_hash_map.values() {
707                for row_id in row_ids {
708                    actual_row_id_mapping.insert(*row_id, row_ids.clone());
709                }
710            }
711        }
712
713        let mut expected_row_id_mapping = HashMap::<usize, Vec<usize>>::with_capacity(100);
714        {
715            let mut normal_hash_map: HashMap<Row, Vec<usize>> = HashMap::with_capacity(100);
716            for row_idx in 0..data.capacity() {
717                let row = column_indexes
718                    .iter()
719                    .map(|col_idx| data.column_at(*col_idx))
720                    .map(|col| col.datum_at(row_idx))
721                    .collect::<Vec<Datum>>();
722
723                normal_hash_map.entry(Row(row)).or_default().push(row_idx);
724            }
725
726            for row_ids in normal_hash_map.values() {
727                for row_id in row_ids {
728                    expected_row_id_mapping.insert(*row_id, row_ids.clone());
729                }
730            }
731        }
732
733        assert_eq!(expected_row_id_mapping, actual_row_id_mapping);
734    }
735
736    fn do_test_deserialize<K: HashKey, F>(column_indexes: Vec<usize>, data_gen: F)
737    where
738        F: FnOnce() -> (DataChunk, Vec<DataType>),
739    {
740        let (data, types) = data_gen();
741        let keys = K::build_many(column_indexes.as_slice(), &data);
742
743        let mut array_builders = column_indexes
744            .iter()
745            .map(|idx| data.columns()[*idx].create_builder(1024))
746            .collect::<Vec<ArrayBuilderImpl>>();
747        let key_types: Vec<_> = column_indexes
748            .iter()
749            .map(|idx| types[*idx].clone())
750            .collect();
751
752        keys.into_iter()
753            .try_for_each(|k| k.deserialize_to_builders(&mut array_builders[..], &key_types))
754            .expect("Failed to deserialize!");
755
756        let result_arrays = array_builders
757            .into_iter()
758            .map(|array_builder| array_builder.finish())
759            .collect::<Vec<ArrayImpl>>();
760
761        for (ret_idx, col_idx) in column_indexes.iter().enumerate() {
762            assert_eq!(&*data.columns()[*col_idx], &result_arrays[ret_idx]);
763        }
764    }
765
766    fn do_test<K: HashKey, F>(column_indexes: Vec<usize>, data_gen: F)
767    where
768        F: FnOnce() -> (DataChunk, Vec<DataType>),
769    {
770        let (data, types) = data_gen();
771
772        let data1 = data.clone();
773        do_test_serialize::<K, _>(column_indexes.clone(), move || data1);
774        do_test_deserialize::<K, _>(column_indexes, move || (data, types));
775    }
776
    // `Key16` with a single `Int16` column (column 1 of the random chunk).
    #[test]
    fn test_two_bytes_hash_key() {
        do_test::<Key16, _>(vec![1], generate_random_data_chunk);
    }
781
782    #[test]
783    fn test_four_bytes_hash_key() {
784        do_test::<Key32, _>(vec![0, 1], generate_random_data_chunk);
785        do_test::<Key32, _>(vec![2], generate_random_data_chunk);
786        do_test::<Key32, _>(vec![4], generate_random_data_chunk);
787    }
788
789    #[test]
790    fn test_eight_bytes_hash_key() {
791        do_test::<Key64, _>(vec![1, 2], generate_random_data_chunk);
792        do_test::<Key64, _>(vec![0, 1, 2], generate_random_data_chunk);
793        do_test::<Key64, _>(vec![3], generate_random_data_chunk);
794        do_test::<Key64, _>(vec![5], generate_random_data_chunk);
795    }
796
797    #[test]
798    fn test_128_bits_hash_key() {
799        do_test::<Key128, _>(vec![3, 5], generate_random_data_chunk);
800        do_test::<Key128, _>(vec![6], generate_random_data_chunk);
801    }
802
803    #[test]
804    fn test_256_bits_hash_key() {
805        do_test::<Key256, _>(vec![3, 5, 6], generate_random_data_chunk);
806        do_test::<Key256, _>(vec![3, 6], generate_random_data_chunk);
807    }
808
    // `KeySerialized` with a bool column and a varchar column (columns 0 and 7).
    #[test]
    fn test_var_length_hash_key() {
        do_test::<KeySerialized, _>(vec![0, 7], generate_random_data_chunk);
    }
813
814    fn generate_decimal_test_data() -> (DataChunk, Vec<DataType>) {
815        let columns = vec![Arc::new(
816            DecimalArray::from_iter([
817                Some(Decimal::from_str("1.2").unwrap()),
818                None,
819                Some(Decimal::from_str("1.200").unwrap()),
820                Some(Decimal::from_str("0.00").unwrap()),
821                Some(Decimal::from_str("0.0").unwrap()),
822            ])
823            .into(),
824        )];
825        let types = vec![DataType::Decimal];
826
827        (DataChunk::new(columns, 5), types)
828    }
829
    // Runs the serialize/deserialize checks with `Key128` on the decimal-only chunk.
    #[test]
    fn test_decimal_hash_key_serialization() {
        do_test::<Key128, _>(vec![0], generate_decimal_test_data);
    }
834
835    // Simple test to ensure a row <None, Some(2)> will be serialized and restored
836    // losslessly.
837    #[test]
838    fn test_simple_hash_key_nullable_serde() {
839        let keys = Key64::build_many(
840            &[0, 1],
841            &DataChunk::from_pretty(
842                "i i
843                 1 .
844                 . 2",
845            ),
846        );
847
848        let mut array_builders = [0, 1]
849            .iter()
850            .map(|_| ArrayBuilderImpl::Int32(I32ArrayBuilder::new(2)))
851            .collect::<Vec<_>>();
852
853        keys.into_iter().for_each(|k: Key64| {
854            k.deserialize_to_builders(&mut array_builders[..], &[DataType::Int32, DataType::Int32])
855                .unwrap()
856        });
857
858        let array = array_builders.pop().unwrap().finish();
859        let i32_vec = array
860            .iter()
861            .map(|opt| opt.map(|s| s.into_int32()))
862            .collect_vec();
863        assert_eq!(i32_vec, vec![None, Some(2)]);
864    }
865}