1pub mod arrow;
18mod bool_array;
19pub mod bytes_array;
20mod chrono_array;
21mod data_chunk;
22pub mod data_chunk_iter;
23mod decimal_array;
24pub mod error;
25pub mod interval_array;
26mod iterator;
27mod jsonb_array;
28pub mod list_array;
29mod map_array;
30mod num256_array;
31mod primitive_array;
32mod proto_reader;
33pub mod stream_chunk;
34pub mod stream_chunk_builder;
35mod stream_chunk_iter;
36pub mod stream_record;
37pub mod struct_array;
38mod utf8_array;
39mod vector_array;
40
41use std::convert::From;
42use std::hash::{Hash, Hasher};
43use std::sync::Arc;
44
45pub use bool_array::{BoolArray, BoolArrayBuilder};
46pub use bytes_array::*;
47pub use chrono_array::{
48 DateArray, DateArrayBuilder, TimeArray, TimeArrayBuilder, TimestampArray,
49 TimestampArrayBuilder, TimestamptzArray, TimestamptzArrayBuilder,
50};
51pub use data_chunk::{DataChunk, DataChunkTestExt};
52pub use data_chunk_iter::RowRef;
53pub use decimal_array::{DecimalArray, DecimalArrayBuilder};
54pub use interval_array::{IntervalArray, IntervalArrayBuilder};
55pub use iterator::ArrayIterator;
56pub use jsonb_array::{JsonbArray, JsonbArrayBuilder};
57pub use list_array::{ListArray, ListArrayBuilder, ListRef, ListValue};
58pub use map_array::{MapArray, MapArrayBuilder, MapRef, MapValue};
59use paste::paste;
60pub use primitive_array::{PrimitiveArray, PrimitiveArrayBuilder, PrimitiveArrayItemType};
61use risingwave_common_estimate_size::EstimateSize;
62use risingwave_pb::data::PbArray;
63pub use stream_chunk::{Op, StreamChunk, StreamChunkTestExt};
64pub use stream_chunk_builder::StreamChunkBuilder;
65pub use struct_array::{StructArray, StructArrayBuilder, StructRef, StructValue};
66pub use utf8_array::*;
67pub use vector_array::{VectorArray, VectorArrayBuilder, VectorRef, VectorVal};
68
69pub use self::error::ArrayError;
70pub use crate::array::num256_array::{Int256Array, Int256ArrayBuilder};
71use crate::bitmap::Bitmap;
72use crate::types::*;
73use crate::{dispatch_array_builder_variants, dispatch_array_variants, for_all_variants};
74pub type ArrayResult<T> = Result<T, ArrayError>;
75
76pub type I64Array = PrimitiveArray<i64>;
77pub type I32Array = PrimitiveArray<i32>;
78pub type I16Array = PrimitiveArray<i16>;
79pub type F64Array = PrimitiveArray<F64>;
80pub type F32Array = PrimitiveArray<F32>;
81pub type SerialArray = PrimitiveArray<Serial>;
82
83pub type I64ArrayBuilder = PrimitiveArrayBuilder<i64>;
84pub type I32ArrayBuilder = PrimitiveArrayBuilder<i32>;
85pub type I16ArrayBuilder = PrimitiveArrayBuilder<i16>;
86pub type F64ArrayBuilder = PrimitiveArrayBuilder<F64>;
87pub type F32ArrayBuilder = PrimitiveArrayBuilder<F32>;
88pub type SerialArrayBuilder = PrimitiveArrayBuilder<Serial>;
89
90pub type ArrayImplBuilder = ArrayBuilderImpl;
92
93pub(crate) const NULL_VAL_FOR_HASH: u32 = 0xfffffff0;
95
96pub trait ArrayBuilder: Send + Sync + Sized + 'static {
107 type ArrayType: Array<Builder = Self>;
109
110 fn new(capacity: usize) -> Self;
113
114 fn with_type(capacity: usize, ty: DataType) -> Self;
117
118 fn append_n(&mut self, n: usize, value: Option<<Self::ArrayType as Array>::RefItem<'_>>);
122
123 fn append(&mut self, value: Option<<Self::ArrayType as Array>::RefItem<'_>>) {
125 self.append_n(1, value);
126 }
127
128 fn append_owned(&mut self, value: Option<<Self::ArrayType as Array>::OwnedItem>) {
130 let value = value.as_ref().map(|s| s.as_scalar_ref());
131 self.append(value)
132 }
133
134 fn append_null(&mut self) {
135 self.append(None)
136 }
137
138 fn append_array(&mut self, other: &Self::ArrayType);
140
141 fn pop(&mut self) -> Option<()>;
149
150 fn append_array_element(&mut self, other: &Self::ArrayType, idx: usize) {
152 self.append(other.value_at(idx));
153 }
154
155 fn len(&self) -> usize;
157
158 fn is_empty(&self) -> bool {
160 self.len() == 0
161 }
162
163 fn finish(self) -> Self::ArrayType;
165}
166
167pub trait Array:
183 std::fmt::Debug + Send + Sync + Sized + 'static + Into<ArrayImpl> + EstimateSize
184{
185 type RefItem<'a>: ScalarRef<'a, ScalarType = Self::OwnedItem>
188 where
189 Self: 'a;
190
191 type OwnedItem: Clone
193 + std::fmt::Debug
194 + EstimateSize
195 + for<'a> Scalar<ScalarRefType<'a> = Self::RefItem<'a>>;
196
197 type Builder: ArrayBuilder<ArrayType = Self>;
199
200 unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_>;
209
210 #[inline]
212 fn value_at(&self, idx: usize) -> Option<Self::RefItem<'_>> {
213 if !self.is_null(idx) {
214 Some(unsafe { self.raw_value_at_unchecked(idx) })
216 } else {
217 None
218 }
219 }
220
221 #[inline]
225 unsafe fn value_at_unchecked(&self, idx: usize) -> Option<Self::RefItem<'_>> {
226 unsafe {
227 if !self.is_null_unchecked(idx) {
228 Some(self.raw_value_at_unchecked(idx))
229 } else {
230 None
231 }
232 }
233 }
234
235 fn len(&self) -> usize;
237
238 fn iter(&self) -> ArrayIterator<'_, Self> {
240 ArrayIterator::new(self)
241 }
242
243 fn raw_iter(&self) -> impl ExactSizeIterator<Item = Self::RefItem<'_>> {
248 (0..self.len()).map(|i| unsafe { self.raw_value_at_unchecked(i) })
249 }
250
251 fn to_protobuf(&self) -> PbArray;
253
254 fn null_bitmap(&self) -> &Bitmap;
256
257 fn into_null_bitmap(self) -> Bitmap;
259
260 fn is_null(&self, idx: usize) -> bool {
262 !self.null_bitmap().is_set(idx)
263 }
264
265 unsafe fn is_null_unchecked(&self, idx: usize) -> bool {
270 unsafe { !self.null_bitmap().is_set_unchecked(idx) }
271 }
272
273 fn set_bitmap(&mut self, bitmap: Bitmap);
274
275 #[inline(always)]
277 fn hash_at<H: Hasher>(&self, idx: usize, state: &mut H) {
278 if let Some(value) = self.value_at(idx) {
281 value.hash_scalar(state);
282 } else {
283 NULL_VAL_FOR_HASH.hash(state);
284 }
285 }
286
287 fn hash_vec<H: Hasher>(&self, hashers: &mut [H], vis: &Bitmap) {
288 assert_eq!(hashers.len(), self.len());
289 for idx in vis.iter_ones() {
290 self.hash_at(idx, &mut hashers[idx]);
291 }
292 }
293
294 fn is_empty(&self) -> bool {
295 self.len() == 0
296 }
297
298 fn create_builder(&self, capacity: usize) -> Self::Builder {
299 Self::Builder::with_type(capacity, self.data_type())
300 }
301
302 fn data_type(&self) -> DataType;
303
304 fn into_ref(self) -> ArrayRef {
306 Arc::new(self.into())
307 }
308}
309
310trait CompactableArray: Array {
312 fn compact(&self, visibility: &Bitmap, cardinality: usize) -> Self;
315}
316
317impl<A: Array> CompactableArray for A {
318 fn compact(&self, visibility: &Bitmap, cardinality: usize) -> Self {
319 let mut builder = A::Builder::with_type(cardinality, self.data_type());
320 for idx in visibility.iter_ones() {
321 unsafe {
323 builder.append(self.value_at_unchecked(idx));
324 }
325 }
326 builder.finish()
327 }
328}
329
330macro_rules! array_impl_enum {
332 ( $( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
333 #[derive(Debug, Clone, EstimateSize)]
335 pub enum ArrayImpl {
336 $( $variant_name($array) ),*
337 }
338 };
339}
340
341for_all_variants! { array_impl_enum }
342
343impl<T: PrimitiveArrayItemType> From<PrimitiveArray<T>> for ArrayImpl {
348 fn from(arr: PrimitiveArray<T>) -> Self {
349 T::erase_array_type(arr)
350 }
351}
352
353impl From<Int256Array> for ArrayImpl {
354 fn from(arr: Int256Array) -> Self {
355 Self::Int256(arr)
356 }
357}
358
359impl From<BoolArray> for ArrayImpl {
360 fn from(arr: BoolArray) -> Self {
361 Self::Bool(arr)
362 }
363}
364
365impl From<Utf8Array> for ArrayImpl {
366 fn from(arr: Utf8Array) -> Self {
367 Self::Utf8(arr)
368 }
369}
370
371impl From<JsonbArray> for ArrayImpl {
372 fn from(arr: JsonbArray) -> Self {
373 Self::Jsonb(arr)
374 }
375}
376
377impl From<StructArray> for ArrayImpl {
378 fn from(arr: StructArray) -> Self {
379 Self::Struct(arr)
380 }
381}
382
383impl From<ListArray> for ArrayImpl {
384 fn from(arr: ListArray) -> Self {
385 Self::List(arr)
386 }
387}
388
389impl From<VectorArray> for ArrayImpl {
390 fn from(arr: VectorArray) -> Self {
391 Self::Vector(arr)
392 }
393}
394
395impl From<BytesArray> for ArrayImpl {
396 fn from(arr: BytesArray) -> Self {
397 Self::Bytea(arr)
398 }
399}
400
401impl From<MapArray> for ArrayImpl {
402 fn from(arr: MapArray) -> Self {
403 Self::Map(arr)
404 }
405}
406
407macro_rules! impl_convert {
414 ($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
415 $(
416 paste! {
417 impl ArrayImpl {
418 pub fn [<as_ $suffix_name>](&self) -> &$array {
422 match self {
423 Self::$variant_name(array) => array,
424 other_array => panic!("cannot convert ArrayImpl::{} to concrete type {}", other_array.get_ident(), stringify!($variant_name))
425 }
426 }
427
428 pub fn [<into_ $suffix_name>](self) -> $array {
432 match self {
433 Self::$variant_name(array) => array,
434 other_array => panic!("cannot convert ArrayImpl::{} to concrete type {}", other_array.get_ident(), stringify!($variant_name))
435 }
436 }
437 }
438
439 impl <'a> From<&'a ArrayImpl> for &'a $array {
441 fn from(array: &'a ArrayImpl) -> Self {
442 match array {
443 ArrayImpl::$variant_name(inner) => inner,
444 other_array => panic!("cannot convert ArrayImpl::{} to concrete type {}", other_array.get_ident(), stringify!($variant_name))
445 }
446 }
447 }
448
449 impl From<ArrayImpl> for $array {
450 fn from(array: ArrayImpl) -> Self {
451 match array {
452 ArrayImpl::$variant_name(inner) => inner,
453 other_array => panic!("cannot convert ArrayImpl::{} to concrete type {}", other_array.get_ident(), stringify!($variant_name))
454 }
455 }
456 }
457
458 impl From<$builder> for ArrayBuilderImpl {
459 fn from(builder: $builder) -> Self {
460 Self::$variant_name(builder)
461 }
462 }
463 }
464 )*
465 };
466}
467
468for_all_variants! { impl_convert }
469
470macro_rules! array_builder_impl_enum {
472 ($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
473 #[derive(Debug, Clone, EstimateSize)]
475 pub enum ArrayBuilderImpl {
476 $( $variant_name($builder) ),*
477 }
478 };
479}
480
481for_all_variants! { array_builder_impl_enum }
482
483impl ArrayBuilderImpl {
485 pub fn with_type(capacity: usize, ty: DataType) -> Self {
486 ty.create_array_builder(capacity)
487 }
488
489 pub fn append_array(&mut self, other: &ArrayImpl) {
490 dispatch_array_builder_variants!(self, inner, { inner.append_array(other.into()) })
491 }
492
493 pub fn append_null(&mut self) {
494 dispatch_array_builder_variants!(self, inner, { inner.append(None) })
495 }
496
497 pub fn append_n_null(&mut self, n: usize) {
498 dispatch_array_builder_variants!(self, inner, { inner.append_n(n, None) })
499 }
500
501 pub fn append_n(&mut self, n: usize, datum: impl ToDatumRef) {
504 match datum.to_datum_ref() {
505 None => dispatch_array_builder_variants!(self, inner, { inner.append_n(n, None) }),
506
507 Some(scalar_ref) => {
508 dispatch_array_builder_variants!(self, inner, [I = VARIANT_NAME], {
509 inner.append_n(
510 n,
511 Some(scalar_ref.try_into().unwrap_or_else(|_| {
512 panic!(
513 "type mismatch, array builder type: {}, scalar type: {}",
514 I,
515 scalar_ref.get_ident()
516 )
517 })),
518 )
519 })
520 }
521 }
522 }
523
524 pub fn append(&mut self, datum: impl ToDatumRef) {
526 self.append_n(1, datum);
527 }
528
529 pub fn append_array_element(&mut self, other: &ArrayImpl, idx: usize) {
530 dispatch_array_builder_variants!(self, inner, {
531 inner.append_array_element(other.into(), idx)
532 })
533 }
534
535 pub fn pop(&mut self) -> Option<()> {
536 dispatch_array_builder_variants!(self, inner, { inner.pop() })
537 }
538
539 pub fn finish(self) -> ArrayImpl {
540 dispatch_array_builder_variants!(self, inner, { inner.finish().into() })
541 }
542
543 pub fn get_ident(&self) -> &'static str {
544 dispatch_array_builder_variants!(self, [I = VARIANT_NAME], { I })
545 }
546
547 pub fn len(&self) -> usize {
548 dispatch_array_builder_variants!(self, inner, { inner.len() })
549 }
550
551 pub fn is_empty(&self) -> bool {
552 self.len() == 0
553 }
554}
555
556impl ArrayImpl {
557 pub fn len(&self) -> usize {
559 dispatch_array_variants!(self, inner, { inner.len() })
560 }
561
562 pub fn is_empty(&self) -> bool {
563 self.len() == 0
564 }
565
566 pub fn null_bitmap(&self) -> &Bitmap {
568 dispatch_array_variants!(self, inner, { inner.null_bitmap() })
569 }
570
571 pub fn into_null_bitmap(self) -> Bitmap {
572 dispatch_array_variants!(self, inner, { inner.into_null_bitmap() })
573 }
574
575 pub fn to_protobuf(&self) -> PbArray {
576 dispatch_array_variants!(self, inner, { inner.to_protobuf() })
577 }
578
579 pub fn hash_at<H: Hasher>(&self, idx: usize, state: &mut H) {
580 dispatch_array_variants!(self, inner, { inner.hash_at(idx, state) })
581 }
582
583 pub fn hash_vec<H: Hasher>(&self, hashers: &mut [H], vis: &Bitmap) {
584 dispatch_array_variants!(self, inner, { inner.hash_vec(hashers, vis) })
585 }
586
587 pub fn compact(&self, visibility: &Bitmap, cardinality: usize) -> Self {
589 dispatch_array_variants!(self, inner, {
590 inner.compact(visibility, cardinality).into()
591 })
592 }
593
594 pub fn get_ident(&self) -> &'static str {
595 dispatch_array_variants!(self, [I = VARIANT_NAME], { I })
596 }
597
598 pub fn datum_at(&self, idx: usize) -> Datum {
600 self.value_at(idx).to_owned_datum()
601 }
602
603 pub fn to_datum(&self) -> Datum {
605 assert_eq!(self.len(), 1);
606 self.datum_at(0)
607 }
608
609 pub fn value_at(&self, idx: usize) -> DatumRef<'_> {
611 dispatch_array_variants!(self, inner, {
612 inner.value_at(idx).map(ScalarRefImpl::from)
613 })
614 }
615
616 pub unsafe fn value_at_unchecked(&self, idx: usize) -> DatumRef<'_> {
623 unsafe {
624 dispatch_array_variants!(self, inner, {
625 inner.value_at_unchecked(idx).map(ScalarRefImpl::from)
626 })
627 }
628 }
629
630 pub fn set_bitmap(&mut self, bitmap: Bitmap) {
631 dispatch_array_variants!(self, inner, { inner.set_bitmap(bitmap) })
632 }
633
634 pub fn create_builder(&self, capacity: usize) -> ArrayBuilderImpl {
635 dispatch_array_variants!(self, inner, { inner.create_builder(capacity).into() })
636 }
637
638 pub fn data_type(&self) -> DataType {
640 dispatch_array_variants!(self, inner, { inner.data_type() })
641 }
642
643 pub fn into_ref(self) -> ArrayRef {
644 Arc::new(self)
645 }
646
647 pub fn iter(&self) -> impl DoubleEndedIterator<Item = DatumRef<'_>> + ExactSizeIterator {
648 (0..self.len()).map(|i| self.value_at(i))
649 }
650}
651
652pub type ArrayRef = Arc<ArrayImpl>;
653
654impl PartialEq for ArrayImpl {
655 fn eq(&self, other: &Self) -> bool {
656 self.iter().eq(other.iter())
657 }
658}
659
660impl Eq for ArrayImpl {}
661
#[cfg(test)]
mod tests {
    use num_traits::ops::checked::CheckedAdd;

