1pub mod arrow;
18mod bool_array;
19pub mod bytes_array;
20mod chrono_array;
21mod data_chunk;
22pub mod data_chunk_iter;
23mod decimal_array;
24pub mod error;
25pub mod interval_array;
26mod iterator;
27mod jsonb_array;
28pub mod list_array;
29mod map_array;
30mod num256_array;
31mod primitive_array;
32mod proto_reader;
33pub mod stream_chunk;
34pub mod stream_chunk_builder;
35mod stream_chunk_iter;
36pub mod stream_record;
37pub mod struct_array;
38mod utf8_array;
39
40use std::convert::From;
41use std::hash::{Hash, Hasher};
42use std::sync::Arc;
43
44pub use bool_array::{BoolArray, BoolArrayBuilder};
45pub use bytes_array::*;
46pub use chrono_array::{
47 DateArray, DateArrayBuilder, TimeArray, TimeArrayBuilder, TimestampArray,
48 TimestampArrayBuilder, TimestamptzArray, TimestamptzArrayBuilder,
49};
50pub use data_chunk::{DataChunk, DataChunkTestExt};
51pub use data_chunk_iter::RowRef;
52pub use decimal_array::{DecimalArray, DecimalArrayBuilder};
53pub use interval_array::{IntervalArray, IntervalArrayBuilder};
54pub use iterator::ArrayIterator;
55pub use jsonb_array::{JsonbArray, JsonbArrayBuilder};
56pub use list_array::{ListArray, ListArrayBuilder, ListRef, ListValue};
57pub use map_array::{MapArray, MapArrayBuilder, MapRef, MapValue};
58use paste::paste;
59pub use primitive_array::{PrimitiveArray, PrimitiveArrayBuilder, PrimitiveArrayItemType};
60use risingwave_common_estimate_size::EstimateSize;
61use risingwave_pb::data::PbArray;
62pub use stream_chunk::{Op, StreamChunk, StreamChunkTestExt};
63pub use stream_chunk_builder::StreamChunkBuilder;
64pub use struct_array::{StructArray, StructArrayBuilder, StructRef, StructValue};
65pub use utf8_array::*;
66
67pub use self::error::ArrayError;
68pub use crate::array::num256_array::{Int256Array, Int256ArrayBuilder};
69use crate::bitmap::Bitmap;
70use crate::types::*;
71use crate::{dispatch_array_builder_variants, dispatch_array_variants, for_all_variants};
72pub type ArrayResult<T> = Result<T, ArrayError>;
73
74pub type I64Array = PrimitiveArray<i64>;
75pub type I32Array = PrimitiveArray<i32>;
76pub type I16Array = PrimitiveArray<i16>;
77pub type F64Array = PrimitiveArray<F64>;
78pub type F32Array = PrimitiveArray<F32>;
79pub type SerialArray = PrimitiveArray<Serial>;
80
81pub type I64ArrayBuilder = PrimitiveArrayBuilder<i64>;
82pub type I32ArrayBuilder = PrimitiveArrayBuilder<i32>;
83pub type I16ArrayBuilder = PrimitiveArrayBuilder<i16>;
84pub type F64ArrayBuilder = PrimitiveArrayBuilder<F64>;
85pub type F32ArrayBuilder = PrimitiveArrayBuilder<F32>;
86pub type SerialArrayBuilder = PrimitiveArrayBuilder<Serial>;
87
88pub type ArrayImplBuilder = ArrayBuilderImpl;
90
/// Value fed to the hasher in place of a NULL entry by `Array::hash_at`.
pub(crate) const NULL_VAL_FOR_HASH: u32 = 0xFFFF_FFF0;

