1use std::default::Default;
25use std::fmt::Debug;
26use std::hash::{BuildHasher, Hasher};
27use std::marker::PhantomData;
28
29use bytes::{Buf, BufMut};
30use chrono::{Datelike, Timelike};
31use fixedbitset::FixedBitSet;
32use risingwave_common_estimate_size::EstimateSize;
33use smallbitset::Set64;
34use static_assertions::const_assert_eq;
35
36use crate::array::{ListValue, MapValue, StructValue, VectorVal};
37use crate::types::{
38 DataType, Date, Decimal, F32, F64, Int256, Int256Ref, JsonbVal, Scalar, ScalarRef,
39 ScalarRefImpl, Serial, Time, Timestamp, Timestamptz,
40};
41use crate::util::hash_util::{Crc32FastBuilder, XxHash64Builder};
42use crate::util::sort_util::OrderType;
43use crate::util::{memcmp_encoding, value_encoding};
44
/// Upper bound (64) on the number of group keys handled with stack-allocated structures.
///
/// NOTE(review): presumably this mirrors the capacity of [`StackNullBitmap`], which wraps a
/// `Set64` — confirm against the callers that choose between the stack and heap bitmaps.
pub static MAX_GROUP_KEYS_ON_STACK: usize = 64;
48
/// Null bitmap backed by a [`FixedBitSet`].
///
/// `#[repr(transparent)]` gives this wrapper the same layout as the bitset it holds.
#[repr(transparent)]
#[derive(Clone, Debug, PartialEq)]
pub struct HeapNullBitmap {
    /// The wrapped bitset; bit `i` set means position `i` is marked.
    inner: FixedBitSet,
}
59
60impl HeapNullBitmap {
61 fn with_capacity(n: usize) -> Self {
62 HeapNullBitmap {
63 inner: FixedBitSet::with_capacity(n),
64 }
65 }
66}
67
/// Null bitmap backed by a [`Set64`].
///
/// `#[repr(transparent)]` gives this wrapper the same layout as the set it holds.
#[repr(transparent)]
#[derive(Clone, Debug, PartialEq)]
pub struct StackNullBitmap {
    /// The wrapped set; element `i` present means position `i` is marked.
    inner: Set64,
}
75
// Compile-time check: `StackNullBitmap` occupies exactly as many bytes as a `u64`.
const_assert_eq!(
    std::mem::size_of::<StackNullBitmap>(),
    std::mem::size_of::<u64>()
);

