1use std::io::{Cursor, Read};
16
17use anyhow::Context;
18use byteorder::{BigEndian, ReadBytesExt};
19use risingwave_pb::data::PbArrayType;
20
21use super::*;
22
23impl ArrayImpl {
24 pub fn from_protobuf(array: &PbArray, cardinality: usize) -> ArrayResult<Self> {
25 let array = match array.array_type() {
26 PbArrayType::Unspecified => unreachable!(),
27 PbArrayType::Int16 => read_primitive_array::<i16>(array, cardinality)?,
28 PbArrayType::Int32 => read_primitive_array::<i32>(array, cardinality)?,
29 PbArrayType::Int64 => read_primitive_array::<i64>(array, cardinality)?,
30 PbArrayType::Serial => read_primitive_array::<Serial>(array, cardinality)?,
31 PbArrayType::Float32 => read_primitive_array::<F32>(array, cardinality)?,
32 PbArrayType::Float64 => read_primitive_array::<F64>(array, cardinality)?,
33 PbArrayType::Bool => read_bool_array(array, cardinality)?,
34 PbArrayType::Utf8 => read_string_array::<Utf8ValueReader>(array, cardinality)?,
35 PbArrayType::Decimal => read_primitive_array::<Decimal>(array, cardinality)?,
36 PbArrayType::Date => read_primitive_array::<Date>(array, cardinality)?,
37 PbArrayType::Time => read_primitive_array::<Time>(array, cardinality)?,
38 PbArrayType::Timestamp => read_primitive_array::<Timestamp>(array, cardinality)?,
39 PbArrayType::Timestamptz => read_primitive_array::<Timestamptz>(array, cardinality)?,
40 PbArrayType::Interval => read_primitive_array::<Interval>(array, cardinality)?,
41 PbArrayType::Jsonb => JsonbArray::from_protobuf(array)?,
42 PbArrayType::Struct => StructArray::from_protobuf(array)?,
43 PbArrayType::List => ListArray::from_protobuf(array)?,
44 PbArrayType::Bytea => read_string_array::<BytesValueReader>(array, cardinality)?,
45 PbArrayType::Int256 => Int256Array::from_protobuf(array, cardinality)?,
46 PbArrayType::Map => MapArray::from_protobuf(array)?,
47 PbArrayType::Vector => todo!("VECTOR_PLACEHOLDER"),
48 };
49 Ok(array)
50 }
51}
52
53fn read_primitive_array<T: PrimitiveArrayItemType>(
57 array: &PbArray,
58 cardinality: usize,
59) -> ArrayResult<ArrayImpl> {
60 ensure!(
61 array.get_values().len() == 1,
62 "Must have only 1 buffer in a numeric array"
63 );
64
65 let buf = array.get_values()[0].get_body().as_slice();
66
67 let mut builder = PrimitiveArrayBuilder::<T>::new(cardinality);
68 let bitmap: Bitmap = array.get_null_bitmap()?.into();
69 let mut cursor = Cursor::new(buf);
70 for not_null in bitmap.iter() {
71 if not_null {
72 let v = T::from_protobuf(&mut cursor)?;
73 builder.append(Some(v));
74 } else {
75 builder.append(None);
76 }
77 }
78 let arr = builder.finish();
79 ensure_eq!(arr.len(), cardinality);
80
81 Ok(arr.into())
82}
83
84fn read_bool_array(array: &PbArray, cardinality: usize) -> ArrayResult<ArrayImpl> {
85 ensure!(
86 array.get_values().len() == 1,
87 "Must have only 1 buffer in a bool array"
88 );
89
90 let data = (&array.get_values()[0]).into();
91 let bitmap: Bitmap = array.get_null_bitmap()?.into();
92
93 let arr = BoolArray::new(data, bitmap);
94 ensure_eq!(arr.len(), cardinality);
95
96 Ok(arr.into())
97}
98
99fn read_offset(offset_cursor: &mut Cursor<&[u8]>) -> ArrayResult<i64> {
100 let offset = offset_cursor
101 .read_i64::<BigEndian>()
102 .context("failed to read i64 from offset buffer")?;
103
104 Ok(offset)
105}
106
/// Describes how the bytes of one variable-sized value are fed into an array builder.
///
/// Used as the type parameter of `read_string_array` to choose the builder and the
/// per-value decoding.
trait VarSizedValueReader {
    /// Builder type that receives the values.
    type AB: ArrayBuilder;
    /// Creates a builder from the given capacity.
    fn new_builder(capacity: usize) -> Self::AB;
    /// Processes the bytes of one value (`buf`) with access to `builder`; returns an
    /// error if the bytes are rejected.
    fn read(buf: &[u8], builder: &mut Self::AB) -> ArrayResult<()>;
}
112
113struct Utf8ValueReader;
114
115impl VarSizedValueReader for Utf8ValueReader {
116 type AB = Utf8ArrayBuilder;
117
118 fn new_builder(capacity: usize) -> Self::AB {
119 Utf8ArrayBuilder::new(capacity)
120 }
121
122 fn read(buf: &[u8], builder: &mut Utf8ArrayBuilder) -> ArrayResult<()> {
123 let s = std::str::from_utf8(buf).context("failed to read utf8 string from bytes")?;
124 builder.append(Some(s));
125 Ok(())
126 }
127}
128
129struct BytesValueReader;
130
131impl VarSizedValueReader for BytesValueReader {
132 type AB = BytesArrayBuilder;
133
134 fn new_builder(capacity: usize) -> Self::AB {
135 BytesArrayBuilder::new(capacity)
136 }
137
138 fn read(buf: &[u8], builder: &mut BytesArrayBuilder) -> ArrayResult<()> {
139 builder.append(Some(buf));
140 Ok(())
141 }
142}
143
144fn read_string_array<R: VarSizedValueReader>(
145 array: &PbArray,
146 cardinality: usize,
147) -> ArrayResult<ArrayImpl> {
148 ensure!(
149 array.get_values().len() == 2,
150 "Must have exactly 2 buffers in a string array"
151 );
152 let offset_buff = array.get_values()[0].get_body().as_slice();
153 let data_buf = array.get_values()[1].get_body().as_slice();
154
155 let mut builder = R::new_builder(cardinality);
156 let bitmap: Bitmap = array.get_null_bitmap()?.into();
157 let mut offset_cursor = Cursor::new(offset_buff);
158 let mut data_cursor = Cursor::new(data_buf);
159 let mut prev_offset: i64 = -1;
160
161 let mut buf = Vec::new();
162 for not_null in bitmap.iter() {
163 if not_null {
164 if prev_offset < 0 {
165 prev_offset = read_offset(&mut offset_cursor)?;
166 }
167 let offset = read_offset(&mut offset_cursor)?;
168 let length = (offset - prev_offset) as usize;
169 prev_offset = offset;
170 buf.resize(length, Default::default());
171 data_cursor
172 .read_exact(buf.as_mut_slice())
173 .with_context(|| {
174 format!(
175 "failed to read str from data buffer [length={}, offset={}]",
176 length, offset
177 )
178 })?;
179 R::read(buf.as_slice(), &mut builder)?;
180 } else {
181 builder.append(None);
182 }
183 }
184 let arr = builder.finish();
185 ensure_eq!(arr.len(), cardinality);
186
187 Ok(arr.into())
188}
189
#[cfg(test)]
mod tests {
    use super::*;