94pub trait ArrayBuilder: Send + Sync + Sized + 'static {
105 type ArrayType: Array<Builder = Self>;
107
108 fn new(capacity: usize) -> Self;
111
112 fn with_type(capacity: usize, ty: DataType) -> Self;
115
116 fn append_n(&mut self, n: usize, value: Option<<Self::ArrayType as Array>::RefItem<'_>>);
120
121 fn append(&mut self, value: Option<<Self::ArrayType as Array>::RefItem<'_>>) {
123 self.append_n(1, value);
124 }
125
126 fn append_owned(&mut self, value: Option<<Self::ArrayType as Array>::OwnedItem>) {
128 let value = value.as_ref().map(|s| s.as_scalar_ref());
129 self.append(value)
130 }
131
132 fn append_null(&mut self) {
133 self.append(None)
134 }
135
136 fn append_array(&mut self, other: &Self::ArrayType);
138
139 fn pop(&mut self) -> Option<()>;
147
148 fn append_array_element(&mut self, other: &Self::ArrayType, idx: usize) {
150 self.append(other.value_at(idx));
151 }
152
153 fn len(&self) -> usize;
155
156 fn is_empty(&self) -> bool {
158 self.len() == 0
159 }
160
161 fn finish(self) -> Self::ArrayType;
163}
164
165pub trait Array:
181 std::fmt::Debug + Send + Sync + Sized + 'static + Into<ArrayImpl> + EstimateSize
182{
183 type RefItem<'a>: ScalarRef<'a, ScalarType = Self::OwnedItem>
186 where
187 Self: 'a;
188
189 type OwnedItem: Clone
191 + std::fmt::Debug
192 + EstimateSize
193 + for<'a> Scalar<ScalarRefType<'a> = Self::RefItem<'a>>;
194
195 type Builder: ArrayBuilder<ArrayType = Self>;
197
198 unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_>;
207
208 #[inline]
210 fn value_at(&self, idx: usize) -> Option<Self::RefItem<'_>> {
211 if !self.is_null(idx) {
212 Some(unsafe { self.raw_value_at_unchecked(idx) })
214 } else {
215 None
216 }
217 }
218
219 #[inline]
223 unsafe fn value_at_unchecked(&self, idx: usize) -> Option<Self::RefItem<'_>> {
224 unsafe {
225 if !self.is_null_unchecked(idx) {
226 Some(self.raw_value_at_unchecked(idx))
227 } else {
228 None
229 }
230 }
231 }
232
233 fn len(&self) -> usize;
235
236 fn iter(&self) -> ArrayIterator<'_, Self> {
238 ArrayIterator::new(self)
239 }
240
241 fn raw_iter(&self) -> impl ExactSizeIterator<Item = Self::RefItem<'_>> {
246 (0..self.len()).map(|i| unsafe { self.raw_value_at_unchecked(i) })
247 }
248
249 fn to_protobuf(&self) -> PbArray;
251
252 fn null_bitmap(&self) -> &Bitmap;
254
255 fn into_null_bitmap(self) -> Bitmap;
257
258 fn is_null(&self, idx: usize) -> bool {
260 !self.null_bitmap().is_set(idx)
261 }
262
263 unsafe fn is_null_unchecked(&self, idx: usize) -> bool {
268 unsafe { !self.null_bitmap().is_set_unchecked(idx) }
269 }
270
271 fn set_bitmap(&mut self, bitmap: Bitmap);
272
273 #[inline(always)]
275 fn hash_at<H: Hasher>(&self, idx: usize, state: &mut H) {
276 if let Some(value) = self.value_at(idx) {
279 value.hash_scalar(state);
280 } else {
281 NULL_VAL_FOR_HASH.hash(state);
282 }
283 }
284
285 fn hash_vec<H: Hasher>(&self, hashers: &mut [H], vis: &Bitmap) {
286 assert_eq!(hashers.len(), self.len());
287 for idx in vis.iter_ones() {
288 self.hash_at(idx, &mut hashers[idx]);
289 }
290 }
291
292 fn is_empty(&self) -> bool {
293 self.len() == 0
294 }
295
296 fn create_builder(&self, capacity: usize) -> Self::Builder {
297 Self::Builder::with_type(capacity, self.data_type())
298 }
299
300 fn data_type(&self) -> DataType;
301
302 fn into_ref(self) -> ArrayRef {
304 Arc::new(self.into())
305 }
306}
307
308trait CompactableArray: Array {
310 fn compact(&self, visibility: &Bitmap, cardinality: usize) -> Self;
313}
314
315impl<A: Array> CompactableArray for A {
316 fn compact(&self, visibility: &Bitmap, cardinality: usize) -> Self {
317 let mut builder = A::Builder::with_type(cardinality, self.data_type());
318 for idx in visibility.iter_ones() {
319 unsafe {
321 builder.append(self.value_at_unchecked(idx));
322 }
323 }
324 builder.finish()
325 }
326}
327
328macro_rules! array_impl_enum {
330 ( $( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
331 #[derive(Debug, Clone, EstimateSize)]
333 pub enum ArrayImpl {
334 $( $variant_name($array) ),*
335 }
336 };
337}
338
339for_all_variants! { array_impl_enum }
340
341impl<T: PrimitiveArrayItemType> From<PrimitiveArray<T>> for ArrayImpl {
346 fn from(arr: PrimitiveArray<T>) -> Self {
347 T::erase_array_type(arr)
348 }
349}
350
351impl From<Int256Array> for ArrayImpl {
352 fn from(arr: Int256Array) -> Self {
353 Self::Int256(arr)
354 }
355}
356
357impl From<BoolArray> for ArrayImpl {
358 fn from(arr: BoolArray) -> Self {
359 Self::Bool(arr)
360 }
361}
362
363impl From<Utf8Array> for ArrayImpl {
364 fn from(arr: Utf8Array) -> Self {
365 Self::Utf8(arr)
366 }
367}
368
369impl From<JsonbArray> for ArrayImpl {
370 fn from(arr: JsonbArray) -> Self {
371 Self::Jsonb(arr)
372 }
373}
374
375impl From<StructArray> for ArrayImpl {
376 fn from(arr: StructArray) -> Self {
377 Self::Struct(arr)
378 }
379}
380
381impl From<ListArray> for ArrayImpl {
382 fn from(arr: ListArray) -> Self {
383 Self::List(arr)
384 }
385}
386
387impl From<BytesArray> for ArrayImpl {
388 fn from(arr: BytesArray) -> Self {
389 Self::Bytea(arr)
390 }
391}
392
393impl From<MapArray> for ArrayImpl {
394 fn from(arr: MapArray) -> Self {
395 Self::Map(arr)
396 }
397}
398
399macro_rules! impl_convert {
406 ($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
407 $(
408 paste! {
409 impl ArrayImpl {
410 pub fn [<as_ $suffix_name>](&self) -> &$array {
414 match self {
415 Self::$variant_name(array) => array,
416 other_array => panic!("cannot convert ArrayImpl::{} to concrete type {}", other_array.get_ident(), stringify!($variant_name))
417 }
418 }
419
420 pub fn [<into_ $suffix_name>](self) -> $array {
424 match self {
425 Self::$variant_name(array) => array,
426 other_array => panic!("cannot convert ArrayImpl::{} to concrete type {}", other_array.get_ident(), stringify!($variant_name))
427 }
428 }
429 }
430
431 impl <'a> From<&'a ArrayImpl> for &'a $array {
433 fn from(array: &'a ArrayImpl) -> Self {
434 match array {
435 ArrayImpl::$variant_name(inner) => inner,
436 other_array => panic!("cannot convert ArrayImpl::{} to concrete type {}", other_array.get_ident(), stringify!($variant_name))
437 }
438 }
439 }
440
441 impl From<ArrayImpl> for $array {
442 fn from(array: ArrayImpl) -> Self {
443 match array {
444 ArrayImpl::$variant_name(inner) => inner,
445 other_array => panic!("cannot convert ArrayImpl::{} to concrete type {}", other_array.get_ident(), stringify!($variant_name))
446 }
447 }
448 }
449
450 impl From<$builder> for ArrayBuilderImpl {
451 fn from(builder: $builder) -> Self {
452 Self::$variant_name(builder)
453 }
454 }
455 }
456 )*
457 };
458}
459
460for_all_variants! { impl_convert }
461
462macro_rules! array_builder_impl_enum {
464 ($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
465 #[derive(Debug, Clone, EstimateSize)]
467 pub enum ArrayBuilderImpl {
468 $( $variant_name($builder) ),*
469 }
470 };
471}
472
473for_all_variants! { array_builder_impl_enum }
474
475impl ArrayBuilderImpl {
477 pub fn with_type(capacity: usize, ty: DataType) -> Self {
478 ty.create_array_builder(capacity)
479 }
480
481 pub fn append_array(&mut self, other: &ArrayImpl) {
482 dispatch_array_builder_variants!(self, inner, { inner.append_array(other.into()) })
483 }
484
485 pub fn append_null(&mut self) {
486 dispatch_array_builder_variants!(self, inner, { inner.append(None) })
487 }
488
489 pub fn append_n_null(&mut self, n: usize) {
490 dispatch_array_builder_variants!(self, inner, { inner.append_n(n, None) })
491 }
492
493 pub fn append_n(&mut self, n: usize, datum: impl ToDatumRef) {
496 match datum.to_datum_ref() {
497 None => dispatch_array_builder_variants!(self, inner, { inner.append_n(n, None) }),
498
499 Some(scalar_ref) => {
500 dispatch_array_builder_variants!(self, inner, [I = VARIANT_NAME], {
501 inner.append_n(
502 n,
503 Some(scalar_ref.try_into().unwrap_or_else(|_| {
504 panic!(
505 "type mismatch, array builder type: {}, scalar type: {}",
506 I,
507 scalar_ref.get_ident()
508 )
509 })),
510 )
511 })
512 }
513 }
514 }
515
516 pub fn append(&mut self, datum: impl ToDatumRef) {
518 self.append_n(1, datum);
519 }
520
521 pub fn append_array_element(&mut self, other: &ArrayImpl, idx: usize) {
522 dispatch_array_builder_variants!(self, inner, {
523 inner.append_array_element(other.into(), idx)
524 })
525 }
526
527 pub fn pop(&mut self) -> Option<()> {
528 dispatch_array_builder_variants!(self, inner, { inner.pop() })
529 }
530
531 pub fn finish(self) -> ArrayImpl {
532 dispatch_array_builder_variants!(self, inner, { inner.finish().into() })
533 }
534
535 pub fn get_ident(&self) -> &'static str {
536 dispatch_array_builder_variants!(self, [I = VARIANT_NAME], { I })
537 }
538
539 pub fn len(&self) -> usize {
540 dispatch_array_builder_variants!(self, inner, { inner.len() })
541 }
542
543 pub fn is_empty(&self) -> bool {
544 self.len() == 0
545 }
546}
547
548impl ArrayImpl {
549 pub fn len(&self) -> usize {
551 dispatch_array_variants!(self, inner, { inner.len() })
552 }
553
554 pub fn is_empty(&self) -> bool {
555 self.len() == 0
556 }
557
558 pub fn null_bitmap(&self) -> &Bitmap {
560 dispatch_array_variants!(self, inner, { inner.null_bitmap() })
561 }
562
563 pub fn into_null_bitmap(self) -> Bitmap {
564 dispatch_array_variants!(self, inner, { inner.into_null_bitmap() })
565 }
566
567 pub fn to_protobuf(&self) -> PbArray {
568 dispatch_array_variants!(self, inner, { inner.to_protobuf() })
569 }
570
571 pub fn hash_at<H: Hasher>(&self, idx: usize, state: &mut H) {
572 dispatch_array_variants!(self, inner, { inner.hash_at(idx, state) })
573 }
574
575 pub fn hash_vec<H: Hasher>(&self, hashers: &mut [H], vis: &Bitmap) {
576 dispatch_array_variants!(self, inner, { inner.hash_vec(hashers, vis) })
577 }
578
579 pub fn compact(&self, visibility: &Bitmap, cardinality: usize) -> Self {
581 dispatch_array_variants!(self, inner, {
582 inner.compact(visibility, cardinality).into()
583 })
584 }
585
586 pub fn get_ident(&self) -> &'static str {
587 dispatch_array_variants!(self, [I = VARIANT_NAME], { I })
588 }
589
590 pub fn datum_at(&self, idx: usize) -> Datum {
592 self.value_at(idx).to_owned_datum()
593 }
594
595 pub fn to_datum(&self) -> Datum {
597 assert_eq!(self.len(), 1);
598 self.datum_at(0)
599 }
600
601 pub fn value_at(&self, idx: usize) -> DatumRef<'_> {
603 dispatch_array_variants!(self, inner, {
604 inner.value_at(idx).map(ScalarRefImpl::from)
605 })
606 }
607
608 pub unsafe fn value_at_unchecked(&self, idx: usize) -> DatumRef<'_> {
615 unsafe {
616 dispatch_array_variants!(self, inner, {
617 inner.value_at_unchecked(idx).map(ScalarRefImpl::from)
618 })
619 }
620 }
621
622 pub fn set_bitmap(&mut self, bitmap: Bitmap) {
623 dispatch_array_variants!(self, inner, { inner.set_bitmap(bitmap) })
624 }
625
626 pub fn create_builder(&self, capacity: usize) -> ArrayBuilderImpl {
627 dispatch_array_variants!(self, inner, { inner.create_builder(capacity).into() })
628 }
629
630 pub fn data_type(&self) -> DataType {
632 dispatch_array_variants!(self, inner, { inner.data_type() })
633 }
634
635 pub fn into_ref(self) -> ArrayRef {
636 Arc::new(self)
637 }
638
639 pub fn iter(&self) -> impl DoubleEndedIterator<Item = DatumRef<'_>> + ExactSizeIterator {
640 (0..self.len()).map(|i| self.value_at(i))
641 }
642}
643
644pub type ArrayRef = Arc<ArrayImpl>;
645
646impl PartialEq for ArrayImpl {
647 fn eq(&self, other: &Self) -> bool {
648 self.iter().eq(other.iter())
649 }
650}
651
652impl Eq for ArrayImpl {}
653
#[cfg(test)]
mod tests {
    use num_traits::ops::checked::CheckedAdd;