// Compile-time check: `HeapNullBitmap` occupies exactly three `usize` words.
const_assert_eq!(
    std::mem::size_of::<HeapNullBitmap>(),
    std::mem::size_of::<usize>() * 3,
);
85
/// Common interface of the null bitmaps in this file ([`StackNullBitmap`] and
/// [`HeapNullBitmap`]).
pub trait NullBitmap: EstimateSize + Clone + PartialEq + Debug + Send + Sync + 'static {
    /// Creates a bitmap with nothing marked.
    fn empty() -> Self;

    /// Reports whether the bitmap is empty; each implementation delegates to the emptiness
    /// check of its backing set.
    fn is_empty(&self) -> bool;

    /// Marks position `idx`.
    fn set_true(&mut self, idx: usize);

    /// Returns whether position `x` is marked.
    fn contains(&self, x: usize) -> bool;

    /// Returns whether every position marked in `self` is also marked in `other`.
    fn is_subset(&self, other: &Self) -> bool;

    /// Builds a bitmap from a sequence of booleans; the implementations in this file mark
    /// position `i` when the `i`-th element is `true`.
    fn from_bool_vec<T: AsRef<[bool]> + IntoIterator<Item = bool>>(value: T) -> Self;
}
114
115impl NullBitmap for StackNullBitmap {
116 fn empty() -> Self {
117 StackNullBitmap {
118 inner: Set64::empty(),
119 }
120 }
121
122 fn is_empty(&self) -> bool {
123 self.inner.is_empty()
124 }
125
126 fn set_true(&mut self, idx: usize) {
127 self.inner.add_inplace(idx);
128 }
129
130 fn contains(&self, x: usize) -> bool {
131 self.inner.contains(x)
132 }
133
134 fn is_subset(&self, other: &Self) -> bool {
135 other.inner.contains_all(self.inner)
136 }
137
138 fn from_bool_vec<T: AsRef<[bool]> + IntoIterator<Item = bool>>(value: T) -> Self {
139 value.into()
140 }
141}
142
143impl NullBitmap for HeapNullBitmap {
144 fn empty() -> Self {
145 HeapNullBitmap {
146 inner: FixedBitSet::new(),
147 }
148 }
149
150 fn is_empty(&self) -> bool {
151 self.inner.is_empty()
152 }
153
154 fn set_true(&mut self, idx: usize) {
155 self.inner.grow(idx + 1);
156 self.inner.insert(idx)
157 }
158
159 fn contains(&self, x: usize) -> bool {
160 self.inner.contains(x)
161 }
162
163 fn is_subset(&self, other: &Self) -> bool {
164 self.inner.is_subset(&other.inner)
165 }
166
167 fn from_bool_vec<T: AsRef<[bool]> + IntoIterator<Item = bool>>(value: T) -> Self {
168 value.into()
169 }
170}
171
impl EstimateSize for StackNullBitmap {
    /// Always reports zero heap bytes: the only field is an inline `Set64`, and this file
    /// asserts at compile time that the whole struct is the size of a `u64`.
    fn estimated_heap_size(&self) -> usize {
        0
    }
}
177
impl EstimateSize for HeapNullBitmap {
    /// Forwards to the heap-size estimate of the wrapped [`FixedBitSet`].
    fn estimated_heap_size(&self) -> usize {
        self.inner.estimated_heap_size()
    }
}
183
184impl<T: AsRef<[bool]> + IntoIterator<Item = bool>> From<T> for StackNullBitmap {
185 fn from(value: T) -> Self {
186 let mut bitmap = StackNullBitmap::empty();
187 for (idx, is_true) in value.into_iter().enumerate() {
188 if is_true {
189 bitmap.set_true(idx);
190 }
191 }
192 bitmap
193 }
194}
195
196impl<T: AsRef<[bool]> + IntoIterator<Item = bool>> From<T> for HeapNullBitmap {
197 fn from(value: T) -> Self {
198 let mut bitmap = HeapNullBitmap::with_capacity(value.as_ref().len());
199 for (idx, is_true) in value.into_iter().enumerate() {
200 if is_true {
201 bitmap.set_true(idx);
202 }
203 }
204 bitmap
205 }
206}
207
/// A 64-bit hash code tagged at the type level with the [`BuildHasher`] `T`.
///
/// `T` is only a compile-time marker held through [`PhantomData`]; no builder value is
/// stored. For that reason `Default`, `Clone` and `Copy` are implemented by hand below:
/// `#[derive]` would add `T: Default` / `T: Clone` / `T: Copy` bounds that the data does not
/// actually need.
pub struct HashCode<T: 'static + BuildHasher> {
    value: u64,
    _phantom: PhantomData<&'static T>,
}

impl<T: 'static + BuildHasher> Default for HashCode<T> {
    /// Returns a hash code whose value is `0`.
    fn default() -> Self {
        Self {
            value: 0,
            _phantom: PhantomData,
        }
    }
}

impl<T: 'static + BuildHasher> Clone for HashCode<T> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<T: 'static + BuildHasher> Copy for HashCode<T> {}

impl<T: BuildHasher> Debug for HashCode<T> {
    /// Prints only the numeric value; the marker field is omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HashCode")
            .field("value", &self.value)
            .finish()
    }
}

impl<T: BuildHasher> PartialEq for HashCode<T> {
    /// Two hash codes are equal when their numeric values are equal.
    fn eq(&self, other: &Self) -> bool {
        self.value == other.value
    }
}

