risingwave_common/array/
num256_array.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::{Cursor, Read};
16
17use ethnum::I256;
18use risingwave_common_estimate_size::EstimateSize;
19use risingwave_pb::common::Buffer;
20use risingwave_pb::common::buffer::CompressionType;
21use risingwave_pb::data::PbArray;
22
23use crate::array::{Array, ArrayBuilder, ArrayImpl, ArrayResult};
24use crate::bitmap::{Bitmap, BitmapBuilder};
25use crate::types::{DataType, Int256, Int256Ref, Scalar};
26
27#[derive(Debug, Clone, EstimateSize)]
28pub struct Int256ArrayBuilder {
29    bitmap: BitmapBuilder,
30    data: Vec<I256>,
31}
32
33#[derive(Debug, Clone, PartialEq, EstimateSize)]
34pub struct Int256Array {
35    bitmap: Bitmap,
36    data: Box<[I256]>,
37}
38
/// Implements the array-related traits for a 256-bit numeric array type.
///
/// Macro arguments, in order:
/// * `$array` - the array type,
/// * `$array_builder` - the matching builder type,
/// * `$scalar` - the owned scalar type,
/// * `$scalar_ref<$gen>` - the borrowed scalar type together with its lifetime parameter,
/// * `$variant_name` - the `DataType` variant reported by the array.
///
/// The expansion provides `Array` for the array, `ArrayBuilder` for the builder,
/// `FromIterator<Option<$scalar_ref>>` for the array, and an inherent `from_protobuf` decoder.
#[rustfmt::skip]
macro_rules! impl_array_for_num256 {
    (
        $array:ty,
        $array_builder:ty,
        $scalar:ident,
        $scalar_ref:ident < $gen:tt > ,
        $variant_name:ident
    ) => {
        impl Array for $array {
            type Builder = $array_builder;
            type OwnedItem = $scalar;
            type RefItem<$gen> = $scalar_ref<$gen>;

            /// Returns a reference wrapper around the value stored at `idx` without any
            /// bounds check.
            ///
            /// # Safety
            ///
            /// `idx` must be a valid index into the value storage (`idx < self.len()`).
            unsafe fn raw_value_at_unchecked(&self, idx: usize) -> Self::RefItem<'_> {
                // SAFETY: `get_unchecked` performs no bounds check; the caller of this
                // `unsafe fn` is responsible for passing an in-bounds `idx`.
                let slot = unsafe { self.data.get_unchecked(idx) };
                $scalar_ref(slot)
            }

            /// Number of slots in the value storage.
            fn len(&self) -> usize {
                self.data.len()
            }

            /// Encodes the array as a `PbArray` holding the null bitmap and one
            /// uncompressed value buffer.
            fn to_protobuf(&self) -> PbArray {
                let mut body = Vec::<u8>::with_capacity(self.len() * $scalar::size());

                // Only non-null entries are written; the value returned by the scalar's
                // `to_protobuf` is discarded.
                for value in self.iter().flatten() {
                    let _ = value.to_protobuf(&mut body);
                }

                PbArray {
                    null_bitmap: Some(self.null_bitmap().to_protobuf()),
                    values: vec![Buffer {
                        compression: CompressionType::None as i32,
                        body,
                    }],
                    array_type: $scalar::array_type() as i32,
                    struct_array_data: None,
                    list_array_data: None,
                }
            }

            /// Borrows the null bitmap.
            fn null_bitmap(&self) -> &Bitmap {
                &self.bitmap
            }

            /// Consumes the array and returns its null bitmap.
            fn into_null_bitmap(self) -> Bitmap {
                self.bitmap
            }

            /// Replaces the null bitmap with `bitmap`.
            fn set_bitmap(&mut self, bitmap: Bitmap) {
                self.bitmap = bitmap;
            }

            /// The `DataType` variant this array was generated for.
            fn data_type(&self) -> DataType {
                DataType::$variant_name
            }
        }

        impl ArrayBuilder for $array_builder {
            type ArrayType = $array;

            /// Creates an empty builder with room reserved for `capacity` elements.
            fn new(capacity: usize) -> Self {
                let bitmap = BitmapBuilder::with_capacity(capacity);
                let data = Vec::with_capacity(capacity);
                Self { bitmap, data }
            }

            /// Same as `new`, but panics unless `ty` is the data type of this builder.
            fn with_type(capacity: usize, ty: DataType) -> Self {
                assert_eq!(ty, DataType::$variant_name);
                Self::new(capacity)
            }

            /// Appends `value` to the builder `n` times.
            ///
            /// A `None` is recorded as `n` unset bitmap bits plus `n` default-valued slots.
            fn append_n(
                &mut self,
                n: usize,
                value: Option<<Self::ArrayType as Array>::RefItem<'_>>,
            ) {
                let (is_present, slot_value) = match value {
                    Some(x) => (true, x.0.clone()),
                    None => (false, $scalar::default().into_inner()),
                };
                self.bitmap.append_n(n, is_present);
                self.data.extend(std::iter::repeat(slot_value).take(n));
            }

            /// Appends every element of `other`, nulls included.
            fn append_array(&mut self, other: &Self::ArrayType) {
                self.data.extend_from_slice(&other.data);
                other.bitmap.iter().for_each(|bit| {
                    self.bitmap.append(bit);
                });
            }

            /// Removes the most recently appended element; returns `None` when the value
            /// storage is empty.
            fn pop(&mut self) -> Option<()> {
                self.data.pop()?;
                // A value was removed, so a matching bitmap bit is expected to exist.
                self.bitmap.pop().unwrap();
                Some(())
            }

            /// Number of elements appended so far, as tracked by the bitmap builder.
            fn len(&self) -> usize {
                self.bitmap.len()
            }

            /// Consumes the builder and produces the finished array.
            fn finish(self) -> Self::ArrayType {
                let Self { bitmap, data } = self;
                Self::ArrayType {
                    bitmap: bitmap.finish(),
                    data: data.into(),
                }
            }
        }

        impl<$gen> FromIterator<Option<$scalar_ref<$gen>>> for $array {
            /// Collects optional scalar references into an array, using the iterator's lower
            /// size bound as the initial builder capacity.
            fn from_iter<T: IntoIterator<Item = Option<$scalar_ref<$gen>>>>(iter: T) -> Self {
                let items = iter.into_iter();
                let (lower_bound, _) = items.size_hint();
                let mut builder = <Self as Array>::Builder::new(lower_bound);
                items.for_each(|item| {
                    builder.append(item);
                });
                builder.finish()
            }
        }

        impl $array {
            /// Decodes a `PbArray` into an array wrapped in `ArrayImpl`.
            ///
            /// The protobuf array must carry exactly one value buffer. For every set bit of
            /// the null bitmap, `$scalar::size()` bytes are read from that buffer and
            /// interpreted as a big-endian value; unset bits become nulls. An error is
            /// returned when the buffer count is not 1, the bitmap is unavailable, the buffer
            /// runs out of bytes, or the decoded length differs from `cardinality`.
            pub fn from_protobuf(array: &PbArray, cardinality: usize) -> ArrayResult<ArrayImpl> {
                ensure!(array.get_values().len() == 1, "Must have only 1 buffer in array");

                let payload = array.get_values()[0].get_body().as_slice();

                let mut builder = <$array_builder>::new(cardinality);
                let validity: Bitmap = array.get_null_bitmap()?.into();
                let mut reader = Cursor::new(payload);
                for is_present in validity.iter() {
                    if !is_present {
                        builder.append(None);
                        continue;
                    }
                    let mut raw = [0u8; $scalar::size()];
                    reader.read_exact(&mut raw)?;
                    let item = <$scalar>::from_be_bytes(raw);
                    builder.append(Some(item.as_scalar_ref()));
                }
                let arr = builder.finish();
                ensure_eq!(arr.len(), cardinality);

                Ok(arr.into())
            }
        }
    };
}
197
198impl_array_for_num256!(
199    Int256Array,
200    Int256ArrayBuilder,
201    Int256,
202    Int256Ref<'a>,
203    Int256
204);
205
206impl FromIterator<Int256> for Int256Array {
207    fn from_iter<I: IntoIterator<Item = Int256>>(iter: I) -> Self {
208        let data: Box<[I256]> = iter.into_iter().map(|i| *i.0).collect();
209        Int256Array {
210            bitmap: Bitmap::ones(data.len()),
211            data,
212        }
213    }
214}