pub mod arrow;
mod bool_array;
pub mod bytes_array;
mod chrono_array;
mod data_chunk;
pub mod data_chunk_iter;
mod decimal_array;
pub mod error;
pub mod interval_array;
mod iterator;
mod jsonb_array;
pub mod list_array;
mod map_array;
mod num256_array;
mod primitive_array;
mod proto_reader;
pub mod stream_chunk;
pub mod stream_chunk_builder;
mod stream_chunk_iter;
pub mod stream_record;
pub mod struct_array;
mod utf8_array;
use std::convert::From;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
pub use bool_array::{BoolArray, BoolArrayBuilder};
pub use bytes_array::*;
pub use chrono_array::{
DateArray, DateArrayBuilder, TimeArray, TimeArrayBuilder, TimestampArray,
TimestampArrayBuilder, TimestamptzArray, TimestamptzArrayBuilder,
};
pub use data_chunk::{DataChunk, DataChunkTestExt};
pub use data_chunk_iter::RowRef;
pub use decimal_array::{DecimalArray, DecimalArrayBuilder};
pub use interval_array::{IntervalArray, IntervalArrayBuilder};
pub use iterator::ArrayIterator;
pub use jsonb_array::{JsonbArray, JsonbArrayBuilder};
pub use list_array::{ListArray, ListArrayBuilder, ListRef, ListValue};
pub use map_array::{MapArray, MapArrayBuilder, MapRef, MapValue};
use paste::paste;
pub use primitive_array::{PrimitiveArray, PrimitiveArrayBuilder, PrimitiveArrayItemType};
use risingwave_common_estimate_size::EstimateSize;
use risingwave_pb::data::PbArray;
pub use stream_chunk::{Op, StreamChunk, StreamChunkTestExt};
pub use stream_chunk_builder::StreamChunkBuilder;
pub use struct_array::{StructArray, StructArrayBuilder, StructRef, StructValue};
pub use utf8_array::*;
pub use self::error::ArrayError;
pub use crate::array::num256_array::{Int256Array, Int256ArrayBuilder};
use crate::bitmap::Bitmap;
use crate::types::*;
use crate::{dispatch_array_builder_variants, dispatch_array_variants, for_all_variants};
/// Result type for array operations, with [`ArrayError`] as the error type.
pub type ArrayResult<T> = Result<T, ArrayError>;
// Aliases for `PrimitiveArray` instantiated with each primitive element type.
pub type I64Array = PrimitiveArray<i64>;
pub type I32Array = PrimitiveArray<i32>;
pub type I16Array = PrimitiveArray<i16>;
pub type F64Array = PrimitiveArray<F64>;
pub type F32Array = PrimitiveArray<F32>;
pub type SerialArray = PrimitiveArray<Serial>;
// Builder aliases, one per primitive array alias above.
pub type I64ArrayBuilder = PrimitiveArrayBuilder<i64>;
pub type I32ArrayBuilder = PrimitiveArrayBuilder<i32>;
pub type I16ArrayBuilder = PrimitiveArrayBuilder<i16>;
pub type F64ArrayBuilder = PrimitiveArrayBuilder<F64>;
pub type F32ArrayBuilder = PrimitiveArrayBuilder<F32>;
pub type SerialArrayBuilder = PrimitiveArrayBuilder<Serial>;
/// Alternative name for [`ArrayBuilderImpl`].
pub type ArrayImplBuilder = ArrayBuilderImpl;
/// Value fed to the hasher in place of a null entry (see `Array::hash_at`).
pub(crate) const NULL_VAL_FOR_HASH: u32 = 0xfffffff0;
/// Incrementally assembles an [`Array`].
///
/// Implementors provide construction, `append_n`, `append_array`, `pop`, `len` and `finish`;
/// every other method has a default implementation expressed in terms of those.
pub trait ArrayBuilder: Send + Sync + Sized + 'static {
    /// The array type produced by [`ArrayBuilder::finish`]; its `Builder` must be `Self`.
    type ArrayType: Array<Builder = Self>;

    // ---- construction -------------------------------------------------------------------

    /// Creates a builder from a `capacity` argument.
    fn new(capacity: usize) -> Self;

    /// Creates a builder from a `capacity` argument and an explicit [`DataType`].
    fn with_type(capacity: usize, ty: DataType) -> Self;

    // ---- appending ----------------------------------------------------------------------

    /// Appends `value` (or null when `None`) `n` times.
    fn append_n(&mut self, n: usize, value: Option<<Self::ArrayType as Array>::RefItem<'_>>);

    /// Appends a single `value` (or null when `None`); forwards to `append_n(1, value)`.
    fn append(&mut self, value: Option<<Self::ArrayType as Array>::RefItem<'_>>) {
        self.append_n(1, value);
    }

    /// Appends an owned value by borrowing it as a scalar reference and forwarding to
    /// [`ArrayBuilder::append`].
    fn append_owned(&mut self, value: Option<<Self::ArrayType as Array>::OwnedItem>) {
        match value.as_ref() {
            Some(owned) => self.append(Some(owned.as_scalar_ref())),
            None => self.append(None),
        }
    }

    /// Appends one null entry; forwards to `append(None)`.
    fn append_null(&mut self) {
        self.append(None)
    }

    /// Appends the contents of `other` to this builder.
    fn append_array(&mut self, other: &Self::ArrayType);

    /// Appends the element of `other` at position `idx` (null entries stay null).
    fn append_array_element(&mut self, other: &Self::ArrayType, idx: usize) {
        let element = other.value_at(idx);
        self.append(element);
    }

    // ---- removal / inspection -----------------------------------------------------------

    /// Removes the most recently appended entry.
    ///
    /// NOTE(review): presumably returns `None` when the builder is empty — confirm against
    /// the implementors.
    fn pop(&mut self) -> Option<()>;

    /// Number of entries appended so far.
    fn len(&self) -> usize;

    /// Returns `true` when `len()` is zero.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    // ---- completion ---------------------------------------------------------------------

    /// Consumes the builder and returns the finished array.
    fn finish(self) -> Self::ArrayType;
}
/// A typed column of nullable values.
///
/// Nullness is tracked by a [`Bitmap`]: a position is null when its bit is *not* set (see
/// [`Array::is_null`]).
pub trait Array:
    std::fmt::Debug + Send + Sync + Sized + 'static + Into<ArrayImpl> + EstimateSize
{
    /// Borrowed form of a single element.
    type RefItem<'a>: ScalarRef<'a, ScalarType = Self::OwnedItem>
    where
        Self: 'a;

    /// Owned form of a single element.
    type OwnedItem: Clone
        + std::fmt::Debug
        + EstimateSize
        + for<'a> Scalar<ScalarRefType<'a> = Self::RefItem<'a>>;

    /// The builder type that produces this array.
    type Builder: ArrayBuilder<ArrayType = Self>;

    /// Returns the raw value stored at `idx` without consulting the null bitmap.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably requires `idx < self.len()`; the exact contract is defined by
    /// the implementors — confirm there.
    unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_>;

    /// Returns the value at `idx`, or `None` when that position is null.
    #[inline]
    fn value_at(&self, idx: usize) -> Option<Self::RefItem<'_>> {
        if self.is_null(idx) {
            None
        } else {
            // NOTE(review): relies on `is_null` having validated `idx` through the bitmap
            // lookup — confirm `Bitmap::is_set` bounds-checks.
            Some(unsafe { self.raw_value_at_unchecked(idx) })
        }
    }

    /// Like [`Array::value_at`], but built on the unchecked null test and raw accessor.
    ///
    /// # Safety
    ///
    /// The caller must uphold the requirements of [`Array::is_null_unchecked`] and
    /// [`Array::raw_value_at_unchecked`] for `idx`.
    #[inline]
    unsafe fn value_at_unchecked(&self, idx: usize) -> Option<Self::RefItem<'_>> {
        if self.is_null_unchecked(idx) {
            None
        } else {
            Some(self.raw_value_at_unchecked(idx))
        }
    }

    /// Number of entries in the array.
    fn len(&self) -> usize;

    /// Iterates over the array through an [`ArrayIterator`].
    fn iter(&self) -> ArrayIterator<'_, Self> {
        ArrayIterator::new(self)
    }

    /// Iterates over the raw value of every position in `0..len()`, ignoring the null bitmap.
    fn raw_iter(&self) -> impl ExactSizeIterator<Item = Self::RefItem<'_>> {
        let positions = 0..self.len();
        positions.map(|pos| unsafe { self.raw_value_at_unchecked(pos) })
    }

    /// Converts the array into its protobuf representation.
    fn to_protobuf(&self) -> PbArray;

    /// Borrows the null bitmap.
    fn null_bitmap(&self) -> &Bitmap;

    /// Consumes the array and returns its null bitmap.
    fn into_null_bitmap(self) -> Bitmap;

    /// Returns `true` when the bitmap bit for `idx` is not set.
    fn is_null(&self, idx: usize) -> bool {
        !self.null_bitmap().is_set(idx)
    }

    /// Unchecked variant of [`Array::is_null`], using `Bitmap::is_set_unchecked`.
    ///
    /// # Safety
    ///
    /// The caller must satisfy the requirements of `Bitmap::is_set_unchecked` for `idx`.
    unsafe fn is_null_unchecked(&self, idx: usize) -> bool {
        !self.null_bitmap().is_set_unchecked(idx)
    }

    /// Sets the array's bitmap.
    ///
    /// NOTE(review): presumably replaces the null bitmap wholesale — confirm in implementors.
    fn set_bitmap(&mut self, bitmap: Bitmap);

    /// Feeds the entry at `idx` into `state`; a null entry hashes `NULL_VAL_FOR_HASH` instead.
    #[inline(always)]
    fn hash_at<H: Hasher>(&self, idx: usize, state: &mut H) {
        match self.value_at(idx) {
            Some(value) => {
                value.hash_scalar(state);
            }
            None => {
                NULL_VAL_FOR_HASH.hash(state);
            }
        }
    }

    /// Feeds each entry whose bit is set in `vis` into the hasher at the same index.
    ///
    /// # Panics
    ///
    /// Panics if `hashers.len() != self.len()`.
    fn hash_vec<H: Hasher>(&self, hashers: &mut [H], vis: &Bitmap) {
        assert_eq!(hashers.len(), self.len());
        vis.iter_ones()
            .for_each(|row| self.hash_at(row, &mut hashers[row]));
    }

    /// Returns `true` when `len()` is zero.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Creates a builder for this array's data type from a `capacity` argument.
    fn create_builder(&self, capacity: usize) -> Self::Builder {
        let ty = self.data_type();
        <Self::Builder as ArrayBuilder>::with_type(capacity, ty)
    }

    /// The data type of the array.
    fn data_type(&self) -> DataType;

    /// Converts the array into an [`ArrayImpl`] and wraps it in an `Arc`.
    fn into_ref(self) -> ArrayRef {
        let erased: ArrayImpl = self.into();
        Arc::new(erased)
    }
}
/// Arrays that can be rebuilt keeping only the rows selected by a visibility bitmap.
trait CompactableArray: Array {
    /// Returns a new array containing only the rows whose bit is set in `visibility`.
    ///
    /// `cardinality` is passed to the builder as its capacity.
    /// NOTE(review): presumably the number of set bits in `visibility` — confirm at call sites.
    fn compact(&self, visibility: &Bitmap, cardinality: usize) -> Self;
}