impl<T: BuildHasher> From<u64> for HashCode<T> {
    /// Wraps a raw `u64` as a hash code.
    fn from(hash_code: u64) -> Self {
        Self {
            value: hash_code,
            _phantom: PhantomData,
        }
    }
}

impl<T: BuildHasher> HashCode<T> {
    /// Returns the raw `u64` value.
    pub fn value(&self) -> u64 {
        self.value
    }
}
243
/// [`HashCode`] tagged with [`Crc32FastBuilder`].
pub type Crc32HashCode = HashCode<Crc32FastBuilder>;
/// [`HashCode`] tagged with [`XxHash64Builder`].
pub type XxHash64HashCode = HashCode<XxHash64Builder>;
248
/// A [`Hasher`] that does no hashing: it stores the single `u64` handed to
/// [`Hasher::write_u64`] and returns it unchanged from [`Hasher::finish`].
#[derive(Default)]
pub struct PrecomputedHasher {
    hash_code: u64,
}

impl Hasher for PrecomputedHasher {
    /// Returns the stored value (`0` if nothing has been written).
    fn finish(&self) -> u64 {
        let Self { hash_code } = self;
        *hash_code
    }

    /// Stores `i` as the hash code.
    ///
    /// # Panics
    ///
    /// Panics if the stored hash code is already non-zero.
    fn write_u64(&mut self, i: u64) {
        let current = self.hash_code;
        assert_eq!(current, 0);
        self.hash_code = i;
    }

    /// Byte-wise writes are not supported and always panic.
    fn write(&mut self, _bytes: &[u8]) {
        unreachable!("must writes from HashKey with write_u64")
    }
}

/// [`BuildHasher`] that hands out fresh [`PrecomputedHasher`]s.
#[derive(Default, Clone)]
pub struct PrecomputedBuildHasher;

impl BuildHasher for PrecomputedBuildHasher {
    type Hasher = PrecomputedHasher;

    /// Returns a hasher whose stored hash code starts at `0`.
    fn build_hasher(&self) -> Self::Hasher {
        PrecomputedHasher { hash_code: 0 }
    }
}
288
/// Serialization of a scalar reference into the byte form used by hash keys.
pub trait HashKeySer<'a>: ScalarRef<'a> {
    /// Writes the serialized form of `self` into `buf`.
    fn serialize_into(self, buf: impl BufMut);

    /// Returns the serialized size in bytes when it is the same for every value of the type.
    ///
    /// The default is `None`; the fixed-width implementations in this file override it with
    /// the number of bytes they write.
    fn exact_size() -> Option<usize> {
        None
    }