    // Every test builds an array of 2048 slots, converts it with `to_protobuf`, decodes
    // it with `ArrayImpl::from_protobuf`, and checks the length and each slot.

    /// Int32 round trip: even slots hold their index, odd slots are null.
    #[test]
    fn test_column_protobuf_conversion() {
        let cardinality = 2048;
        let mut builder = I32ArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            if i % 2 == 0 {
                builder.append(Some(i as i32));
            } else {
                builder.append(None);
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &I32Array = new_col.as_int32();
        arr.iter().enumerate().for_each(|(i, x)| {
            if i % 2 == 0 {
                assert_eq!(i as i32, x.unwrap());
            } else {
                assert!(x.is_none());
            }
        });
    }

    /// Bool round trip: slots cycle through `false`, `true`, null.
    #[test]
    fn test_bool_column_protobuf_conversion() {
        let cardinality = 2048;
        let mut builder = BoolArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            match i % 3 {
                0 => builder.append(Some(false)),
                1 => builder.append(Some(true)),
                _ => builder.append(None),
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &BoolArray = new_col.as_bool();
        arr.iter().enumerate().for_each(|(i, x)| match i % 3 {
            0 => assert_eq!(Some(false), x),
            1 => assert_eq!(Some(true), x),
            _ => assert_eq!(None, x),
        });
    }

    /// Utf8 round trip: even slots hold "abc", odd slots are null.
    #[test]
    fn test_utf8_column_conversion() {
        let cardinality = 2048;
        let mut builder = Utf8ArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            if i % 2 == 0 {
                builder.append(Some("abc"));
            } else {
                builder.append(None);
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        // Same length check as the other round-trip tests.
        assert_eq!(new_col.len(), cardinality);
        let arr: &Utf8Array = new_col.as_utf8();
        arr.iter().enumerate().for_each(|(i, x)| {
            if i % 2 == 0 {
                assert_eq!("abc", x.unwrap());
            } else {
                assert!(x.is_none());
            }
        });
    }

    /// Decimal round trip: even slots hold their index as a decimal, odd slots are null.
    #[test]
    fn test_decimal_protobuf_conversion() {
        let cardinality = 2048;
        let mut builder = DecimalArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            if i % 2 == 0 {
                builder.append(Some(Decimal::from(i)));
            } else {
                builder.append(None);
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &DecimalArray = new_col.as_decimal();
        arr.iter().enumerate().for_each(|(i, x)| {
            if i % 2 == 0 {
                assert_eq!(Decimal::from(i), x.unwrap());
            } else {
                assert!(x.is_none());
            }
        });
    }

    /// Date round trip: even slots hold `Date::with_days_since_ce(i)`, odd slots are null.
    #[test]
    fn test_date_protobuf_conversion() {
        let cardinality = 2048;
        let mut builder = DateArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            if i % 2 == 0 {
                builder.append(Date::with_days_since_ce(i as i32).ok());
            } else {
                builder.append(None);
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &DateArray = new_col.as_date();
        arr.iter().enumerate().for_each(|(i, x)| {
            if i % 2 == 0 {
                assert_eq!(Date::with_days_since_ce(i as i32).ok().unwrap(), x.unwrap());
            } else {
                assert!(x.is_none());
            }
        });
    }

    /// Time round trip: even slots hold `Time::with_secs_nano(i, i * 1000)`, odd slots
    /// are null.
    #[test]
    fn test_time_protobuf_conversion() {
        let cardinality = 2048;
        let mut builder = TimeArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            if i % 2 == 0 {
                builder.append(Time::with_secs_nano(i as u32, i as u32 * 1000).ok());
            } else {
                builder.append(None);
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &TimeArray = new_col.as_time();
        arr.iter().enumerate().for_each(|(i, x)| {
            if i % 2 == 0 {
                assert_eq!(
                    Time::with_secs_nano(i as u32, i as u32 * 1000)
                        .ok()
                        .unwrap(),
                    x.unwrap()
                );
            } else {
                assert!(x.is_none());
            }
        });
    }

    /// Timestamp round trip: even slots hold `Timestamp::with_secs_nsecs(i, i * 1000)`,
    /// odd slots are null.
    #[test]
    fn test_timestamp_protobuf_conversion() {
        let cardinality = 2048;
        let mut builder = TimestampArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            if i % 2 == 0 {
                builder.append(Timestamp::with_secs_nsecs(i as i64, i as u32 * 1000).ok());
            } else {
                builder.append(None);
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &TimestampArray = new_col.as_timestamp();
        arr.iter().enumerate().for_each(|(i, x)| {
            if i % 2 == 0 {
                assert_eq!(
                    Timestamp::with_secs_nsecs(i as i64, i as u32 * 1000)
                        .ok()
                        .unwrap(),
                    x.unwrap()
                );
            } else {
                assert!(x.is_none());
            }
        });
    }
}