/// Blanket implementation: every [`Array`] is compactable through its own builder.
impl<A: Array> CompactableArray for A {
    fn compact(&self, visibility: &Bitmap, cardinality: usize) -> Self {
        let ty = self.data_type();
        let mut compacted = A::Builder::with_type(cardinality, ty);
        for row in visibility.iter_ones() {
            // NOTE(review): assumes every set bit of `visibility` is a valid index into
            // `self` — confirm that callers guarantee this.
            let item = unsafe { self.value_at_unchecked(row) };
            compacted.append(item);
        }
        compacted.finish()
    }
}
// Declares `ArrayImpl`: an enum with one variant per entry in the list it is invoked with,
// each variant (named `$variant_name`) wrapping that entry's concrete array type (`$array`).
// The other fields of each entry are accepted by the pattern but unused here.
macro_rules! array_impl_enum {
( $( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
#[derive(Debug, Clone, EstimateSize)]
pub enum ArrayImpl {
$( $variant_name($array) ),*
}
};
}
// Expand `array_impl_enum` through `for_all_variants!`, which supplies the variant list.
for_all_variants! { array_impl_enum }
/// Converts any [`PrimitiveArray`] into an [`ArrayImpl`]; the target variant is chosen by the
/// element type through `PrimitiveArrayItemType::erase_array_type`.
impl<T: PrimitiveArrayItemType> From<PrimitiveArray<T>> for ArrayImpl {
    fn from(arr: PrimitiveArray<T>) -> Self {
        <T as PrimitiveArrayItemType>::erase_array_type(arr)
    }
}
impl From<Int256Array> for ArrayImpl {
fn from(arr: Int256Array) -> Self {
Self::Int256(arr)
}
}
impl From<BoolArray> for ArrayImpl {
fn from(arr: BoolArray) -> Self {
Self::Bool(arr)
}
}
impl From<Utf8Array> for ArrayImpl {
fn from(arr: Utf8Array) -> Self {
Self::Utf8(arr)
}
}
impl From<JsonbArray> for ArrayImpl {
fn from(arr: JsonbArray) -> Self {
Self::Jsonb(arr)
}
}
impl From<StructArray> for ArrayImpl {
fn from(arr: StructArray) -> Self {
Self::Struct(arr)
}
}
impl From<ListArray> for ArrayImpl {
fn from(arr: ListArray) -> Self {
Self::List(arr)
}
}
impl From<BytesArray> for ArrayImpl {
fn from(arr: BytesArray) -> Self {
Self::Bytea(arr)
}
}
impl From<MapArray> for ArrayImpl {
fn from(arr: MapArray) -> Self {
Self::Map(arr)
}
}
// Generates the conversions between the type-erased enums and the concrete array/builder
// types. For every entry in the list it is invoked with, it emits:
//   * `ArrayImpl::as_<suffix>` (borrowing) and `ArrayImpl::into_<suffix>` (consuming)
//     accessors for the concrete array type,
//   * `From<&ArrayImpl>` for `&<array>` and `From<ArrayImpl>` for `<array>`,
//   * `From<builder>` for `ArrayBuilderImpl`.
// All four array-side conversions panic when the `ArrayImpl` holds a different variant.
macro_rules! impl_convert {
($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
$(
// `paste!` is needed to build the `as_<suffix>` / `into_<suffix>` method names.
paste! {
impl ArrayImpl {
// Borrows the concrete array; panics on a variant mismatch.
pub fn [<as_ $suffix_name>](&self) -> &$array {
match self {
Self::$variant_name(ref array) => array,
other_array => panic!("cannot convert ArrayImpl::{} to concrete type {}", other_array.get_ident(), stringify!($variant_name))
}
}
// Consumes `self` and returns the concrete array; panics on a variant mismatch.
pub fn [<into_ $suffix_name>](self) -> $array {
match self {
Self::$variant_name(array) => array,
other_array => panic!("cannot convert ArrayImpl::{} to concrete type {}", other_array.get_ident(), stringify!($variant_name))
}
}
}
// Borrowing conversion; panics on a variant mismatch.
impl <'a> From<&'a ArrayImpl> for &'a $array {
fn from(array: &'a ArrayImpl) -> Self {
match array {
ArrayImpl::$variant_name(inner) => inner,
other_array => panic!("cannot convert ArrayImpl::{} to concrete type {}", other_array.get_ident(), stringify!($variant_name))
}
}
}
// Consuming conversion; panics on a variant mismatch.
impl From<ArrayImpl> for $array {
fn from(array: ArrayImpl) -> Self {
match array {
ArrayImpl::$variant_name(inner) => inner,
other_array => panic!("cannot convert ArrayImpl::{} to concrete type {}", other_array.get_ident(), stringify!($variant_name))
}
}
}
// Wraps a concrete builder in the matching `ArrayBuilderImpl` variant.
impl From<$builder> for ArrayBuilderImpl {
fn from(builder: $builder) -> Self {
Self::$variant_name(builder)
}
}
}
)*
};
}
// Expand `impl_convert` through `for_all_variants!`, which supplies the variant list.
for_all_variants! { impl_convert }
// Declares `ArrayBuilderImpl`: an enum with one variant per entry in the list it is invoked
// with, each variant (named `$variant_name`) wrapping that entry's builder type (`$builder`).
macro_rules! array_builder_impl_enum {
($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
#[derive(Debug, Clone, EstimateSize)]
pub enum ArrayBuilderImpl {
$( $variant_name($builder) ),*
}
};
}
// Expand `array_builder_impl_enum` through `for_all_variants!`, which supplies the variant list.
for_all_variants! { array_builder_impl_enum }
/// Type-erased builder operations; each method forwards to the concrete builder held by the
/// current variant.
impl ArrayBuilderImpl {
    /// Creates the builder for `ty` via `DataType::create_array_builder`.
    pub fn with_type(capacity: usize, ty: DataType) -> Self {
        ty.create_array_builder(capacity)
    }

