1pub mod arrow;
18mod bool_array;
19pub mod bytes_array;
20mod chrono_array;
21mod data_chunk;
22pub mod data_chunk_iter;
23mod decimal_array;
24pub mod error;
25pub mod interval_array;
26mod iterator;
27mod jsonb_array;
28pub mod list_array;
29mod map_array;
30mod num256_array;
31mod primitive_array;
32mod proto_reader;
33pub mod stream_chunk;
34pub mod stream_chunk_builder;
35mod stream_chunk_iter;
36pub mod stream_record;
37pub mod struct_array;
38mod utf8_array;
39mod vector_array;
40
41use std::convert::From;
42use std::hash::{Hash, Hasher};
43use std::sync::Arc;
44
45pub use bool_array::{BoolArray, BoolArrayBuilder};
46pub use bytes_array::*;
47pub use chrono_array::{
48 DateArray, DateArrayBuilder, TimeArray, TimeArrayBuilder, TimestampArray,
49 TimestampArrayBuilder, TimestamptzArray, TimestamptzArrayBuilder,
50};
51pub use data_chunk::{DataChunk, DataChunkTestExt};
52pub use data_chunk_iter::RowRef;
53pub use decimal_array::{DecimalArray, DecimalArrayBuilder};
54pub use interval_array::{IntervalArray, IntervalArrayBuilder};
55pub use iterator::ArrayIterator;
56pub use jsonb_array::{JsonbArray, JsonbArrayBuilder};
57pub use list_array::{ListArray, ListArrayBuilder, ListRef, ListValue};
58pub use map_array::{MapArray, MapArrayBuilder, MapRef, MapValue};
59use paste::paste;
60pub use primitive_array::{PrimitiveArray, PrimitiveArrayBuilder, PrimitiveArrayItemType};
61use risingwave_common_estimate_size::EstimateSize;
62use risingwave_pb::data::PbArray;
63pub use stream_chunk::{Op, StreamChunk, StreamChunkTestExt};
64pub use stream_chunk_builder::StreamChunkBuilder;
65pub use struct_array::{StructArray, StructArrayBuilder, StructRef, StructValue};
66pub use utf8_array::*;
67pub use vector_array::{
68 Finite32, VECTOR_DISTANCE_TYPE, VECTOR_ITEM_TYPE, VectorArray, VectorArrayBuilder,
69 VectorDistanceType, VectorItemType, VectorRef, VectorVal,
70};
71
72pub use self::error::ArrayError;
73pub use crate::array::num256_array::{Int256Array, Int256ArrayBuilder};
74use crate::bitmap::Bitmap;
75use crate::types::*;
76use crate::{dispatch_array_builder_variants, dispatch_array_variants, for_all_variants};
77pub type ArrayResult<T> = Result<T, ArrayError>;
78
79pub type I64Array = PrimitiveArray<i64>;
80pub type I32Array = PrimitiveArray<i32>;
81pub type I16Array = PrimitiveArray<i16>;
82pub type F64Array = PrimitiveArray<F64>;
83pub type F32Array = PrimitiveArray<F32>;
84pub type SerialArray = PrimitiveArray<Serial>;
85
86pub type I64ArrayBuilder = PrimitiveArrayBuilder<i64>;
87pub type I32ArrayBuilder = PrimitiveArrayBuilder<i32>;
88pub type I16ArrayBuilder = PrimitiveArrayBuilder<i16>;
89pub type F64ArrayBuilder = PrimitiveArrayBuilder<F64>;
90pub type F32ArrayBuilder = PrimitiveArrayBuilder<F32>;
91pub type SerialArrayBuilder = PrimitiveArrayBuilder<Serial>;
92
93pub type ArrayImplBuilder = ArrayBuilderImpl;
95
96pub(crate) const NULL_VAL_FOR_HASH: u32 = 0xfffffff0;
98
99pub trait ArrayBuilder: Send + Sync + Sized + 'static {
110 type ArrayType: Array<Builder = Self>;
112
113 fn new(capacity: usize) -> Self;
116
117 fn with_type(capacity: usize, ty: DataType) -> Self;
120
121 fn append_n(&mut self, n: usize, value: Option<<Self::ArrayType as Array>::RefItem<'_>>);
125
126 fn append(&mut self, value: Option<<Self::ArrayType as Array>::RefItem<'_>>) {
128 self.append_n(1, value);
129 }
130
131 fn append_owned(&mut self, value: Option<<Self::ArrayType as Array>::OwnedItem>) {
133 let value = value.as_ref().map(|s| s.as_scalar_ref());
134 self.append(value)
135 }
136
137 fn append_null(&mut self) {
138 self.append(None)
139 }
140
141 fn append_array(&mut self, other: &Self::ArrayType);
143
144 fn pop(&mut self) -> Option<()>;
152
153 fn append_array_element(&mut self, other: &Self::ArrayType, idx: usize) {
155 self.append(other.value_at(idx));
156 }
157
158 fn len(&self) -> usize;
160
161 fn is_empty(&self) -> bool {
163 self.len() == 0
164 }
165
166 fn finish(self) -> Self::ArrayType;
168}
169
170pub trait Array:
186 std::fmt::Debug + Send + Sync + Sized + 'static + Into<ArrayImpl> + EstimateSize
187{
188 type RefItem<'a>: ScalarRef<'a, ScalarType = Self::OwnedItem>
191 where
192 Self: 'a;
193
194 type OwnedItem: Clone
196 + std::fmt::Debug
197 + EstimateSize
198 + for<'a> Scalar<ScalarRefType<'a> = Self::RefItem<'a>>;
199
200 type Builder: ArrayBuilder<ArrayType = Self>;
202
203 unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_>;
212
213 #[inline]
215 fn value_at(&self, idx: usize) -> Option<Self::RefItem<'_>> {
216 if !self.is_null(idx) {
217 Some(unsafe { self.raw_value_at_unchecked(idx) })
219 } else {
220 None
221 }
222 }
223
224 #[inline]
228 unsafe fn value_at_unchecked(&self, idx: usize) -> Option<Self::RefItem<'_>> {
229 unsafe {
230 if !self.is_null_unchecked(idx) {
231 Some(self.raw_value_at_unchecked(idx))
232 } else {
233 None
234 }
235 }
236 }
237
238 fn len(&self) -> usize;
240
241 fn iter(&self) -> ArrayIterator<'_, Self> {
243 ArrayIterator::new(self)
244 }
245
246 fn raw_iter(&self) -> impl ExactSizeIterator<Item = Self::RefItem<'_>> {
251 (0..self.len()).map(|i| unsafe { self.raw_value_at_unchecked(i) })
252 }
253
254 fn to_protobuf(&self) -> PbArray;
256
257 fn null_bitmap(&self) -> &Bitmap;
259
260 fn into_null_bitmap(self) -> Bitmap;
262
263 fn is_null(&self, idx: usize) -> bool {
265 !self.null_bitmap().is_set(idx)
266 }
267
268 unsafe fn is_null_unchecked(&self, idx: usize) -> bool {
273 unsafe { !self.null_bitmap().is_set_unchecked(idx) }
274 }
275
276 fn set_bitmap(&mut self, bitmap: Bitmap);
277
278 #[inline(always)]
280 fn hash_at<H: Hasher>(&self, idx: usize, state: &mut H) {
281 if let Some(value) = self.value_at(idx) {
284 value.hash_scalar(state);
285 } else {
286 NULL_VAL_FOR_HASH.hash(state);
287 }
288 }
289
290 fn hash_vec<H: Hasher>(&self, hashers: &mut [H], vis: &Bitmap) {
291 assert_eq!(hashers.len(), self.len());
292 for idx in vis.iter_ones() {
293 self.hash_at(idx, &mut hashers[idx]);
294 }
295 }
296
297 fn is_empty(&self) -> bool {
298 self.len() == 0
299 }
300
301 fn create_builder(&self, capacity: usize) -> Self::Builder {
302 Self::Builder::with_type(capacity, self.data_type())
303 }
304
305 fn data_type(&self) -> DataType;
306
307 fn into_ref(self) -> ArrayRef {
309 Arc::new(self.into())
310 }
311}
312
313trait CompactableArray: Array {
315 fn compact(&self, visibility: &Bitmap, cardinality: usize) -> Self;
318}
319
320impl<A: Array> CompactableArray for A {
321 fn compact(&self, visibility: &Bitmap, cardinality: usize) -> Self {
322 let mut builder = A::Builder::with_type(cardinality, self.data_type());
323 for idx in visibility.iter_ones() {
324 unsafe {
326 builder.append(self.value_at_unchecked(idx));
327 }
328 }
329 builder.finish()
330 }
331}
332
/// Generates the type-erased [`ArrayImpl`] enum with one variant per array type.
///
/// Driven by `for_all_variants!` (defined elsewhere), which passes one
/// `{ data_type, variant_name, suffix_name, scalar, scalar_ref, array, builder }` tuple
/// per variant. Only `$variant_name` and `$array` are used here; the matcher must keep
/// accepting the full tuple shape.
macro_rules! array_impl_enum {
    ( $( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
        /// Type-erased array: one variant wrapping each concrete array type.
        #[derive(Debug, Clone, EstimateSize)]
        pub enum ArrayImpl {
            $( $variant_name($array) ),*
        }
    };
}

for_all_variants! { array_impl_enum }
345
346impl<T: PrimitiveArrayItemType> From<PrimitiveArray<T>> for ArrayImpl {
351 fn from(arr: PrimitiveArray<T>) -> Self {
352 T::erase_array_type(arr)
353 }
354}
355
356impl From<Int256Array> for ArrayImpl {
357 fn from(arr: Int256Array) -> Self {
358 Self::Int256(arr)
359 }
360}
361
362impl From<BoolArray> for ArrayImpl {
363 fn from(arr: BoolArray) -> Self {
364 Self::Bool(arr)
365 }
366}
367
368impl From<Utf8Array> for ArrayImpl {
369 fn from(arr: Utf8Array) -> Self {
370 Self::Utf8(arr)
371 }
372}
373
374impl From<JsonbArray> for ArrayImpl {
375 fn from(arr: JsonbArray) -> Self {
376 Self::Jsonb(arr)
377 }
378}
379
380impl From<StructArray> for ArrayImpl {
381 fn from(arr: StructArray) -> Self {
382 Self::Struct(arr)
383 }
384}
385
386impl From<ListArray> for ArrayImpl {
387 fn from(arr: ListArray) -> Self {
388 Self::List(arr)
389 }
390}
391
392impl From<VectorArray> for ArrayImpl {
393 fn from(arr: VectorArray) -> Self {
394 Self::Vector(arr)
395 }
396}
397
398impl From<BytesArray> for ArrayImpl {
399 fn from(arr: BytesArray) -> Self {
400 Self::Bytea(arr)
401 }
402}
403
404impl From<MapArray> for ArrayImpl {
405 fn from(arr: MapArray) -> Self {
406 Self::Map(arr)
407 }
408}
409
/// Generates, for every variant listed by `for_all_variants!`:
/// - `ArrayImpl::as_<suffix>` (borrow) and `ArrayImpl::into_<suffix>` (consume) downcasts,
/// - `From<&ArrayImpl> for &$array` and `From<ArrayImpl> for $array`,
/// - `From<$builder> for ArrayBuilderImpl`.
///
/// Every downcast panics when the `ArrayImpl` holds a different variant.
macro_rules! impl_convert {
    ($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
        $(
            paste! {
                impl ArrayImpl {
                    /// Borrows the inner array as its concrete type.
                    ///
                    /// # Panics
                    ///
                    /// Panics if `self` is not the matching variant.
                    pub fn [<as_ $suffix_name>](&self) -> &$array {
                        match self {
                            Self::$variant_name(array) => array,
                            other_array => panic!("cannot convert ArrayImpl::{} to concrete type {}", other_array.get_ident(), stringify!($variant_name))
                        }
                    }

                    /// Consumes `self` and returns the inner array as its concrete type.
                    ///
                    /// # Panics
                    ///
                    /// Panics if `self` is not the matching variant.
                    pub fn [<into_ $suffix_name>](self) -> $array {
                        match self {
                            Self::$variant_name(array) => array,
                            other_array => panic!("cannot convert ArrayImpl::{} to concrete type {}", other_array.get_ident(), stringify!($variant_name))
                        }
                    }
                }

                // Panicking downcast by reference; this is what `other.into()` resolves to
                // in the `ArrayBuilderImpl` dispatch methods.
                impl <'a> From<&'a ArrayImpl> for &'a $array {
                    fn from(array: &'a ArrayImpl) -> Self {
                        match array {
                            ArrayImpl::$variant_name(inner) => inner,
                            other_array => panic!("cannot convert ArrayImpl::{} to concrete type {}", other_array.get_ident(), stringify!($variant_name))
                        }
                    }
                }

                // Panicking downcast by value.
                impl From<ArrayImpl> for $array {
                    fn from(array: ArrayImpl) -> Self {
                        match array {
                            ArrayImpl::$variant_name(inner) => inner,
                            other_array => panic!("cannot convert ArrayImpl::{} to concrete type {}", other_array.get_ident(), stringify!($variant_name))
                        }
                    }
                }

                // Erases a concrete builder into the matching `ArrayBuilderImpl` variant.
                impl From<$builder> for ArrayBuilderImpl {
                    fn from(builder: $builder) -> Self {
                        Self::$variant_name(builder)
                    }
                }
            }
        )*
    };
}

