use std::fmt::Debug;
use std::hash::{Hash, Hasher};
use std::marker::PhantomData;

use bytes::BufMut;
use educe::Educe;
use either::{Either, for_both};
use itertools::Itertools;
use risingwave_common_estimate_size::EstimateSize;
use tinyvec::ArrayVec;

use super::{HeapNullBitmap, NullBitmap, XxHash64HashCode};
use crate::array::{Array, ArrayBuilder, ArrayBuilderImpl, ArrayResult, DataChunk};
use crate::hash::{HashKeyDe, HashKeySer};
use crate::row::OwnedRow;
use crate::types::{DataType, Datum, ScalarImpl};
use crate::util::hash_util::XxHash64Builder;
use crate::util::iter_util::ZipEqFast;
use crate::{dispatch_array_builder_variants, dispatch_array_variants, dispatch_data_types};

/// The storage for the bytes of a hash key.
pub trait KeyStorage: 'static {
    /// The type of the built key, which is cheap to clone and hashable.
    type Key: AsRef<[u8]> + EstimateSize + Clone + Send + Sync + 'static;

    /// The buffer type that datums are serialized into before being sealed into a key.
    type Buffer: Buffer<Sealed = Self::Key>;
}

/// A byte buffer that hash keys are serialized into.
pub trait Buffer: BufMut + 'static {
    /// The key type that the buffer is sealed into.
    type Sealed;

    /// Returns whether this buffer type allocates on the heap.
    fn alloc() -> bool;

    /// Creates a new buffer with the given capacity hint.
    fn with_capacity(cap: usize) -> Self;

    /// Seals the buffer into the final key.
    fn seal(self) -> Self::Sealed;
}

/// Key storage backed by a fixed-size byte array on the stack.
pub struct StackStorage<const N: usize>;

impl<const N: usize> KeyStorage for StackStorage<N> {
    type Buffer = StackBuffer<N>;
    type Key = [u8; N];
}

/// A fixed-size buffer on the stack, backed by a `tinyvec::ArrayVec`.
pub struct StackBuffer<const N: usize>(ArrayVec<[u8; N]>);

unsafe impl<const N: usize> BufMut for StackBuffer<N> {
    #[inline]
    fn remaining_mut(&self) -> usize {
        self.0.grab_spare_slice().len()
    }

    #[inline]
    unsafe fn advance_mut(&mut self, cnt: usize) {
        // The caller guarantees that `cnt` bytes have been written to the spare capacity.
        self.0.set_len(self.0.len() + cnt);
    }

    #[inline]
    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
        // SAFETY: `UninitSlice` is a transparent wrapper over `[MaybeUninit<u8>]`,
        // which has the same layout as the `[u8]` spare slice.
        unsafe { &mut *(self.0.grab_spare_slice_mut() as *mut [u8] as *mut _) }
    }

    #[inline]
    fn put_slice(&mut self, src: &[u8]) {
        // Specialized to copy the slice directly instead of going through `chunk_mut`.
        self.0.extend_from_slice(src);
    }
}

impl<const N: usize> Buffer for StackBuffer<N> {
    type Sealed = [u8; N];

    fn alloc() -> bool {
        false
    }

    fn with_capacity(_cap: usize) -> Self {
        // The capacity hint is ignored: the buffer is always exactly `N` bytes.
        Self(ArrayVec::new())
    }

    fn seal(self) -> Self::Sealed {
        self.0.into_inner()
    }
}

/// Key storage backed by a variable-length byte buffer on the heap.
pub struct HeapStorage;

impl KeyStorage for HeapStorage {
    type Buffer = Vec<u8>;
    type Key = Box<[u8]>;
}

impl Buffer for Vec<u8> {
    type Sealed = Box<[u8]>;

    fn alloc() -> bool {
        true
    }

    fn with_capacity(cap: usize) -> Self {
        // Resolves to the inherent `Vec::with_capacity`.
        Self::with_capacity(cap)
    }

    fn seal(self) -> Self::Sealed {
        self.into_boxed_slice()
    }
}

/// Serializes the key datums of one row into a hash key.
struct Serializer<S: KeyStorage, N: NullBitmap> {
    buffer: S::Buffer,
    null_bitmap: N,
    idx: usize,
    hash_code: XxHash64HashCode,
}

impl<S: KeyStorage, N: NullBitmap> Serializer<S, N> {
    fn new(buffer: S::Buffer, hash_code: XxHash64HashCode) -> Self {
        Self {
            buffer,
            null_bitmap: N::empty(),
            idx: 0,
            hash_code,
        }
    }

    /// Serializes the next datum into the buffer, recording nulls in the bitmap.
    fn serialize<'a, D: HashKeySer<'a>>(&mut self, datum: Option<D>) {
        match datum {
            Some(scalar) => HashKeySer::serialize_into(scalar, &mut self.buffer),
            None => self.null_bitmap.set_true(self.idx),
        }
        self.idx += 1;
    }

    /// Finishes serialization and seals the buffer into a hash key.
    fn finish(self) -> HashKeyImpl<S, N> {
        HashKeyImpl {
            hash_code: self.hash_code,
            key: self.buffer.seal(),
            null_bitmap: self.null_bitmap,
        }
    }
}

/// Deserializes datums from a hash key, in the same order as they were serialized.
struct Deserializer<'a, S: KeyStorage, N: NullBitmap> {
    key: &'a [u8],
    null_bitmap: &'a N,
    idx: usize,
    _phantom: PhantomData<&'a S::Key>,
}

impl<'a, S: KeyStorage, N: NullBitmap> Deserializer<'a, S, N> {
    fn new(key: &'a S::Key, null_bitmap: &'a N) -> Self {
        Self {
            key: key.as_ref(),
            null_bitmap,
            idx: 0,
            _phantom: PhantomData,
        }
    }

    /// Deserializes the next datum with a statically-known scalar type.
    fn deserialize<D: HashKeyDe>(&mut self, data_type: &DataType) -> Option<D> {
        let datum = if !self.null_bitmap.contains(self.idx) {
            Some(HashKeyDe::deserialize(data_type, &mut self.key))
        } else {
            None
        };

        self.idx += 1;
        datum
    }

    /// Deserializes the next datum with a runtime `DataType`.
    fn deserialize_impl(&mut self, data_type: &DataType) -> Datum {
        dispatch_data_types!(data_type, [S = Scalar], {
            self.deserialize::<S>(data_type).map(ScalarImpl::from)
        })
    }
}

/// A hash key: a compact, hashable representation of the key columns of one row.
pub trait HashKey:
    EstimateSize + Clone + Debug + Hash + Eq + Sized + Send + Sync + 'static
{
    type Bitmap: NullBitmap;

    /// Builds a hash key for each row of the data chunk from the given key columns.
    /// Only visible rows are actually serialized.
    fn build_many(column_indices: &[usize], data_chunk: &DataChunk) -> Vec<Self>;

    /// Deserializes the hash key back into an owned row.
    fn deserialize(&self, data_types: &[DataType]) -> ArrayResult<OwnedRow>;

    /// Deserializes the hash key and appends the datums to the given array builders.
    fn deserialize_to_builders(
        &self,
        array_builders: &mut [ArrayBuilderImpl],
        data_types: &[DataType],
    ) -> ArrayResult<()>;

    /// Returns the null bitmap of the key.
    fn null_bitmap(&self) -> &Self::Bitmap;
}

/// The generic [`HashKey`] implementation, parameterized by key storage and null bitmap.
#[derive(Educe)]
#[educe(Clone)]
pub struct HashKeyImpl<S: KeyStorage, N: NullBitmap> {
    hash_code: XxHash64HashCode,
    key: S::Key,
    null_bitmap: N,
}

impl<S: KeyStorage, N: NullBitmap> Hash for HashKeyImpl<S, N> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Feed the precomputed hash code directly instead of rehashing the key bytes.
        state.write_u64(self.hash_code.value());
    }
}

impl<S: KeyStorage, N: NullBitmap> PartialEq for HashKeyImpl<S, N> {
    fn eq(&self, other: &Self) -> bool {
        // Compare the cheap precomputed hash code first to rule out most mismatches.
        self.hash_code == other.hash_code
            && self.key.as_ref() == other.key.as_ref()
            && self.null_bitmap == other.null_bitmap
    }
}
impl<S: KeyStorage, N: NullBitmap> Eq for HashKeyImpl<S, N> {}

impl<S: KeyStorage, N: NullBitmap> std::fmt::Debug for HashKeyImpl<S, N> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HashKey")
            .field("key", &self.key.as_ref())
            .finish_non_exhaustive()
    }
}

impl<S: KeyStorage, N: NullBitmap> EstimateSize for HashKeyImpl<S, N> {
    fn estimated_heap_size(&self) -> usize {
        self.key.estimated_heap_size() + self.null_bitmap.estimated_heap_size()
    }
}

impl<S: KeyStorage, N: NullBitmap> HashKey for HashKeyImpl<S, N> {
    type Bitmap = N;

    fn build_many(column_indices: &[usize], data_chunk: &DataChunk) -> Vec<Self> {
        let hash_codes = data_chunk.get_hash_values(column_indices, XxHash64Builder);

        // Create one serializer per row, pairing each row's precomputed hash code
        // with a fresh buffer.
        let mut serializers = {
            let buffers = if S::Buffer::alloc() {
                // Heap-allocated buffers are reserved with the estimated key sizes.
                let estimated_key_sizes = data_chunk.estimate_hash_key_sizes(column_indices);

                Either::Left(
                    estimated_key_sizes
                        .into_iter()
                        .map(S::Buffer::with_capacity),
                )
            } else {
                // Stack buffers have a fixed capacity, so the capacity hint is irrelevant.
                Either::Right((0..data_chunk.capacity()).map(|_| S::Buffer::with_capacity(0)))
            };

            for_both!(buffers, buffers =>
                hash_codes
                    .into_iter()
                    .zip_eq_fast(buffers)
                    .map(|(hash_code, buffer)| Serializer::new(buffer, hash_code))
                    .collect_vec()
            )
        };

        // Serialize the key columns into the per-row serializers, column by column.
        for &i in column_indices {
            let array = data_chunk.column_at(i).as_ref();

            dispatch_array_variants!(array, array, {
                for i in data_chunk.visibility().iter_ones() {
                    // SAFETY: indices from the visibility bitmap are within the array length.
                    unsafe { serializers[i].serialize(array.value_at_unchecked(i)) }
                }
            });
        }

        serializers.into_iter().map(|s| s.finish()).collect()
    }

    fn deserialize(&self, data_types: &[DataType]) -> ArrayResult<OwnedRow> {
        let mut deserializer = Deserializer::<S, N>::new(&self.key, &self.null_bitmap);
        let mut row = Vec::with_capacity(data_types.len());

        for data_type in data_types {
            let datum = deserializer.deserialize_impl(data_type);
            row.push(datum);
        }

        Ok(OwnedRow::new(row))
    }

    fn deserialize_to_builders(
        &self,
        array_builders: &mut [ArrayBuilderImpl],
        data_types: &[DataType],
    ) -> ArrayResult<()> {
        let mut deserializer = Deserializer::<S, N>::new(&self.key, &self.null_bitmap);

        for (data_type, array_builder) in data_types.iter().zip_eq_fast(array_builders.iter_mut()) {
            dispatch_array_builder_variants!(array_builder, array_builder, {
                let datum = deserializer.deserialize(data_type);
                array_builder.append_owned(datum);
            });
        }

        Ok(())
    }

    fn null_bitmap(&self) -> &Self::Bitmap {
        &self.null_bitmap
    }
}

#[easy_ext::ext]
impl DataChunk {
    /// Estimates the serialized hash key size of each row in the chunk.
    fn estimate_hash_key_sizes(&self, column_indices: &[usize]) -> Vec<usize> {
        let mut estimated_column_indices = Vec::new();
        let mut exact_size = 0;

        // Columns with a fixed serialized width contribute a constant size; the
        // remaining columns are estimated row by row below.
        for &i in column_indices {
            dispatch_array_variants!(&*self.columns()[i], [S = ScalarRef], {
                match S::exact_size() {
                    Some(size) => exact_size += size,
                    None => estimated_column_indices.push(i),
                }
            })
        }
        let mut sizes = vec![exact_size; self.capacity()];

        for i in estimated_column_indices {
            dispatch_array_variants!(&*self.columns()[i], col, {
                for i in self.visibility().iter_ones() {
                    // SAFETY: indices from the visibility bitmap are within the array length.
                    unsafe {
                        if let Some(scalar) = col.value_at_unchecked(i) {
                            sizes[i] += HashKeySer::estimated_size(scalar);
                        }
                    }
                }
            })
        }

        sizes
    }
}

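// The aliases below name the supported key widths: group keys whose serialized datums
// fit in 1 to 32 bytes can use a fixed-size, stack-allocated key, while wider or
// variable-length keys use the heap-backed `SerializedKey`. The choice among them is
// made by callers outside this module.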
pub type FixedSizeKey<const N: usize, B> = HashKeyImpl<StackStorage<N>, B>;
pub type Key8<B = HeapNullBitmap> = FixedSizeKey<1, B>;
pub type Key16<B = HeapNullBitmap> = FixedSizeKey<2, B>;
pub type Key32<B = HeapNullBitmap> = FixedSizeKey<4, B>;
pub type Key64<B = HeapNullBitmap> = FixedSizeKey<8, B>;
pub type Key128<B = HeapNullBitmap> = FixedSizeKey<16, B>;
pub type Key256<B = HeapNullBitmap> = FixedSizeKey<32, B>;
pub type KeySerialized<B = HeapNullBitmap> = SerializedKey<B>;

pub type SerializedKey<B = HeapNullBitmap> = HashKeyImpl<HeapStorage, B>;
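
// A minimal usage sketch of the `Buffer` contract above, added for illustration: it
// relies only on the `bytes`, `tinyvec`, and standard-library APIs already used in
// this module. The module and test names are arbitrary.
#[cfg(test)]
mod buffer_usage_tests {
    use bytes::BufMut;

    use super::*;

    #[test]
    fn stack_buffer_seals_to_fixed_array() {
        // A stack buffer never allocates; the capacity hint is ignored and the buffer
        // seals into a fixed-size byte array.
        assert!(!StackBuffer::<8>::alloc());

        let mut buffer = StackBuffer::<8>::with_capacity(0);
        assert_eq!(buffer.remaining_mut(), 8);
        buffer.put_slice(&42u64.to_ne_bytes());
        assert_eq!(buffer.remaining_mut(), 0);

        let key: [u8; 8] = buffer.seal();
        assert_eq!(key, 42u64.to_ne_bytes());
    }

    #[test]
    fn heap_buffer_seals_to_boxed_slice() {
        // A heap buffer allocates up-front and seals into a boxed slice containing
        // exactly the bytes that were written.
        assert!(<Vec<u8> as Buffer>::alloc());

        let mut buffer = <Vec<u8> as Buffer>::with_capacity(8);
        buffer.put_slice(b"hash key");

        let key: Box<[u8]> = Buffer::seal(buffer);
        assert_eq!(&*key, b"hash key".as_slice());
    }
}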