    /// Appends all of `other`.
    ///
    /// # Panics
    ///
    /// Panics if `other` cannot be converted to the concrete array type of this builder.
    pub fn append_array(&mut self, other: &ArrayImpl) {
        dispatch_array_builder_variants!(self, inner, { inner.append_array(other.into()) })
    }

    /// Appends a single null entry.
    pub fn append_null(&mut self) {
        dispatch_array_builder_variants!(self, inner, { inner.append(None) })
    }

    /// Appends `n` null entries.
    pub fn append_n_null(&mut self, n: usize) {
        dispatch_array_builder_variants!(self, inner, { inner.append_n(n, None) })
    }

    /// Appends `datum` `n` times; a `None` datum appends nulls.
    ///
    /// # Panics
    ///
    /// Panics with a "type mismatch" message if the scalar cannot be converted into the
    /// reference type expected by the underlying builder.
    pub fn append_n(&mut self, n: usize, datum: impl ToDatumRef) {
        let datum_ref = datum.to_datum_ref();
        match datum_ref {
            Some(scalar_ref) => {
                dispatch_array_builder_variants!(self, inner, [I = VARIANT_NAME], {
                    inner.append_n(
                        n,
                        Some(scalar_ref.try_into().unwrap_or_else(|_| {
                            panic!(
                                "type mismatch, array builder type: {}, scalar type: {}",
                                I,
                                scalar_ref.get_ident()
                            )
                        })),
                    )
                })
            }
            None => dispatch_array_builder_variants!(self, inner, { inner.append_n(n, None) }),
        }
    }