for_all_variants! { impl_convert }
472
/// Generates the type-erased [`ArrayBuilderImpl`] enum with one variant per builder type.
///
/// Driven by `for_all_variants!` (defined elsewhere). Only `$variant_name` and `$builder`
/// are used from each tuple; the matcher must keep accepting the full tuple shape.
macro_rules! array_builder_impl_enum {
    ($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
        /// Type-erased array builder: one variant wrapping each concrete builder type.
        #[derive(Debug, Clone, EstimateSize)]
        pub enum ArrayBuilderImpl {
            $( $variant_name($builder) ),*
        }
    };
}

for_all_variants! { array_builder_impl_enum }
485
486impl ArrayBuilderImpl {
488 pub fn with_type(capacity: usize, ty: DataType) -> Self {
489 ty.create_array_builder(capacity)
490 }
491
492 pub fn append_array(&mut self, other: &ArrayImpl) {
493 dispatch_array_builder_variants!(self, inner, { inner.append_array(other.into()) })
494 }
495
496 pub fn append_null(&mut self) {
497 dispatch_array_builder_variants!(self, inner, { inner.append(None) })
498 }
499
500 pub fn append_n_null(&mut self, n: usize) {
501 dispatch_array_builder_variants!(self, inner, { inner.append_n(n, None) })
502 }
503
504 pub fn append_n(&mut self, n: usize, datum: impl ToDatumRef) {
507 match datum.to_datum_ref() {
508 None => dispatch_array_builder_variants!(self, inner, { inner.append_n(n, None) }),
509
510 Some(scalar_ref) => {
511 dispatch_array_builder_variants!(self, inner, [I = VARIANT_NAME], {
512 inner.append_n(
513 n,
514 Some(scalar_ref.try_into().unwrap_or_else(|_| {
515 panic!(
516 "type mismatch, array builder type: {}, scalar type: {}",
517 I,
518 scalar_ref.get_ident()
519 )
520 })),
521 )
522 })
523 }
524 }
525 }
526
527 pub fn append(&mut self, datum: impl ToDatumRef) {
529 self.append_n(1, datum);
530 }
531
532 pub fn append_array_element(&mut self, other: &ArrayImpl, idx: usize) {
533 dispatch_array_builder_variants!(self, inner, {
534 inner.append_array_element(other.into(), idx)
535 })
536 }
537
538 pub fn pop(&mut self) -> Option<()> {
539 dispatch_array_builder_variants!(self, inner, { inner.pop() })
540 }
541
542 pub fn finish(self) -> ArrayImpl {
543 dispatch_array_builder_variants!(self, inner, { inner.finish().into() })
544 }
545
546 pub fn get_ident(&self) -> &'static str {
547 dispatch_array_builder_variants!(self, [I = VARIANT_NAME], { I })
548 }
549
550 pub fn len(&self) -> usize {
551 dispatch_array_builder_variants!(self, inner, { inner.len() })
552 }
553
554 pub fn is_empty(&self) -> bool {
555 self.len() == 0
556 }
557}
558
559impl ArrayImpl {
560 pub fn len(&self) -> usize {
562 dispatch_array_variants!(self, inner, { inner.len() })
563 }
564
565 pub fn is_empty(&self) -> bool {
566 self.len() == 0
567 }
568
569 pub fn null_bitmap(&self) -> &Bitmap {
571 dispatch_array_variants!(self, inner, { inner.null_bitmap() })
572 }
573
574 pub fn into_null_bitmap(self) -> Bitmap {
575 dispatch_array_variants!(self, inner, { inner.into_null_bitmap() })
576 }
577
578 pub fn to_protobuf(&self) -> PbArray {
579 dispatch_array_variants!(self, inner, { inner.to_protobuf() })
580 }
581
582 pub fn hash_at<H: Hasher>(&self, idx: usize, state: &mut H) {
583 dispatch_array_variants!(self, inner, { inner.hash_at(idx, state) })
584 }
585
586 pub fn hash_vec<H: Hasher>(&self, hashers: &mut [H], vis: &Bitmap) {
587 dispatch_array_variants!(self, inner, { inner.hash_vec(hashers, vis) })
588 }
589
590 pub fn compact(&self, visibility: &Bitmap, cardinality: usize) -> Self {
592 dispatch_array_variants!(self, inner, {
593 inner.compact(visibility, cardinality).into()
594 })
595 }
596
597 pub fn get_ident(&self) -> &'static str {
598 dispatch_array_variants!(self, [I = VARIANT_NAME], { I })
599 }
600
601 pub fn datum_at(&self, idx: usize) -> Datum {
603 self.value_at(idx).to_owned_datum()
604 }
605
606 pub fn to_datum(&self) -> Datum {
608 assert_eq!(self.len(), 1);
609 self.datum_at(0)
610 }
611
612 pub fn value_at(&self, idx: usize) -> DatumRef<'_> {
614 dispatch_array_variants!(self, inner, {
615 inner.value_at(idx).map(ScalarRefImpl::from)
616 })
617 }
618
619 pub unsafe fn value_at_unchecked(&self, idx: usize) -> DatumRef<'_> {
626 unsafe {
627 dispatch_array_variants!(self, inner, {
628 inner.value_at_unchecked(idx).map(ScalarRefImpl::from)
629 })
630 }
631 }
632
633 pub fn set_bitmap(&mut self, bitmap: Bitmap) {
634 dispatch_array_variants!(self, inner, { inner.set_bitmap(bitmap) })
635 }
636
637 pub fn create_builder(&self, capacity: usize) -> ArrayBuilderImpl {
638 dispatch_array_variants!(self, inner, { inner.create_builder(capacity).into() })
639 }
640
641 pub fn data_type(&self) -> DataType {
643 dispatch_array_variants!(self, inner, { inner.data_type() })
644 }
645
646 pub fn into_ref(self) -> ArrayRef {
647 Arc::new(self)
648 }
649
650 pub fn iter(&self) -> impl DoubleEndedIterator<Item = DatumRef<'_>> + ExactSizeIterator {
651 (0..self.len()).map(|i| self.value_at(i))
652 }
653}
654
655pub type ArrayRef = Arc<ArrayImpl>;
656
657impl PartialEq for ArrayImpl {
658 fn eq(&self, other: &Self) -> bool {
659 self.iter().eq(other.iter())
660 }
661}
662
663impl Eq for ArrayImpl {}
664
#[cfg(test)]
mod tests {
    use num_traits::ops::checked::CheckedAdd;