    use super::*;
    use crate::util::iter_util::ZipEqFast;

    /// Builds a new array holding, in order, the slots of `data` accepted by `pred`.
    fn filter<'a, A, F>(data: &'a A, pred: F) -> ArrayResult<A>
    where
        A: Array + 'a,
        F: Fn(Option<A::RefItem<'a>>) -> bool,
    {
        let mut builder = A::Builder::with_type(data.len(), data.data_type());
        let kept = (0..data.len()).filter(move |&idx| pred(data.value_at(idx)));
        for idx in kept {
            builder.append(data.value_at(idx));
        }
        Ok(builder.finish())
    }

    #[test]
    fn test_filter() {
        let mut builder = PrimitiveArrayBuilder::<i32>::new(0);
        (0..=60).for_each(|value| builder.append(Some(value)));
        let input = builder.finish();
        let array = filter(&input, |x| x.unwrap_or(0) >= 60).unwrap();
        assert_eq!(array.iter().collect::<Vec<Option<i32>>>(), vec![Some(60)]);
    }

    /// Element-wise sum of two primitive arrays, widened to `T3`; a NULL on either side
    /// yields NULL.
    fn vec_add<T1, T2, T3>(
        a: &PrimitiveArray<T1>,
        b: &PrimitiveArray<T2>,
    ) -> ArrayResult<PrimitiveArray<T3>>
    where
        T1: PrimitiveArrayItemType,
        T2: PrimitiveArrayItemType,
        T3: PrimitiveArrayItemType + CheckedAdd + From<T1> + From<T2>,
    {
        let mut builder = PrimitiveArrayBuilder::<T3>::new(a.len());
        for (lhs, rhs) in a.iter().zip_eq_fast(b.iter()) {
            let sum = lhs.zip(rhs).map(|(l, r)| T3::from(l) + T3::from(r));
            builder.append(sum);
        }
        Ok(builder.finish())
    }

    #[test]
    fn test_vectorized_add() {
        let mut lhs_builder = PrimitiveArrayBuilder::<i32>::new(0);
        let mut rhs_builder = PrimitiveArrayBuilder::<i16>::new(0);
        for value in 0..=60 {
            lhs_builder.append(Some(value));
            rhs_builder.append(Some(value as i16));
        }
        let array1 = lhs_builder.finish();
        let array2 = rhs_builder.finish();

        let final_array: PrimitiveArray<i64> = vec_add(&array1, &array2).unwrap();

        assert_eq!(final_array.len(), array1.len());
        for (idx, data) in final_array.iter().enumerate() {
            assert_eq!(data, Some(idx as i64 * 2));
        }
    }
}
736
#[cfg(test)]
mod test_util {
    use std::hash::{BuildHasher, Hasher};

