1use std::fmt::Debug;
16use std::io::{Cursor, Write};
17use std::mem::size_of;
18
19use anyhow::Context;
20use byteorder::{BigEndian, ReadBytesExt};
21use risingwave_common_estimate_size::{EstimateSize, ZeroHeapSize};
22use risingwave_pb::common::Buffer;
23use risingwave_pb::common::buffer::CompressionType;
24use risingwave_pb::data::{ArrayType, PbArray};
25
26use super::{Array, ArrayBuilder, ArrayImpl, ArrayResult};
27use crate::bitmap::{Bitmap, BitmapBuilder};
28use crate::for_all_native_types;
29use crate::types::*;
30
31pub trait PrimitiveArrayItemType
33where
34 for<'a> Self: Sized
35 + Default
36 + PartialOrd
37 + ZeroHeapSize
38 + Scalar<ScalarRefType<'a> = Self>
39 + ScalarRef<'a, ScalarType = Self>,
40{
41 const DATA_TYPE: DataType;
43 fn erase_array_type(arr: PrimitiveArray<Self>) -> ArrayImpl;
46 fn try_into_array(arr: ArrayImpl) -> Option<PrimitiveArray<Self>>;
48 fn try_into_array_ref(arr: &ArrayImpl) -> Option<&PrimitiveArray<Self>>;
50 fn array_type() -> ArrayType;
52
53 fn to_protobuf<T: Write>(self, output: &mut T) -> ArrayResult<usize>;
55 fn from_protobuf(cur: &mut Cursor<&[u8]>) -> ArrayResult<Self>;
56}
57
58macro_rules! impl_array_methods {
59 ($scalar_type:ty, $array_type_pb:ident, $array_impl_variant:ident) => {
60 const DATA_TYPE: DataType = DataType::$array_impl_variant;
61
62 fn erase_array_type(arr: PrimitiveArray<Self>) -> ArrayImpl {
63 ArrayImpl::$array_impl_variant(arr)
64 }
65
66 fn try_into_array(arr: ArrayImpl) -> Option<PrimitiveArray<Self>> {
67 match arr {
68 ArrayImpl::$array_impl_variant(inner) => Some(inner),
69 _ => None,
70 }
71 }
72
73 fn try_into_array_ref(arr: &ArrayImpl) -> Option<&PrimitiveArray<Self>> {
74 match arr {
75 ArrayImpl::$array_impl_variant(inner) => Some(inner),
76 _ => None,
77 }
78 }
79
80 fn array_type() -> ArrayType {
81 ArrayType::$array_type_pb
82 }
83 };
84}
85
86macro_rules! impl_primitive_for_native_types {
87 ($({ $naive_type:ty, $scalar_type:ident, $read_fn:ident } ),*) => {
88 $(
89 impl PrimitiveArrayItemType for $naive_type {
90 impl_array_methods!($naive_type, $scalar_type, $scalar_type);
91
92 fn to_protobuf<T: Write>(self, output: &mut T) -> ArrayResult<usize> {
93 NativeType::to_protobuf(self, output)
94 }
95 fn from_protobuf(cur: &mut Cursor<&[u8]>) -> ArrayResult<Self> {
96 let v = cur
97 .$read_fn::<BigEndian>()
98 .context("failed to read value from buffer")?;
99 Ok(v.into())
100 }
101 }
102 )*
103 }
104}
105
106for_all_native_types! { impl_primitive_for_native_types }
107
108macro_rules! impl_primitive_for_others {
110 ($({ $scalar_type:ty, $array_type_pb:ident, $array_impl_variant:ident } ),*) => {
111 $(
112 impl PrimitiveArrayItemType for $scalar_type {
113 impl_array_methods!($scalar_type, $array_type_pb, $array_impl_variant);
114
115 fn to_protobuf<T: Write>(self, output: &mut T) -> ArrayResult<usize> {
116 <$scalar_type>::to_protobuf(self, output)
117 }
118 fn from_protobuf(cur: &mut Cursor<&[u8]>) -> ArrayResult<Self> {
119 <$scalar_type>::from_protobuf(cur)
120 }
121 }
122 )*
123 }
124}
125
126impl_primitive_for_others! {
127 { Decimal, Decimal, Decimal },
128 { Interval, Interval, Interval },
129 { Date, Date, Date },
130 { Time, Time, Time },
131 { Timestamp, Timestamp, Timestamp },
132 { Timestamptz, Timestamptz, Timestamptz }
133}
134
135#[derive(Debug, Clone, PartialEq, Eq, EstimateSize)]
137pub struct PrimitiveArray<T: PrimitiveArrayItemType> {
138 bitmap: Bitmap,
139 data: Box<[T]>,
140}
141
142impl<T: PrimitiveArrayItemType> FromIterator<Option<T>> for PrimitiveArray<T> {
143 fn from_iter<I: IntoIterator<Item = Option<T>>>(iter: I) -> Self {
144 let iter = iter.into_iter();
145 let mut builder = <Self as Array>::Builder::new(iter.size_hint().0);
146 for i in iter {
147 builder.append(i);
148 }
149 builder.finish()
150 }
151}
152
153impl<'a, T: PrimitiveArrayItemType> FromIterator<&'a Option<T>> for PrimitiveArray<T> {
154 fn from_iter<I: IntoIterator<Item = &'a Option<T>>>(iter: I) -> Self {
155 iter.into_iter().cloned().collect()
156 }
157}
158
159impl<T: PrimitiveArrayItemType> FromIterator<T> for PrimitiveArray<T> {
160 fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
161 let data: Box<[T]> = iter.into_iter().collect();
162 PrimitiveArray {
163 bitmap: Bitmap::ones(data.len()),
164 data,
165 }
166 }
167}
168
169impl FromIterator<Option<f32>> for PrimitiveArray<F32> {
170 fn from_iter<I: IntoIterator<Item = Option<f32>>>(iter: I) -> Self {
171 iter.into_iter().map(|o| o.map(F32::from)).collect()
172 }
173}
174
175impl FromIterator<Option<f64>> for PrimitiveArray<F64> {
176 fn from_iter<I: IntoIterator<Item = Option<f64>>>(iter: I) -> Self {
177 iter.into_iter().map(|o| o.map(F64::from)).collect()
178 }
179}
180
181impl FromIterator<f32> for PrimitiveArray<F32> {
182 fn from_iter<I: IntoIterator<Item = f32>>(iter: I) -> Self {
183 iter.into_iter().map(F32::from).collect()
184 }
185}
186
187impl FromIterator<f64> for PrimitiveArray<F64> {
188 fn from_iter<I: IntoIterator<Item = f64>>(iter: I) -> Self {
189 iter.into_iter().map(F64::from).collect()
190 }
191}
192
193impl<T: PrimitiveArrayItemType> PrimitiveArray<T> {
194 pub fn from_iter_bitmap(iter: impl IntoIterator<Item = T>, bitmap: Bitmap) -> Self {
198 let data: Box<[T]> = iter.into_iter().collect();
199 assert_eq!(data.len(), bitmap.len());
200 PrimitiveArray { bitmap, data }
201 }
202
203 pub fn as_slice(&self) -> &[T] {
205 &self.data
206 }
207
208 pub fn as_mut_slice(&mut self) -> &mut [T] {
210 &mut self.data
211 }
212}
213
214impl<T: PrimitiveArrayItemType> Array for PrimitiveArray<T> {
215 type Builder = PrimitiveArrayBuilder<T>;
216 type OwnedItem = T;
217 type RefItem<'a> = T;
218
219 unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> {
220 unsafe { *self.data.get_unchecked(idx) }
221 }
222
223 fn raw_iter(&self) -> impl ExactSizeIterator<Item = Self::RefItem<'_>> {
224 self.data.iter().cloned()
225 }
226
227 fn len(&self) -> usize {
228 self.data.len()
229 }
230
231 fn to_protobuf(&self) -> PbArray {
232 let mut output_buffer = Vec::<u8>::with_capacity(self.len() * size_of::<T>());
233
234 for v in self.iter() {
235 v.map(|node| node.to_protobuf(&mut output_buffer));
236 }
237
238 let buffer = Buffer {
239 compression: CompressionType::None as i32,
240 body: output_buffer,
241 };
242 let null_bitmap = self.null_bitmap().to_protobuf();
243 PbArray {
244 null_bitmap: Some(null_bitmap),
245 values: vec![buffer],
246 array_type: T::array_type() as i32,
247 struct_array_data: None,
248 list_array_data: None,
249 }
250 }
251
252 fn null_bitmap(&self) -> &Bitmap {
253 &self.bitmap
254 }
255
256 fn into_null_bitmap(self) -> Bitmap {
257 self.bitmap
258 }
259
260 fn set_bitmap(&mut self, bitmap: Bitmap) {
261 self.bitmap = bitmap;
262 }
263
264 fn data_type(&self) -> DataType {
265 T::DATA_TYPE
266 }
267}
268
269#[derive(Debug, Clone, EstimateSize)]
271pub struct PrimitiveArrayBuilder<T: PrimitiveArrayItemType> {
272 bitmap: BitmapBuilder,
273 data: Vec<T>,
274}
275
276impl<T: PrimitiveArrayItemType> ArrayBuilder for PrimitiveArrayBuilder<T> {
277 type ArrayType = PrimitiveArray<T>;
278
279 fn new(capacity: usize) -> Self {
280 Self {
281 bitmap: BitmapBuilder::with_capacity(capacity),
282 data: Vec::with_capacity(capacity),
283 }
284 }
285
286 fn with_type(capacity: usize, ty: DataType) -> Self {
287 assert_eq!(ty, T::DATA_TYPE);
288 Self::new(capacity)
289 }
290
291 fn append_n(&mut self, n: usize, value: Option<T>) {
292 match value {
293 Some(x) => {
294 self.bitmap.append_n(n, true);
295 self.data.extend(std::iter::repeat_n(x, n));
296 }
297 None => {
298 self.bitmap.append_n(n, false);
299 self.data.extend(std::iter::repeat_n(T::default(), n));
300 }
301 }
302 }
303
304 fn append_array(&mut self, other: &PrimitiveArray<T>) {
305 for bit in other.bitmap.iter() {
306 self.bitmap.append(bit);
307 }
308 self.data.extend_from_slice(&other.data);
309 }
310
311 fn pop(&mut self) -> Option<()> {
312 self.data.pop().map(|_| self.bitmap.pop().unwrap())
313 }
314
315 fn len(&self) -> usize {
316 self.bitmap.len()
317 }
318
319 fn finish(self) -> PrimitiveArray<T> {
320 PrimitiveArray {
321 bitmap: self.bitmap.finish(),
322 data: self.data.into(),
323 }
324 }
325}
326
327#[cfg(test)]
328mod tests {
329 use super::*;
330
331 fn helper_test_builder<T: PrimitiveArrayItemType>(data: Vec<Option<T>>) -> PrimitiveArray<T> {
332 let mut builder = PrimitiveArrayBuilder::<T>::new(data.len());
333 for d in data {
334 builder.append(d);
335 }
336 builder.finish()
337 }
338
339 #[test]
340 fn test_i16_builder() {
341 let arr = helper_test_builder::<i16>(
342 (0..1000)
343 .map(|x| if x % 2 == 0 { None } else { Some(x) })
344 .collect(),
345 );
346 if !matches!(ArrayImpl::from(arr), ArrayImpl::Int16(_)) {
347 unreachable!()
348 }
349 }
350
351 #[test]
352 fn test_i32_builder() {
353 let arr = helper_test_builder::<i32>(
354 (0..1000)
355 .map(|x| if x % 2 == 0 { None } else { Some(x) })
356 .collect(),
357 );
358 if !matches!(ArrayImpl::from(arr), ArrayImpl::Int32(_)) {
359 unreachable!()
360 }
361 }
362
363 #[test]
364 fn test_i64_builder() {
365 let arr = helper_test_builder::<i64>(
366 (0..1000)
367 .map(|x| if x % 2 == 0 { None } else { Some(x) })
368 .collect(),
369 );
370 if !matches!(ArrayImpl::from(arr), ArrayImpl::Int64(_)) {
371 unreachable!()
372 }
373 }
374
375 #[test]
376 fn test_f32_builder() {
377 let arr = helper_test_builder::<F32>(
378 (0..1000)
379 .map(|x| {
380 if x % 2 == 0 {
381 None
382 } else {
383 Some((x as f32).into())
384 }
385 })
386 .collect(),
387 );
388 if !matches!(ArrayImpl::from(arr), ArrayImpl::Float32(_)) {
389 unreachable!()
390 }
391 }
392
393 #[test]
394 fn test_f64_builder() {
395 let arr = helper_test_builder::<F64>(
396 (0..1000)
397 .map(|x| {
398 if x % 2 == 0 {
399 None
400 } else {
401 Some((x as f64).into())
402 }
403 })
404 .collect(),
405 );
406 if !matches!(ArrayImpl::from(arr), ArrayImpl::Float64(_)) {
407 unreachable!()
408 }
409 }
410}