    use super::*;
    use crate::util::iter_util::ZipEqFast;

    /// Builds a new array holding only the elements of `data` accepted by `pred`.
    fn filter<'a, A, F>(data: &'a A, pred: F) -> ArrayResult<A>
    where
        A: Array + 'a,
        F: Fn(Option<A::RefItem<'a>>) -> bool,
    {
        let mut builder = A::Builder::with_type(data.len(), data.data_type());
        for idx in 0..data.len() {
            let keep = pred(data.value_at(idx));
            if keep {
                builder.append(data.value_at(idx));
            }
        }
        Ok(builder.finish())
    }

    #[test]
    fn test_filter() {
        let source = {
            let mut builder = PrimitiveArrayBuilder::<i32>::new(0);
            (0..=60).for_each(|v| builder.append(Some(v)));
            builder.finish()
        };
        let array = filter(&source, |x| x.unwrap_or(0) >= 60).unwrap();
        assert_eq!(array.iter().collect::<Vec<Option<i32>>>(), vec![Some(60)]);
    }

    /// Element-wise sum of two primitive arrays, widened to `T3`; a null on either
    /// side yields a null.
    fn vec_add<T1, T2, T3>(
        a: &PrimitiveArray<T1>,
        b: &PrimitiveArray<T2>,
    ) -> ArrayResult<PrimitiveArray<T3>>
    where
        T1: PrimitiveArrayItemType,
        T2: PrimitiveArrayItemType,
        T3: PrimitiveArrayItemType + CheckedAdd + From<T1> + From<T2>,
    {
        let mut builder = PrimitiveArrayBuilder::<T3>::new(a.len());
        for (lhs, rhs) in a.iter().zip_eq_fast(b.iter()) {
            let sum = lhs.zip(rhs).map(|(l, r)| T3::from(l) + T3::from(r));
            builder.append(sum);
        }
        Ok(builder.finish())
    }

    #[test]
    fn test_vectorized_add() {
        let array1 = {
            let mut builder = PrimitiveArrayBuilder::<i32>::new(0);
            for v in 0..=60 {
                builder.append(Some(v));
            }
            builder.finish()
        };
        let array2 = {
            let mut builder = PrimitiveArrayBuilder::<i16>::new(0);
            for v in 0..=60i16 {
                builder.append(Some(v));
            }
            builder.finish()
        };

        let final_array: PrimitiveArray<i64> = vec_add(&array1, &array2).unwrap();

        assert_eq!(final_array.len(), array1.len());
        final_array
            .iter()
            .enumerate()
            .for_each(|(idx, sum)| assert_eq!(sum, Some(idx as i64 * 2)));
    }
}
739
#[cfg(test)]
mod test_util {
    use std::hash::{BuildHasher, Hasher};