    use super::*;
    use crate::util::iter_util::ZipEqFast;

    /// Builds a new array containing only the entries of `data` accepted by `pred`.
    fn filter<'a, A, F>(data: &'a A, pred: F) -> ArrayResult<A>
    where
        A: Array + 'a,
        F: Fn(Option<A::RefItem<'a>>) -> bool,
    {
        let mut builder = A::Builder::with_type(data.len(), data.data_type());
        for idx in 0..data.len() {
            let candidate = data.value_at(idx);
            if pred(candidate) {
                builder.append(data.value_at(idx));
            }
        }
        Ok(builder.finish())
    }

    #[test]
    fn test_filter() {
        let mut builder = PrimitiveArrayBuilder::<i32>::new(0);
        (0..=60).for_each(|value| builder.append(Some(value)));
        let source = builder.finish();

        let filtered = filter(&source, |x| x.unwrap_or(0) >= 60).unwrap();
        assert_eq!(filtered.iter().collect::<Vec<Option<i32>>>(), vec![Some(60)]);
    }

    /// Adds two primitive arrays element-wise after widening both sides into `T3`; a NULL on
    /// either side yields NULL.
    fn vec_add<T1, T2, T3>(
        a: &PrimitiveArray<T1>,
        b: &PrimitiveArray<T2>,
    ) -> ArrayResult<PrimitiveArray<T3>>
    where
        T1: PrimitiveArrayItemType,
        T2: PrimitiveArrayItemType,
        T3: PrimitiveArrayItemType + CheckedAdd + From<T1> + From<T2>,
    {
        let mut builder = PrimitiveArrayBuilder::<T3>::new(a.len());
        for (lhs, rhs) in a.iter().zip_eq_fast(b.iter()) {
            let sum = lhs.zip(rhs).map(|(l, r)| T3::from(l) + T3::from(r));
            builder.append(sum);
        }
        Ok(builder.finish())
    }

    #[test]
    fn test_vectorized_add() {
        let lhs = {
            let mut builder = PrimitiveArrayBuilder::<i32>::new(0);
            for value in 0..=60 {
                builder.append(Some(value));
            }
            builder.finish()
        };
        let rhs = {
            let mut builder = PrimitiveArrayBuilder::<i16>::new(0);
            for value in 0..=60i16 {
                builder.append(Some(value));
            }
            builder.finish()
        };

        let sum: PrimitiveArray<i64> = vec_add(&lhs, &rhs).unwrap();

        assert_eq!(sum.len(), lhs.len());
        for (idx, value) in sum.iter().enumerate() {
            assert_eq!(value, Some(idx as i64 * 2));
        }
    }
}