    /// Appends `datum` once; forwards to `append_n(1, datum)`.
    pub fn append(&mut self, datum: impl ToDatumRef) {
        self.append_n(1, datum);
    }

    /// Appends the element of `other` at `idx`.
    ///
    /// # Panics
    ///
    /// Panics if `other` cannot be converted to the concrete array type of this builder.
    pub fn append_array_element(&mut self, other: &ArrayImpl, idx: usize) {
        dispatch_array_builder_variants!(self, inner, {
            inner.append_array_element(other.into(), idx)
        })
    }

    /// Forwards to the underlying builder's `pop`.
    pub fn pop(&mut self) -> Option<()> {
        dispatch_array_builder_variants!(self, inner, { inner.pop() })
    }

    /// Finishes the underlying builder and wraps the resulting array in [`ArrayImpl`].
    pub fn finish(self) -> ArrayImpl {
        dispatch_array_builder_variants!(self, inner, { inner.finish().into() })
    }

    /// Returns the `VARIANT_NAME` string that the dispatch macro associates with the current
    /// variant.
    pub fn get_ident(&self) -> &'static str {
        dispatch_array_builder_variants!(self, [I = VARIANT_NAME], { I })
    }

    /// Number of entries appended so far.
    pub fn len(&self) -> usize {
        dispatch_array_builder_variants!(self, inner, { inner.len() })
    }

    /// Returns `true` when `len()` is zero.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