    /// Returns a size estimate for the serialized form of `self`.
    ///
    /// Defaults to [`Self::exact_size`], falling back to `1` when no exact size is declared.
    fn estimated_size(self) -> usize {
        Self::exact_size().unwrap_or(1)
    }
}
327
/// Deserialization of an owned scalar from the byte form used by hash keys.
pub trait HashKeyDe: Scalar {
    /// Reads one value from `buf`. `data_type` is provided for implementations that need it;
    /// the fixed-width implementations in this file ignore it.
    fn deserialize(data_type: &DataType, buf: impl Buf) -> Self;
}
332
/// Implements [`HashKeySer`] for `$owned_ty`'s reference type and [`HashKeyDe`] for
/// `$owned_ty` itself by delegating to the `value_encoding` module.
macro_rules! impl_value_encoding_hash_key_serde {
    ($owned_ty:ty) => {
        impl<'a> HashKeySer<'a> for <$owned_ty as Scalar>::ScalarRefType<'a> {
            // The value is wrapped as a non-null datum (`Some(..)`) before encoding.
            fn serialize_into(self, mut buf: impl BufMut) {
                value_encoding::serialize_datum_into(Some(ScalarRefImpl::from(self)), &mut buf);
            }

            // Overrides the trait default with the estimate computed by `value_encoding`.
            fn estimated_size(self) -> usize {
                value_encoding::estimate_serialize_datum_size(Some(ScalarRefImpl::from(self)))
            }
        }

        impl HashKeyDe for $owned_ty {
            fn deserialize(data_type: &DataType, buf: impl Buf) -> Self {
                // Panics if decoding fails or yields NULL; the serializer above only ever
                // writes non-null datums.
                let scalar = value_encoding::deserialize_datum(buf, data_type)
                    .expect("in-memory deserialize should never fail")
                    .expect("datum should never be NULL");

                // Convert the decoded scalar to the concrete owned type.
                scalar.try_into().unwrap()
            }
        }
    };
}
360
/// Implements [`HashKeySer`] for `$owned_ty`'s reference type and [`HashKeyDe`] for
/// `$owned_ty` itself by delegating to the `memcmp_encoding` module with ascending order.
macro_rules! impl_memcmp_encoding_hash_key_serde {
    ($owned_ty:ty) => {
        impl<'a> HashKeySer<'a> for <$owned_ty as Scalar>::ScalarRefType<'a> {
            fn serialize_into(self, buf: impl BufMut) {
                let mut serializer = memcomparable::Serializer::new(buf);
                // The value is wrapped as a non-null datum (`Some(..)`) before encoding.
                memcmp_encoding::serialize_datum(
                    Some(ScalarRefImpl::from(self)),
                    OrderType::ascending(),
                    &mut serializer,
                )
                .expect("serialize should never fail");
            }

            // No real estimate is computed for this encoding; a constant `1` is returned.
            fn estimated_size(self) -> usize {
                1
            }
        }

        impl HashKeyDe for $owned_ty {
            fn deserialize(data_type: &DataType, buf: impl Buf) -> Self {
                let mut deserializer = memcomparable::Deserializer::new(buf);
                // Must use the same order type (ascending) as the serializer above.
                let scalar = memcmp_encoding::deserialize_datum(
                    data_type,
                    OrderType::ascending(),
                    &mut deserializer,
                )
                .expect("in-memory deserialize should never fail")
                .expect("datum should never be NULL");

                // Convert the decoded scalar to the concrete owned type.
                scalar.try_into().unwrap()
            }
        }
    };
}
399
400impl HashKeySer<'_> for bool {
401 fn serialize_into(self, mut buf: impl BufMut) {
402 buf.put_u8(if self { 1 } else { 0 });
403 }
404
405 fn exact_size() -> Option<usize> {
406 Some(1)
407 }
408}
409
410impl HashKeyDe for bool {
411 fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
412 buf.get_u8() == 1
413 }
414}
415
416impl HashKeySer<'_> for i16 {
417 fn serialize_into(self, mut buf: impl BufMut) {
418 buf.put_i16_ne(self);
419 }
420
421 fn exact_size() -> Option<usize> {
422 Some(2)
423 }
424}
425
426impl HashKeyDe for i16 {
427 fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
428 buf.get_i16_ne()
429 }
430}
431
432impl HashKeySer<'_> for i32 {
433 fn serialize_into(self, mut buf: impl BufMut) {
434 buf.put_i32_ne(self);
435 }
436
437 fn exact_size() -> Option<usize> {
438 Some(4)
439 }
440}
441
442impl HashKeyDe for i32 {
443 fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
444 buf.get_i32_ne()
445 }
446}
447
448impl HashKeySer<'_> for i64 {
449 fn serialize_into(self, mut buf: impl BufMut) {
450 buf.put_i64_ne(self);
451 }
452
453 fn exact_size() -> Option<usize> {
454 Some(8)
455 }
456}
457
458impl HashKeyDe for i64 {
459 fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
460 buf.get_i64_ne()
461 }
462}
463
464impl<'a> HashKeySer<'a> for Int256Ref<'a> {
465 fn serialize_into(self, mut buf: impl BufMut) {
466 let b = self.to_ne_bytes();
467 buf.put_slice(b.as_ref());
468 }
469
470 fn exact_size() -> Option<usize> {
471 Some(32)
472 }
473}
474
475impl HashKeyDe for Int256 {
476 fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
477 let mut value = [0; 32];
478 buf.copy_to_slice(&mut value);
479 Self::from_ne_bytes(value)
480 }
481}
482
483impl HashKeySer<'_> for Serial {
484 fn serialize_into(self, mut buf: impl BufMut) {
485 buf.put_i64_ne(self.as_row_id());
486 }
487
488 fn exact_size() -> Option<usize> {
489 Some(8)
490 }
491}
492
493impl HashKeyDe for Serial {
494 fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
495 buf.get_i64_ne().into()
496 }
497}
498
499impl HashKeySer<'_> for F32 {
500 fn serialize_into(self, mut buf: impl BufMut) {
501 buf.put_f32_ne(self.normalized().0);
502 }
503
504 fn exact_size() -> Option<usize> {
505 Some(4)
506 }
507}
508
509impl HashKeyDe for F32 {
510 fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
511 buf.get_f32_ne().into()
512 }
513}
514
515impl HashKeySer<'_> for F64 {
516 fn serialize_into(self, mut buf: impl BufMut) {
517 buf.put_f64_ne(self.normalized().0);
518 }
519
520 fn exact_size() -> Option<usize> {
521 Some(8)
522 }
523}
524
525impl HashKeyDe for F64 {
526 fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
527 buf.get_f64_ne().into()
528 }
529}
530
531impl HashKeySer<'_> for Decimal {
532 fn serialize_into(self, mut buf: impl BufMut) {
533 let b = Decimal::unordered_serialize(&self.normalize());
534 buf.put_slice(b.as_ref());
535 }
536
537 fn exact_size() -> Option<usize> {
538 Some(16)
539 }
540}
541
542impl HashKeyDe for Decimal {
543 fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
544 let mut value = [0; 16];
545 buf.copy_to_slice(&mut value);
546 Self::unordered_deserialize(value)
547 }
548}
549
550impl HashKeySer<'_> for Date {
551 fn serialize_into(self, mut buf: impl BufMut) {
552 let b = self.0.num_days_from_ce().to_ne_bytes();
553 buf.put_slice(b.as_ref());
554 }
555
556 fn exact_size() -> Option<usize> {
557 Some(4)
558 }
559}
560
561impl HashKeyDe for Date {
562 fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
563 let days = buf.get_i32_ne();
564 Date::with_days_since_ce(days).unwrap()
565 }
566}
567
568impl HashKeySer<'_> for Timestamp {
569 fn serialize_into(self, mut buf: impl BufMut) {
570 buf.put_i64_ne(self.0.and_utc().timestamp());
571 buf.put_u32_ne(self.0.and_utc().timestamp_subsec_nanos());
572 }
573
574 fn exact_size() -> Option<usize> {
575 Some(12)
576 }
577}
578
579impl HashKeyDe for Timestamp {
580 fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
581 let secs = buf.get_i64_ne();
582 let nsecs = buf.get_u32_ne();
583 Timestamp::with_secs_nsecs(secs, nsecs).unwrap()
584 }
585}
586
587impl HashKeySer<'_> for Time {
588 fn serialize_into(self, mut buf: impl BufMut) {
589 buf.put_u32_ne(self.0.num_seconds_from_midnight());
590 buf.put_u32_ne(self.0.nanosecond());
591 }
592
593 fn exact_size() -> Option<usize> {
594 Some(8)
595 }
596}
597
598impl HashKeyDe for Time {
599 fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
600 let secs = buf.get_u32_ne();
601 let nano = buf.get_u32_ne();
602 Time::with_secs_nano(secs, nano).unwrap()
603 }
604}
605
606impl HashKeySer<'_> for Timestamptz {
607 fn serialize_into(self, mut buf: impl BufMut) {
608 buf.put_i64_ne(self.timestamp_micros());
609 }
610
611 fn exact_size() -> Option<usize> {
612 Some(8)
613 }
614}
615
616impl HashKeyDe for Timestamptz {
617 fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
618 Timestamptz::from_micros(buf.get_i64_ne())
619 }
620}
621
// Types whose hash-key serde delegates to value encoding.
impl_value_encoding_hash_key_serde!(Box<str>);
impl_value_encoding_hash_key_serde!(Box<[u8]>);
impl_value_encoding_hash_key_serde!(JsonbVal);