    use super::Array;
    use crate::bitmap::Bitmap;
    use crate::util::iter_util::ZipEqFast;

    /// Finalizes every hasher in `hashers`, returning the digests in order.
    pub fn hash_finish<H: Hasher>(hashers: &[H]) -> Vec<u64> {
        hashers
            .iter()
            .map(|hasher| hasher.finish())
            .collect::<Vec<u64>>()
    }

    /// Checks that row-wise `Array::hash_at` and vectorized `Array::hash_vec` produce the
    /// same per-row digests, and that those digests equal `expects`.
    ///
    /// The arrays in `arrs` are hashed in order into one hasher per row, once through each
    /// path, with all rows visible.
    ///
    /// # Panics
    ///
    /// Panics if any row's digest differs from `expects`, if the two hashing paths
    /// disagree, or if the row counts do not line up.
    pub fn test_hash<H: BuildHasher, A: Array>(arrs: Vec<A>, expects: Vec<u64>, hasher_builder: H) {
        let len = expects.len();
        let mut states_scalar = Vec::with_capacity(len);
        states_scalar.resize_with(len, || hasher_builder.build_hasher());
        let mut states_vec = Vec::with_capacity(len);
        states_vec.resize_with(len, || hasher_builder.build_hasher());

        arrs.iter().for_each(|arr| {
            for (i, state) in states_scalar.iter_mut().enumerate() {
                arr.hash_at(i, state)
            }
        });
        let vis = Bitmap::ones(len);
        arrs.iter()
            .for_each(|arr| arr.hash_vec(&mut states_vec[..], &vis));

        // The comparison must be asserted, not merely computed: a bare `.all(..)` whose
        // result is dropped would let every mismatch pass silently.
        let rows = expects
            .iter()
            .zip_eq_fast(hash_finish(&states_scalar[..]))
            .zip_eq_fast(hash_finish(&states_vec[..]));
        for (row, ((&expected, scalar), vectorized)) in rows.enumerate() {
            assert_eq!(expected, scalar, "hash_at digest mismatch at row {row}");
            assert_eq!(scalar, vectorized, "hash_vec digest mismatch at row {row}");
        }
    }
}