impl ArrayImpl {
pub fn len(&self) -> usize {
dispatch_array_variants!(self, inner, { inner.len() })
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn null_bitmap(&self) -> &Bitmap {
dispatch_array_variants!(self, inner, { inner.null_bitmap() })
}
pub fn into_null_bitmap(self) -> Bitmap {
dispatch_array_variants!(self, inner, { inner.into_null_bitmap() })
}
pub fn to_protobuf(&self) -> PbArray {
dispatch_array_variants!(self, inner, { inner.to_protobuf() })
}
pub fn hash_at<H: Hasher>(&self, idx: usize, state: &mut H) {
dispatch_array_variants!(self, inner, { inner.hash_at(idx, state) })
}
pub fn hash_vec<H: Hasher>(&self, hashers: &mut [H], vis: &Bitmap) {
dispatch_array_variants!(self, inner, { inner.hash_vec(hashers, vis) })
}
pub fn compact(&self, visibility: &Bitmap, cardinality: usize) -> Self {
dispatch_array_variants!(self, inner, {
inner.compact(visibility, cardinality).into()
})
}
pub fn get_ident(&self) -> &'static str {
dispatch_array_variants!(self, [I = VARIANT_NAME], { I })
}
pub fn datum_at(&self, idx: usize) -> Datum {
self.value_at(idx).to_owned_datum()
}
pub fn to_datum(&self) -> Datum {
assert_eq!(self.len(), 1);
self.datum_at(0)
}
pub fn value_at(&self, idx: usize) -> DatumRef<'_> {
dispatch_array_variants!(self, inner, {
inner.value_at(idx).map(ScalarRefImpl::from)
})
}
pub unsafe fn value_at_unchecked(&self, idx: usize) -> DatumRef<'_> {
dispatch_array_variants!(self, inner, {
inner.value_at_unchecked(idx).map(ScalarRefImpl::from)
})
}
pub fn set_bitmap(&mut self, bitmap: Bitmap) {
dispatch_array_variants!(self, inner, { inner.set_bitmap(bitmap) })
}
pub fn create_builder(&self, capacity: usize) -> ArrayBuilderImpl {
dispatch_array_variants!(self, inner, { inner.create_builder(capacity).into() })
}
pub fn data_type(&self) -> DataType {
dispatch_array_variants!(self, inner, { inner.data_type() })
}
pub fn into_ref(self) -> ArrayRef {
Arc::new(self)
}
pub fn iter(&self) -> impl DoubleEndedIterator<Item = DatumRef<'_>> + ExactSizeIterator {
(0..self.len()).map(|i| self.value_at(i))
}
}
/// Shared, reference-counted handle to an [`ArrayImpl`].
pub type ArrayRef = Arc<ArrayImpl>;
/// Two arrays are equal when their datum sequences, as produced by `ArrayImpl::iter`, are
/// equal element by element (which also requires equal lengths).
impl PartialEq for ArrayImpl {
    fn eq(&self, other: &Self) -> bool {
        Iterator::eq(self.iter(), other.iter())
    }
}