// Types whose hash-key serde delegates to memcomparable encoding.
impl_memcmp_encoding_hash_key_serde!(StructValue);
impl_memcmp_encoding_hash_key_serde!(ListValue);
impl_memcmp_encoding_hash_key_serde!(VectorVal);
impl_memcmp_encoding_hash_key_serde!(MapValue);
632
633#[cfg(test)]
634mod tests {
635 use std::collections::HashMap;
636 use std::str::FromStr;
637 use std::sync::Arc;
638
639 use itertools::Itertools;
640
641 use super::*;
642 use crate::array::{
643 ArrayBuilder, ArrayBuilderImpl, ArrayImpl, BoolArray, DataChunk, DataChunkTestExt,
644 DateArray, DecimalArray, F32Array, F64Array, I16Array, I32Array, I32ArrayBuilder, I64Array,
645 TimeArray, TimestampArray, Utf8Array,
646 };
647 use crate::hash::{HashKey, Key16, Key32, Key64, Key128, Key256, KeySerialized};
648 use crate::test_utils::rand_array::seed_rand_array_ref;
649 use crate::types::Datum;
650
    /// A row of datums that is hashable and comparable, used as the key of the reference
    /// `HashMap` in `do_test_serialize`.
    #[derive(Hash, PartialEq, Eq)]
    struct Row(Vec<Datum>);
653
    /// Builds a 128-row chunk of seeded random data together with the data type of each
    /// column.
    ///
    /// Column layout (index: type): 0 Boolean, 1 Int16, 2 Int32, 3 Int64, 4 Float32,
    /// 5 Float64, 6 Decimal, 7 Varchar, 8 Date, 9 Time, 10 Timestamp.
    fn generate_random_data_chunk() -> (DataChunk, Vec<DataType>) {
        let capacity = 128;
        let seed = 10244021u64;
        // Each column gets its own seed (`seed + column index`).
        // NOTE(review): the third argument (0.5) is presumably the null ratio — confirm
        // against `seed_rand_array_ref`.
        let columns = vec![
            seed_rand_array_ref::<BoolArray>(capacity, seed, 0.5),
            seed_rand_array_ref::<I16Array>(capacity, seed + 1, 0.5),
            seed_rand_array_ref::<I32Array>(capacity, seed + 2, 0.5),
            seed_rand_array_ref::<I64Array>(capacity, seed + 3, 0.5),
            seed_rand_array_ref::<F32Array>(capacity, seed + 4, 0.5),
            seed_rand_array_ref::<F64Array>(capacity, seed + 5, 0.5),
            seed_rand_array_ref::<DecimalArray>(capacity, seed + 6, 0.5),
            seed_rand_array_ref::<Utf8Array>(capacity, seed + 7, 0.5),
            seed_rand_array_ref::<DateArray>(capacity, seed + 8, 0.5),
            seed_rand_array_ref::<TimeArray>(capacity, seed + 9, 0.5),
            seed_rand_array_ref::<TimestampArray>(capacity, seed + 10, 0.5),
        ];
        // Must stay in the same order as `columns` above.
        let types = vec![
            DataType::Boolean,
            DataType::Int16,
            DataType::Int32,
            DataType::Int64,
            DataType::Float32,
            DataType::Float64,
            DataType::Decimal,
            DataType::Varchar,
            DataType::Date,
            DataType::Time,
            DataType::Timestamp,
        ];

        (DataChunk::new(columns, capacity), types)
    }
686
687 fn do_test_serialize<K: HashKey, F>(column_indexes: Vec<usize>, data_gen: F)
688 where
689 F: FnOnce() -> DataChunk,
690 {
691 let data = data_gen();
692
693 let mut actual_row_id_mapping = HashMap::<usize, Vec<usize>>::with_capacity(100);
694 {
695 let mut fast_hash_map =
696 HashMap::<K, Vec<usize>, PrecomputedBuildHasher>::with_capacity_and_hasher(
697 100,
698 PrecomputedBuildHasher,
699 );
700 let keys = K::build_many(column_indexes.as_slice(), &data);
701
702 for (row_id, key) in keys.into_iter().enumerate() {
703 let row_ids = fast_hash_map.entry(key).or_default();
704 row_ids.push(row_id);
705 }
706
707 for row_ids in fast_hash_map.values() {
708 for row_id in row_ids {
709 actual_row_id_mapping.insert(*row_id, row_ids.clone());
710 }
711 }
712 }
713
714 let mut expected_row_id_mapping = HashMap::<usize, Vec<usize>>::with_capacity(100);
715 {
716 let mut normal_hash_map: HashMap<Row, Vec<usize>> = HashMap::with_capacity(100);
717 for row_idx in 0..data.capacity() {
718 let row = column_indexes
719 .iter()
720 .map(|col_idx| data.column_at(*col_idx))
721 .map(|col| col.datum_at(row_idx))
722 .collect::<Vec<Datum>>();
723
724 normal_hash_map.entry(Row(row)).or_default().push(row_idx);
725 }
726
727 for row_ids in normal_hash_map.values() {
728 for row_id in row_ids {
729 expected_row_id_mapping.insert(*row_id, row_ids.clone());
730 }
731 }
732 }
733
734 assert_eq!(expected_row_id_mapping, actual_row_id_mapping);
735 }
736
737 fn do_test_deserialize<K: HashKey, F>(column_indexes: Vec<usize>, data_gen: F)
738 where
739 F: FnOnce() -> (DataChunk, Vec<DataType>),
740 {
741 let (data, types) = data_gen();
742 let keys = K::build_many(column_indexes.as_slice(), &data);
743
744 let mut array_builders = column_indexes
745 .iter()
746 .map(|idx| data.columns()[*idx].create_builder(1024))
747 .collect::<Vec<ArrayBuilderImpl>>();
748 let key_types: Vec<_> = column_indexes
749 .iter()
750 .map(|idx| types[*idx].clone())
751 .collect();
752
753 keys.into_iter()
754 .try_for_each(|k| k.deserialize_to_builders(&mut array_builders[..], &key_types))
755 .expect("Failed to deserialize!");
756
757 let result_arrays = array_builders
758 .into_iter()
759 .map(|array_builder| array_builder.finish())
760 .collect::<Vec<ArrayImpl>>();
761
762 for (ret_idx, col_idx) in column_indexes.iter().enumerate() {
763 assert_eq!(&*data.columns()[*col_idx], &result_arrays[ret_idx]);
764 }
765 }
766
767 fn do_test<K: HashKey, F>(column_indexes: Vec<usize>, data_gen: F)
768 where
769 F: FnOnce() -> (DataChunk, Vec<DataType>),
770 {
771 let (data, types) = data_gen();
772
773 let data1 = data.clone();
774 do_test_serialize::<K, _>(column_indexes.clone(), move || data1);
775 do_test_deserialize::<K, _>(column_indexes, move || (data, types));
776 }
777
    // The column indexes below refer to the layout produced by
    // `generate_random_data_chunk` (0 Boolean, 1 Int16, 2 Int32, 3 Int64, 4 Float32,
    // 5 Float64, 6 Decimal, 7 Varchar, ...).

    /// `Key16` over the Int16 column.
    #[test]
    fn test_two_bytes_hash_key() {
        do_test::<Key16, _>(vec![1], generate_random_data_chunk);
    }

    /// `Key32` over Boolean + Int16, Int32 alone, and Float32 alone.
    #[test]
    fn test_four_bytes_hash_key() {
        do_test::<Key32, _>(vec![0, 1], generate_random_data_chunk);
        do_test::<Key32, _>(vec![2], generate_random_data_chunk);
        do_test::<Key32, _>(vec![4], generate_random_data_chunk);
    }

    /// `Key64` over Int16 + Int32, Boolean + Int16 + Int32, Int64 alone, and Float64 alone.
    #[test]
    fn test_eight_bytes_hash_key() {
        do_test::<Key64, _>(vec![1, 2], generate_random_data_chunk);
        do_test::<Key64, _>(vec![0, 1, 2], generate_random_data_chunk);
        do_test::<Key64, _>(vec![3], generate_random_data_chunk);
        do_test::<Key64, _>(vec![5], generate_random_data_chunk);
    }

    /// `Key128` over Int64 + Float64, and Decimal alone.
    #[test]
    fn test_128_bits_hash_key() {
        do_test::<Key128, _>(vec![3, 5], generate_random_data_chunk);
        do_test::<Key128, _>(vec![6], generate_random_data_chunk);
    }

    /// `Key256` over Int64 + Float64 + Decimal, and Int64 + Decimal.
    #[test]
    fn test_256_bits_hash_key() {
        do_test::<Key256, _>(vec![3, 5, 6], generate_random_data_chunk);
        do_test::<Key256, _>(vec![3, 6], generate_random_data_chunk);
    }

    /// `KeySerialized` over Boolean + Varchar (a variable-length column).
    #[test]
    fn test_var_length_hash_key() {
        do_test::<KeySerialized, _>(vec![0, 7], generate_random_data_chunk);
    }
814
    /// Builds a single-column, 5-row decimal chunk containing a NULL and pairs of values
    /// that are written with different scales (`1.2` / `1.200`, `0.00` / `0.0`).
    fn generate_decimal_test_data() -> (DataChunk, Vec<DataType>) {
        let columns = vec![Arc::new(
            DecimalArray::from_iter([
                Some(Decimal::from_str("1.2").unwrap()),
                None,
                Some(Decimal::from_str("1.200").unwrap()),
                Some(Decimal::from_str("0.00").unwrap()),
                Some(Decimal::from_str("0.0").unwrap()),
            ])
            .into(),
        )];
        let types = vec![DataType::Decimal];

        (DataChunk::new(columns, 5), types)
    }
830
    /// Runs the serialize/deserialize checks with `Key128` on the decimal fixture.
    ///
    /// NOTE(review): presumably guards that decimals differing only in scale group under
    /// the same hash key (the serializer normalizes before writing) — confirm intent.
    #[test]
    fn test_decimal_hash_key_serialization() {
        do_test::<Key128, _>(vec![0], generate_decimal_test_data);
    }
835
836 #[test]
839 fn test_simple_hash_key_nullable_serde() {
840 let keys = Key64::build_many(
841 &[0, 1],
842 &DataChunk::from_pretty(
843 "i i
844 1 .
845 . 2",
846 ),
847 );
848
849 let mut array_builders = [0, 1]
850 .iter()
851 .map(|_| ArrayBuilderImpl::Int32(I32ArrayBuilder::new(2)))
852 .collect::<Vec<_>>();
853
854 keys.into_iter().for_each(|k: Key64| {
855 k.deserialize_to_builders(&mut array_builders[..], &[DataType::Int32, DataType::Int32])
856 .unwrap()
857 });
858
859 let array = array_builders.pop().unwrap().finish();
860 let i32_vec = array
861 .iter()
862 .map(|opt| opt.map(|s| s.into_int32()))
863 .collect_vec();
864 assert_eq!(i32_vec, vec![None, Some(2)]);
865 }
866}