    use super::Array;
    use crate::bitmap::Bitmap;
    use crate::util::iter_util::ZipEqFast;

    /// Finalizes every hasher in `hashers`, returning the digests in order.
    pub fn hash_finish<H: Hasher>(hashers: &[H]) -> Vec<u64> {
        hashers.iter().map(|hasher| hasher.finish()).collect()
    }

    /// Hashes the slots of `arrs` both slot-by-slot (`Array::hash_at`) and column-wise
    /// (`Array::hash_vec`, with every slot visible). Then asserts that both paths agree
    /// with each other and with `expects`.
    ///
    /// Slot `i` of every array is folded into the `i`-th hasher, in `arrs` order, so each
    /// array must have exactly `expects.len()` slots (`hash_vec` asserts this).
    ///
    /// # Panics
    ///
    /// Panics if the two hashing paths disagree for any row, or if any digest differs
    /// from its expectation.
    pub fn test_hash<H: BuildHasher, A: Array>(arrs: Vec<A>, expects: Vec<u64>, hasher_builder: H) {
        let len = expects.len();
        let mut states_scalar: Vec<_> = (0..len).map(|_| hasher_builder.build_hasher()).collect();
        let mut states_vec: Vec<_> = (0..len).map(|_| hasher_builder.build_hasher()).collect();

        for arr in &arrs {
            for (i, state) in states_scalar.iter_mut().enumerate() {
                arr.hash_at(i, state);
            }
        }
        let vis = Bitmap::ones(len);
        for arr in &arrs {
            arr.hash_vec(&mut states_vec[..], &vis);
        }

        // Assert per row so that a failure identifies the offending row; a bare
        // `Iterator::all` result would be silently discarded.
        let scalar_hashes = hash_finish(&states_scalar[..]);
        let vec_hashes = hash_finish(&states_vec[..]);
        for (row, ((expected, scalar), vectorized)) in expects
            .iter()
            .zip_eq_fast(scalar_hashes)
            .zip_eq_fast(vec_hashes)
            .enumerate()
        {
            assert_eq!(scalar, vectorized, "hash_at and hash_vec disagree at row {row}");
            assert_eq!(*expected, scalar, "unexpected hash at row {row}");
        }
    }
}