impl Eq for ArrayImpl {}
#[cfg(test)]
mod tests {
    use num_traits::ops::checked::CheckedAdd;

    use super::*;
    use crate::util::iter_util::ZipEqFast;

    /// Copies into a new array every entry of `data` for which `pred` returns `true`.
    fn filter<'a, A, F>(data: &'a A, pred: F) -> ArrayResult<A>
    where
        A: Array + 'a,
        F: Fn(Option<A::RefItem<'a>>) -> bool,
    {
        let mut kept = A::Builder::with_type(data.len(), data.data_type());
        for row in 0..data.len() {
            if !pred(data.value_at(row)) {
                continue;
            }
            kept.append(data.value_at(row));
        }
        Ok(kept.finish())
    }

    #[test]
    fn test_filter() {
        let mut builder = PrimitiveArrayBuilder::<i32>::new(0);
        for value in 0..=60 {
            builder.append(Some(value));
        }
        let source = builder.finish();

        let array = filter(&source, |x| x.unwrap_or(0) >= 60).unwrap();

        let collected: Vec<Option<i32>> = array.iter().collect();
        assert_eq!(collected, vec![Some(60)]);
    }

    /// Adds two primitive arrays element-wise after widening both sides to `T3`; a null on
    /// either side yields a null.
    fn vec_add<T1, T2, T3>(
        a: &PrimitiveArray<T1>,
        b: &PrimitiveArray<T2>,
    ) -> ArrayResult<PrimitiveArray<T3>>
    where
        T1: PrimitiveArrayItemType,
        T2: PrimitiveArrayItemType,
        T3: PrimitiveArrayItemType + CheckedAdd + From<T1> + From<T2>,
    {
        let mut sums = PrimitiveArrayBuilder::<T3>::new(a.len());
        for (lhs, rhs) in a.iter().zip_eq_fast(b.iter()) {
            let sum = lhs.zip(rhs).map(|(x, y)| T3::from(x) + T3::from(y));
            sums.append(sum);
        }
        Ok(sums.finish())
    }

    #[test]
    fn test_vectorized_add() {
        let mut builder = PrimitiveArrayBuilder::<i32>::new(0);
        for value in 0..=60 {
            builder.append(Some(value));
        }
        let array1 = builder.finish();

        let mut builder = PrimitiveArrayBuilder::<i16>::new(0);
        for value in 0..=60i16 {
            builder.append(Some(value));
        }
        let array2 = builder.finish();

        let final_array: PrimitiveArray<i64> = vec_add(&array1, &array2).unwrap();

        assert_eq!(final_array.len(), array1.len());
        for (idx, data) in final_array.iter().enumerate() {
            assert_eq!(data, Some(idx as i64 * 2));
        }
    }
}
#[cfg(test)]
mod test_util {
    use std::hash::{BuildHasher, Hasher};

