risingwave_common/array/
primitive_array.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16use std::io::{Cursor, Write};
17use std::mem::size_of;
18
19use anyhow::Context;
20use byteorder::{BigEndian, ReadBytesExt};
21use risingwave_common_estimate_size::{EstimateSize, ZeroHeapSize};
22use risingwave_pb::common::Buffer;
23use risingwave_pb::common::buffer::CompressionType;
24use risingwave_pb::data::{ArrayType, PbArray};
25
26use super::{Array, ArrayBuilder, ArrayImpl, ArrayResult};
27use crate::bitmap::{Bitmap, BitmapBuilder};
28use crate::for_all_native_types;
29use crate::types::*;
30
/// Physical type of array items which have fixed size.
///
/// Values of implementing types are stored contiguously in a [`PrimitiveArray`]
/// (`Box<[Self]>`). The `for<'a>` bounds below make the type both its own owned
/// scalar type and its own reference type. As a result, reading an element out of
/// an array yields the value itself rather than a borrow.
pub trait PrimitiveArrayItemType
where
    for<'a> Self: Sized
        + Default
        + PartialOrd
        + ZeroHeapSize
        + Scalar<ScalarRefType<'a> = Self>
        + ScalarRef<'a, ScalarType = Self>,
{
    /// The logical data type of arrays holding this item type.
    const DATA_TYPE: DataType;
    // array methods
    /// Wraps a primitive array of this item type into the matching `ArrayImpl` variant.
    fn erase_array_type(arr: PrimitiveArray<Self>) -> ArrayImpl;
    /// Unwraps an owned `ArrayImpl` into a primitive array of this item type.
    /// Returns `None` if `arr` is a different variant.
    fn try_into_array(arr: ArrayImpl) -> Option<PrimitiveArray<Self>>;
    /// Borrowing counterpart of `try_into_array`: returns a reference to the inner
    /// primitive array, or `None` if `arr` is a different variant.
    fn try_into_array_ref(arr: &ArrayImpl) -> Option<&PrimitiveArray<Self>>;
    /// Returns the protobuf `ArrayType` tag written when serializing arrays of this
    /// item type.
    fn array_type() -> ArrayType;

    // item methods
    /// Encodes this single value by writing it to `output`.
    /// NOTE(review): the returned `usize` is presumably the number of bytes
    /// written; confirm against the implementors.
    fn to_protobuf<T: Write>(self, output: &mut T) -> ArrayResult<usize>;
    /// Decodes a single value starting at the current position of `cur`.
    /// Implementors are expected to advance the cursor past the consumed bytes.
    fn from_protobuf(cur: &mut Cursor<&[u8]>) -> ArrayResult<Self>;
}
57
/// Expands to the array-conversion members of [`PrimitiveArrayItemType`] that every
/// implementor shares: the `DATA_TYPE` constant, the protobuf `ArrayType` tag, and
/// the `ArrayImpl` wrap/unwrap helpers.
///
/// Arguments:
/// * `$item` — the item type. It is not used by the expansion; call sites pass it
///   only for readability.
/// * `$pb_variant` — the `ArrayType` variant name.
/// * `$variant` — the name shared by the `DataType` and `ArrayImpl` variants.
macro_rules! impl_array_methods {
    ($item:ty, $pb_variant:ident, $variant:ident) => {
        const DATA_TYPE: DataType = DataType::$variant;

        fn array_type() -> ArrayType {
            ArrayType::$pb_variant
        }

        fn erase_array_type(arr: PrimitiveArray<Self>) -> ArrayImpl {
            ArrayImpl::$variant(arr)
        }

        fn try_into_array(arr: ArrayImpl) -> Option<PrimitiveArray<Self>> {
            if let ArrayImpl::$variant(inner) = arr {
                Some(inner)
            } else {
                None
            }
        }

        fn try_into_array_ref(arr: &ArrayImpl) -> Option<&PrimitiveArray<Self>> {
            if let ArrayImpl::$variant(inner) = arr {
                Some(inner)
            } else {
                None
            }
        }
    };
}
85
86macro_rules! impl_primitive_for_native_types {
87    ($({ $naive_type:ty, $scalar_type:ident, $read_fn:ident } ),*) => {
88        $(
89            impl PrimitiveArrayItemType for $naive_type {
90                impl_array_methods!($naive_type, $scalar_type, $scalar_type);
91
92                fn to_protobuf<T: Write>(self, output: &mut T) -> ArrayResult<usize> {
93                    NativeType::to_protobuf(self, output)
94                }
95                fn from_protobuf(cur: &mut Cursor<&[u8]>) -> ArrayResult<Self> {
96                    let v = cur
97                        .$read_fn::<BigEndian>()
98                        .context("failed to read value from buffer")?;
99                    Ok(v.into())
100                }
101            }
102        )*
103    }
104}
105
106for_all_native_types! { impl_primitive_for_native_types }
107
108/// These types have `to_protobuf` and implement `Hash`.
109macro_rules! impl_primitive_for_others {
110    ($({ $scalar_type:ty, $array_type_pb:ident, $array_impl_variant:ident } ),*) => {
111        $(
112            impl PrimitiveArrayItemType for $scalar_type {
113                impl_array_methods!($scalar_type, $array_type_pb, $array_impl_variant);
114
115                fn to_protobuf<T: Write>(self, output: &mut T) -> ArrayResult<usize> {
116                    <$scalar_type>::to_protobuf(self, output)
117                }
118                fn from_protobuf(cur: &mut Cursor<&[u8]>) -> ArrayResult<Self> {
119                    <$scalar_type>::from_protobuf(cur)
120                }
121            }
122        )*
123    }
124}
125
126impl_primitive_for_others! {
127    { Decimal, Decimal, Decimal },
128    { Interval, Interval, Interval },
129    { Date, Date, Date },
130    { Time, Time, Time },
131    { Timestamp, Timestamp, Timestamp },
132    { Timestamptz, Timestamptz, Timestamptz }
133}
134
/// `PrimitiveArray` is a collection of primitive types, such as `i32`, `f32`.
///
/// Values are stored contiguously in `data`, and `bitmap` records which slots are
/// valid: a set bit means the slot holds a value, a cleared bit means the slot is
/// null. Null slots still occupy a position in `data`.
#[derive(Debug, Clone, PartialEq, Eq, EstimateSize)]
pub struct PrimitiveArray<T: PrimitiveArrayItemType> {
    /// Validity bitmap: one bit per slot, set for non-null values.
    bitmap: Bitmap,
    /// Element storage. It is expected to have the same length as `bitmap`.
    /// `from_iter_bitmap` checks this, but `set_bitmap` does not.
    data: Box<[T]>,
}
141
142impl<T: PrimitiveArrayItemType> FromIterator<Option<T>> for PrimitiveArray<T> {
143    fn from_iter<I: IntoIterator<Item = Option<T>>>(iter: I) -> Self {
144        let iter = iter.into_iter();
145        let mut builder = <Self as Array>::Builder::new(iter.size_hint().0);
146        for i in iter {
147            builder.append(i);
148        }
149        builder.finish()
150    }
151}
152
153impl<'a, T: PrimitiveArrayItemType> FromIterator<&'a Option<T>> for PrimitiveArray<T> {
154    fn from_iter<I: IntoIterator<Item = &'a Option<T>>>(iter: I) -> Self {
155        iter.into_iter().cloned().collect()
156    }
157}
158
159impl<T: PrimitiveArrayItemType> FromIterator<T> for PrimitiveArray<T> {
160    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
161        let data: Box<[T]> = iter.into_iter().collect();
162        PrimitiveArray {
163            bitmap: Bitmap::ones(data.len()),
164            data,
165        }
166    }
167}
168
169impl FromIterator<Option<f32>> for PrimitiveArray<F32> {
170    fn from_iter<I: IntoIterator<Item = Option<f32>>>(iter: I) -> Self {
171        iter.into_iter().map(|o| o.map(F32::from)).collect()
172    }
173}
174
175impl FromIterator<Option<f64>> for PrimitiveArray<F64> {
176    fn from_iter<I: IntoIterator<Item = Option<f64>>>(iter: I) -> Self {
177        iter.into_iter().map(|o| o.map(F64::from)).collect()
178    }
179}
180
181impl FromIterator<f32> for PrimitiveArray<F32> {
182    fn from_iter<I: IntoIterator<Item = f32>>(iter: I) -> Self {
183        iter.into_iter().map(F32::from).collect()
184    }
185}
186
187impl FromIterator<f64> for PrimitiveArray<F64> {
188    fn from_iter<I: IntoIterator<Item = f64>>(iter: I) -> Self {
189        iter.into_iter().map(F64::from).collect()
190    }
191}
192
193impl<T: PrimitiveArrayItemType> PrimitiveArray<T> {
194    /// Build a [`PrimitiveArray`] from iterator and bitmap.
195    ///
196    /// NOTE: The length of `bitmap` must be equal to the length of `iter`.
197    pub fn from_iter_bitmap(iter: impl IntoIterator<Item = T>, bitmap: Bitmap) -> Self {
198        let data: Box<[T]> = iter.into_iter().collect();
199        assert_eq!(data.len(), bitmap.len());
200        PrimitiveArray { bitmap, data }
201    }
202
203    /// Returns a slice containing the entire array.
204    pub fn as_slice(&self) -> &[T] {
205        &self.data
206    }
207
208    /// Returns a mutable slice containing the entire array.
209    pub fn as_mut_slice(&mut self) -> &mut [T] {
210        &mut self.data
211    }
212}
213
214impl<T: PrimitiveArrayItemType> Array for PrimitiveArray<T> {
215    type Builder = PrimitiveArrayBuilder<T>;
216    type OwnedItem = T;
217    type RefItem<'a> = T;
218
219    unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> {
220        unsafe { *self.data.get_unchecked(idx) }
221    }
222
223    fn raw_iter(&self) -> impl ExactSizeIterator<Item = Self::RefItem<'_>> {
224        self.data.iter().cloned()
225    }
226
227    fn len(&self) -> usize {
228        self.data.len()
229    }
230
231    fn to_protobuf(&self) -> PbArray {
232        let mut output_buffer = Vec::<u8>::with_capacity(self.len() * size_of::<T>());
233
234        for v in self.iter() {
235            v.map(|node| node.to_protobuf(&mut output_buffer));
236        }
237
238        let buffer = Buffer {
239            compression: CompressionType::None as i32,
240            body: output_buffer,
241        };
242        let null_bitmap = self.null_bitmap().to_protobuf();
243        PbArray {
244            null_bitmap: Some(null_bitmap),
245            values: vec![buffer],
246            array_type: T::array_type() as i32,
247            struct_array_data: None,
248            list_array_data: None,
249        }
250    }
251
252    fn null_bitmap(&self) -> &Bitmap {
253        &self.bitmap
254    }
255
256    fn into_null_bitmap(self) -> Bitmap {
257        self.bitmap
258    }
259
260    fn set_bitmap(&mut self, bitmap: Bitmap) {
261        self.bitmap = bitmap;
262    }
263
264    fn data_type(&self) -> DataType {
265        T::DATA_TYPE
266    }
267}
268
/// `PrimitiveArrayBuilder` constructs a `PrimitiveArray` from `Option<Primitive>`.
///
/// `bitmap` and `data` grow in lockstep: every appended slot adds one validity bit
/// and one stored value.
#[derive(Debug, Clone, EstimateSize)]
pub struct PrimitiveArrayBuilder<T: PrimitiveArrayItemType> {
    /// Validity bits of the slots appended so far (set = non-null).
    bitmap: BitmapBuilder,
    /// Values of the slots appended so far. Null slots appended via `append_n`
    /// hold `T::default()`.
    data: Vec<T>,
}
275
276impl<T: PrimitiveArrayItemType> ArrayBuilder for PrimitiveArrayBuilder<T> {
277    type ArrayType = PrimitiveArray<T>;
278
279    fn new(capacity: usize) -> Self {
280        Self {
281            bitmap: BitmapBuilder::with_capacity(capacity),
282            data: Vec::with_capacity(capacity),
283        }
284    }
285
286    fn with_type(capacity: usize, ty: DataType) -> Self {
287        assert_eq!(ty, T::DATA_TYPE);
288        Self::new(capacity)
289    }
290
291    fn append_n(&mut self, n: usize, value: Option<T>) {
292        match value {
293            Some(x) => {
294                self.bitmap.append_n(n, true);
295                self.data.extend(std::iter::repeat_n(x, n));
296            }
297            None => {
298                self.bitmap.append_n(n, false);
299                self.data.extend(std::iter::repeat_n(T::default(), n));
300            }
301        }
302    }
303
304    fn append_array(&mut self, other: &PrimitiveArray<T>) {
305        for bit in other.bitmap.iter() {
306            self.bitmap.append(bit);
307        }
308        self.data.extend_from_slice(&other.data);
309    }
310
311    fn pop(&mut self) -> Option<()> {
312        self.data.pop().map(|_| self.bitmap.pop().unwrap())
313    }
314
315    fn len(&self) -> usize {
316        self.bitmap.len()
317    }
318
319    fn finish(self) -> PrimitiveArray<T> {
320        PrimitiveArray {
321            bitmap: self.bitmap.finish(),
322            data: self.data.into(),
323        }
324    }
325}
326
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an array from `data` one item at a time. Checks that the result
    /// reproduces the input exactly: same length, same null positions, same values.
    fn helper_test_builder<T: PrimitiveArrayItemType>(data: Vec<Option<T>>) -> PrimitiveArray<T> {
        let mut builder = PrimitiveArrayBuilder::<T>::new(data.len());
        for d in &data {
            builder.append(*d);
        }
        let arr = builder.finish();
        assert_eq!(arr.len(), data.len());
        assert!(
            arr.iter().eq(data.iter().copied()),
            "array contents differ from the builder input"
        );
        arr
    }

    #[test]
    fn test_i16_builder() {
        let arr = helper_test_builder::<i16>(
            (0..1000)
                .map(|x| if x % 2 == 0 { None } else { Some(x) })
                .collect(),
        );
        assert!(matches!(ArrayImpl::from(arr), ArrayImpl::Int16(_)));
    }

    #[test]
    fn test_i32_builder() {
        let arr = helper_test_builder::<i32>(
            (0..1000)
                .map(|x| if x % 2 == 0 { None } else { Some(x) })
                .collect(),
        );
        assert!(matches!(ArrayImpl::from(arr), ArrayImpl::Int32(_)));
    }

    #[test]
    fn test_i64_builder() {
        let arr = helper_test_builder::<i64>(
            (0..1000)
                .map(|x| if x % 2 == 0 { None } else { Some(x) })
                .collect(),
        );
        assert!(matches!(ArrayImpl::from(arr), ArrayImpl::Int64(_)));
    }

    #[test]
    fn test_f32_builder() {
        let arr = helper_test_builder::<F32>(
            (0..1000)
                .map(|x| {
                    if x % 2 == 0 {
                        None
                    } else {
                        Some((x as f32).into())
                    }
                })
                .collect(),
        );
        assert!(matches!(ArrayImpl::from(arr), ArrayImpl::Float32(_)));
    }

    #[test]
    fn test_f64_builder() {
        let arr = helper_test_builder::<F64>(
            (0..1000)
                .map(|x| {
                    if x % 2 == 0 {
                        None
                    } else {
                        Some((x as f64).into())
                    }
                })
                .collect(),
        );
        assert!(matches!(ArrayImpl::from(arr), ArrayImpl::Float64(_)));
    }
}
410}