1use std::io::{Cursor, Read};
16
17use anyhow::Context;
18use byteorder::{BigEndian, ReadBytesExt};
19use risingwave_pb::data::PbArrayType;
20
21use super::*;
22
23impl ArrayImpl {
24 pub fn from_protobuf(array: &PbArray, cardinality: usize) -> ArrayResult<Self> {
25 let array = match array.array_type() {
26 PbArrayType::Unspecified => unreachable!(),
27 PbArrayType::Int16 => read_primitive_array::<i16>(array, cardinality)?,
28 PbArrayType::Int32 => read_primitive_array::<i32>(array, cardinality)?,
29 PbArrayType::Int64 => read_primitive_array::<i64>(array, cardinality)?,
30 PbArrayType::Serial => read_primitive_array::<Serial>(array, cardinality)?,
31 PbArrayType::Float32 => read_primitive_array::<F32>(array, cardinality)?,
32 PbArrayType::Float64 => read_primitive_array::<F64>(array, cardinality)?,
33 PbArrayType::Bool => read_bool_array(array, cardinality)?,
34 PbArrayType::Utf8 => read_string_array::<Utf8ValueReader>(array, cardinality)?,
35 PbArrayType::Decimal => read_primitive_array::<Decimal>(array, cardinality)?,
36 PbArrayType::Date => read_primitive_array::<Date>(array, cardinality)?,
37 PbArrayType::Time => read_primitive_array::<Time>(array, cardinality)?,
38 PbArrayType::Timestamp => read_primitive_array::<Timestamp>(array, cardinality)?,
39 PbArrayType::Timestamptz => read_primitive_array::<Timestamptz>(array, cardinality)?,
40 PbArrayType::Interval => read_primitive_array::<Interval>(array, cardinality)?,
41 PbArrayType::Jsonb => JsonbArray::from_protobuf(array)?,
42 PbArrayType::Struct => StructArray::from_protobuf(array)?,
43 PbArrayType::List => ListArray::from_protobuf(array)?,
44 PbArrayType::Bytea => read_string_array::<BytesValueReader>(array, cardinality)?,
45 PbArrayType::Int256 => Int256Array::from_protobuf(array, cardinality)?,
46 PbArrayType::Map => MapArray::from_protobuf(array)?,
47 };
48 Ok(array)
49 }
50}
51
52fn read_primitive_array<T: PrimitiveArrayItemType>(
56 array: &PbArray,
57 cardinality: usize,
58) -> ArrayResult<ArrayImpl> {
59 ensure!(
60 array.get_values().len() == 1,
61 "Must have only 1 buffer in a numeric array"
62 );
63
64 let buf = array.get_values()[0].get_body().as_slice();
65
66 let mut builder = PrimitiveArrayBuilder::<T>::new(cardinality);
67 let bitmap: Bitmap = array.get_null_bitmap()?.into();
68 let mut cursor = Cursor::new(buf);
69 for not_null in bitmap.iter() {
70 if not_null {
71 let v = T::from_protobuf(&mut cursor)?;
72 builder.append(Some(v));
73 } else {
74 builder.append(None);
75 }
76 }
77 let arr = builder.finish();
78 ensure_eq!(arr.len(), cardinality);
79
80 Ok(arr.into())
81}
82
83fn read_bool_array(array: &PbArray, cardinality: usize) -> ArrayResult<ArrayImpl> {
84 ensure!(
85 array.get_values().len() == 1,
86 "Must have only 1 buffer in a bool array"
87 );
88
89 let data = (&array.get_values()[0]).into();
90 let bitmap: Bitmap = array.get_null_bitmap()?.into();
91
92 let arr = BoolArray::new(data, bitmap);
93 ensure_eq!(arr.len(), cardinality);
94
95 Ok(arr.into())
96}
97
98fn read_offset(offset_cursor: &mut Cursor<&[u8]>) -> ArrayResult<i64> {
99 let offset = offset_cursor
100 .read_i64::<BigEndian>()
101 .context("failed to read i64 from offset buffer")?;
102
103 Ok(offset)
104}
105
106trait VarSizedValueReader {
107 type AB: ArrayBuilder;
108 fn new_builder(capacity: usize) -> Self::AB;
109 fn read(buf: &[u8], builder: &mut Self::AB) -> ArrayResult<()>;
110}
111
112struct Utf8ValueReader;
113
114impl VarSizedValueReader for Utf8ValueReader {
115 type AB = Utf8ArrayBuilder;
116
117 fn new_builder(capacity: usize) -> Self::AB {
118 Utf8ArrayBuilder::new(capacity)
119 }
120
121 fn read(buf: &[u8], builder: &mut Utf8ArrayBuilder) -> ArrayResult<()> {
122 let s = std::str::from_utf8(buf).context("failed to read utf8 string from bytes")?;
123 builder.append(Some(s));
124 Ok(())
125 }
126}
127
128struct BytesValueReader;
129
130impl VarSizedValueReader for BytesValueReader {
131 type AB = BytesArrayBuilder;
132
133 fn new_builder(capacity: usize) -> Self::AB {
134 BytesArrayBuilder::new(capacity)
135 }
136
137 fn read(buf: &[u8], builder: &mut BytesArrayBuilder) -> ArrayResult<()> {
138 builder.append(Some(buf));
139 Ok(())
140 }
141}
142
143fn read_string_array<R: VarSizedValueReader>(
144 array: &PbArray,
145 cardinality: usize,
146) -> ArrayResult<ArrayImpl> {
147 ensure!(
148 array.get_values().len() == 2,
149 "Must have exactly 2 buffers in a string array"
150 );
151 let offset_buff = array.get_values()[0].get_body().as_slice();
152 let data_buf = array.get_values()[1].get_body().as_slice();
153
154 let mut builder = R::new_builder(cardinality);
155 let bitmap: Bitmap = array.get_null_bitmap()?.into();
156 let mut offset_cursor = Cursor::new(offset_buff);
157 let mut data_cursor = Cursor::new(data_buf);
158 let mut prev_offset: i64 = -1;
159
160 let mut buf = Vec::new();
161 for not_null in bitmap.iter() {
162 if not_null {
163 if prev_offset < 0 {
164 prev_offset = read_offset(&mut offset_cursor)?;
165 }
166 let offset = read_offset(&mut offset_cursor)?;
167 let length = (offset - prev_offset) as usize;
168 prev_offset = offset;
169 buf.resize(length, Default::default());
170 data_cursor
171 .read_exact(buf.as_mut_slice())
172 .with_context(|| {
173 format!(
174 "failed to read str from data buffer [length={}, offset={}]",
175 length, offset
176 )
177 })?;
178 R::read(buf.as_slice(), &mut builder)?;
179 } else {
180 builder.append(None);
181 }
182 }
183 let arr = builder.finish();
184 ensure_eq!(arr.len(), cardinality);
185
186 Ok(arr.into())
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// Number of rows used by every round-trip test.
    const CARDINALITY: usize = 2048;

    /// Encodes `col` to protobuf, decodes it back and checks that the row count survived.
    ///
    /// Shared by all tests so that each of them verifies the decoded length.
    fn round_trip(col: ArrayImpl) -> ArrayImpl {
        let decoded = ArrayImpl::from_protobuf(&col.to_protobuf(), CARDINALITY).unwrap();
        assert_eq!(decoded.len(), CARDINALITY);
        decoded
    }

    #[test]
    fn test_column_protobuf_conversion() {
        let mut builder = I32ArrayBuilder::new(CARDINALITY);
        for i in 0..CARDINALITY {
            builder.append((i % 2 == 0).then_some(i as i32));
        }
        let new_col = round_trip(builder.finish().into());
        let arr: &I32Array = new_col.as_int32();
        for (i, x) in arr.iter().enumerate() {
            if i % 2 == 0 {
                assert_eq!(i as i32, x.unwrap());
            } else {
                assert!(x.is_none());
            }
        }
    }

    #[test]
    fn test_bool_column_protobuf_conversion() {
        // Rows cycle through false, true, null.
        let expected = |i: usize| match i % 3 {
            0 => Some(false),
            1 => Some(true),
            _ => None,
        };

        let mut builder = BoolArrayBuilder::new(CARDINALITY);
        for i in 0..CARDINALITY {
            builder.append(expected(i));
        }
        let new_col = round_trip(builder.finish().into());
        let arr: &BoolArray = new_col.as_bool();
        for (i, x) in arr.iter().enumerate() {
            assert_eq!(expected(i), x);
        }
    }

    #[test]
    fn test_utf8_column_conversion() {
        let mut builder = Utf8ArrayBuilder::new(CARDINALITY);
        for i in 0..CARDINALITY {
            builder.append((i % 2 == 0).then_some("abc"));
        }
        let new_col = round_trip(builder.finish().into());
        let arr: &Utf8Array = new_col.as_utf8();
        for (i, x) in arr.iter().enumerate() {
            if i % 2 == 0 {
                assert_eq!("abc", x.unwrap());
            } else {
                assert!(x.is_none());
            }
        }
    }

    #[test]
    fn test_decimal_protobuf_conversion() {
        let mut builder = DecimalArrayBuilder::new(CARDINALITY);
        for i in 0..CARDINALITY {
            builder.append((i % 2 == 0).then(|| Decimal::from(i)));
        }
        let new_col = round_trip(builder.finish().into());
        let arr: &DecimalArray = new_col.as_decimal();
        for (i, x) in arr.iter().enumerate() {
            if i % 2 == 0 {
                assert_eq!(Decimal::from(i), x.unwrap());
            } else {
                assert!(x.is_none());
            }
        }
    }

    #[test]
    fn test_date_protobuf_conversion() {
        let mut builder = DateArrayBuilder::new(CARDINALITY);
        for i in 0..CARDINALITY {
            let value = if i % 2 == 0 {
                Date::with_days_since_ce(i as i32).ok()
            } else {
                None
            };
            builder.append(value);
        }
        let new_col = round_trip(builder.finish().into());
        let arr: &DateArray = new_col.as_date();
        for (i, x) in arr.iter().enumerate() {
            if i % 2 == 0 {
                assert_eq!(Date::with_days_since_ce(i as i32).ok().unwrap(), x.unwrap());
            } else {
                assert!(x.is_none());
            }
        }
    }

    #[test]
    fn test_time_protobuf_conversion() {
        let mut builder = TimeArrayBuilder::new(CARDINALITY);
        for i in 0..CARDINALITY {
            let value = if i % 2 == 0 {
                Time::with_secs_nano(i as u32, i as u32 * 1000).ok()
            } else {
                None
            };
            builder.append(value);
        }
        let new_col = round_trip(builder.finish().into());
        let arr: &TimeArray = new_col.as_time();
        for (i, x) in arr.iter().enumerate() {
            if i % 2 == 0 {
                assert_eq!(
                    Time::with_secs_nano(i as u32, i as u32 * 1000)
                        .ok()
                        .unwrap(),
                    x.unwrap()
                );
            } else {
                assert!(x.is_none());
            }
        }
    }

    #[test]
    fn test_timestamp_protobuf_conversion() {
        let mut builder = TimestampArrayBuilder::new(CARDINALITY);
        for i in 0..CARDINALITY {
            let value = if i % 2 == 0 {
                Timestamp::with_secs_nsecs(i as i64, i as u32 * 1000).ok()
            } else {
                None
            };
            builder.append(value);
        }
        let new_col = round_trip(builder.finish().into());
        let arr: &TimestampArray = new_col.as_timestamp();
        for (i, x) in arr.iter().enumerate() {
            if i % 2 == 0 {
                assert_eq!(
                    Timestamp::with_secs_nsecs(i as i64, i as u32 * 1000)
                        .ok()
                        .unwrap(),
                    x.unwrap()
                );
            } else {
                assert!(x.is_none());
            }
        }
    }
}