    use super::Array;
    use crate::bitmap::Bitmap;
    use crate::util::iter_util::ZipEqFast;

    /// Returns the current `finish()` value of every hasher, in order.
    pub fn hash_finish<H: Hasher>(hashers: &[H]) -> Vec<u64> {
        hashers.iter().map(|hasher| hasher.finish()).collect()
    }

    /// Checks that hashing `arrs` row by row (`Array::hash_at`) and in bulk
    /// (`Array::hash_vec`) both produce `expects`.
    ///
    /// One hasher per expected value is created from `hasher_builder` for each of the two
    /// code paths, and every array in `arrs` is fed, in order, into the same per-row hashers.
    ///
    /// # Panics
    ///
    /// Panics when, for any row, the expected hash, the scalar-path hash and the
    /// vectorized-path hash are not all equal.
    pub fn test_hash<H: BuildHasher, A: Array>(arrs: Vec<A>, expects: Vec<u64>, hasher_builder: H) {
        let len = expects.len();
        let mut states_scalar = Vec::with_capacity(len);
        states_scalar.resize_with(len, || hasher_builder.build_hasher());
        let mut states_vec = Vec::with_capacity(len);
        states_vec.resize_with(len, || hasher_builder.build_hasher());

        // Scalar path: hash each row of each array individually.
        for arr in &arrs {
            for (row, state) in states_scalar.iter_mut().enumerate() {
                arr.hash_at(row, state);
            }
        }

        // Vectorized path: hash all rows of each array in one call, with every row visible.
        let vis = Bitmap::ones(len);
        for arr in &arrs {
            arr.hash_vec(&mut states_vec[..], &vis);
        }

        // The comparison must be asserted: a bare `Iterator::all(..)` only returns a `bool`,
        // so discarding it lets every mismatch pass silently.
        let scalar_hashes = hash_finish(&states_scalar[..]);
        let vec_hashes = hash_finish(&states_vec[..]);
        let rows = expects
            .iter()
            .zip_eq_fast(scalar_hashes)
            .zip_eq_fast(vec_hashes)
            .enumerate();
        for (row, ((expected, scalar), vectorized)) in rows {
            assert_eq!(
                *expected, scalar,
                "row {}: scalar hash differs from the expected hash",
                row
            );
            assert_eq!(
                scalar, vectorized,
                "row {}: vectorized hash differs from the scalar hash",
                row
            );
        }
    }
}