#[cfg(test)]
mod test_util {
    use std::hash::{BuildHasher, Hasher};

    use super::Array;
    use crate::bitmap::Bitmap;
    use crate::util::iter_util::ZipEqFast;

    /// Finalizes each hasher in `hashers`, returning the hash values in the same order.
    pub fn hash_finish<H: Hasher>(hashers: &[H]) -> Vec<u64> {
        hashers.iter().map(Hasher::finish).collect()
    }

    /// Hashes the columns `arrs` row by row through both [`Array::hash_at`] and
    /// [`Array::hash_vec`] (with every row visible), then checks the results.
    ///
    /// `expects[i]` is the expected combined hash of row `i` across all columns; the number
    /// of rows is taken from `expects.len()`.
    ///
    /// # Panics
    ///
    /// Panics if the per-row hashes differ from `expects`, if the vectorized hashes differ
    /// from the per-row ones, or if a column's length differs from `expects.len()`.
    pub fn test_hash<H: BuildHasher, A: Array>(arrs: Vec<A>, expects: Vec<u64>, hasher_builder: H) {
        let len = expects.len();
        let new_states = || -> Vec<H::Hasher> {
            (0..len).map(|_| hasher_builder.build_hasher()).collect()
        };
        let mut states_scalar = new_states();
        let mut states_vec = new_states();

        // Row-at-a-time path.
        for arr in &arrs {
            for (row, state) in states_scalar.iter_mut().enumerate() {
                arr.hash_at(row, state);
            }
        }

        // Vectorized path, with every row visible.
        let vis = Bitmap::ones(len);
        for arr in &arrs {
            arr.hash_vec(states_vec.as_mut_slice(), &vis);
        }

        // The comparison must be asserted: merely computing it (e.g. with `Iterator::all`)
        // lets mismatches pass silently.
        let scalar_hashes = hash_finish(&states_scalar);
        let vec_hashes = hash_finish(&states_vec);
        for (row, ((expected, scalar), vectorized)) in expects
            .iter()
            .zip_eq_fast(scalar_hashes)
            .zip_eq_fast(vec_hashes)
            .enumerate()
        {
            assert_eq!(*expected, scalar, "hash_at result mismatch at row {row}");
            assert_eq!(scalar, vectorized, "hash_vec disagrees with hash_at at row {row}");
        }
    }
}
767}