risingwave_common/array/
proto_reader.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::{Cursor, Read};
16
17use anyhow::Context;
18use byteorder::{BigEndian, ReadBytesExt};
19use risingwave_pb::data::PbArrayType;
20
21use super::*;
22
23impl ArrayImpl {
24    pub fn from_protobuf(array: &PbArray, cardinality: usize) -> ArrayResult<Self> {
25        let array = match array.array_type() {
26            PbArrayType::Unspecified => unreachable!(),
27            PbArrayType::Int16 => read_primitive_array::<i16>(array, cardinality)?,
28            PbArrayType::Int32 => read_primitive_array::<i32>(array, cardinality)?,
29            PbArrayType::Int64 => read_primitive_array::<i64>(array, cardinality)?,
30            PbArrayType::Serial => read_primitive_array::<Serial>(array, cardinality)?,
31            PbArrayType::Float32 => read_primitive_array::<F32>(array, cardinality)?,
32            PbArrayType::Float64 => read_primitive_array::<F64>(array, cardinality)?,
33            PbArrayType::Bool => read_bool_array(array, cardinality)?,
34            PbArrayType::Utf8 => read_string_array::<Utf8ValueReader>(array, cardinality)?,
35            PbArrayType::Decimal => read_primitive_array::<Decimal>(array, cardinality)?,
36            PbArrayType::Date => read_primitive_array::<Date>(array, cardinality)?,
37            PbArrayType::Time => read_primitive_array::<Time>(array, cardinality)?,
38            PbArrayType::Timestamp => read_primitive_array::<Timestamp>(array, cardinality)?,
39            PbArrayType::Timestamptz => read_primitive_array::<Timestamptz>(array, cardinality)?,
40            PbArrayType::Interval => read_primitive_array::<Interval>(array, cardinality)?,
41            PbArrayType::Jsonb => JsonbArray::from_protobuf(array)?,
42            PbArrayType::Struct => StructArray::from_protobuf(array)?,
43            PbArrayType::List => ListArray::from_protobuf(array)?,
44            PbArrayType::Bytea => read_string_array::<BytesValueReader>(array, cardinality)?,
45            PbArrayType::Int256 => Int256Array::from_protobuf(array, cardinality)?,
46            PbArrayType::Map => MapArray::from_protobuf(array)?,
47            PbArrayType::Vector => todo!("VECTOR_PLACEHOLDER"),
48        };
49        Ok(array)
50    }
51}
52
// TODO: Use techniques like Apache Arrow Flight RPC to eliminate deserialization.
// https://arrow.apache.org/docs/format/Flight.html
55
56fn read_primitive_array<T: PrimitiveArrayItemType>(
57    array: &PbArray,
58    cardinality: usize,
59) -> ArrayResult<ArrayImpl> {
60    ensure!(
61        array.get_values().len() == 1,
62        "Must have only 1 buffer in a numeric array"
63    );
64
65    let buf = array.get_values()[0].get_body().as_slice();
66
67    let mut builder = PrimitiveArrayBuilder::<T>::new(cardinality);
68    let bitmap: Bitmap = array.get_null_bitmap()?.into();
69    let mut cursor = Cursor::new(buf);
70    for not_null in bitmap.iter() {
71        if not_null {
72            let v = T::from_protobuf(&mut cursor)?;
73            builder.append(Some(v));
74        } else {
75            builder.append(None);
76        }
77    }
78    let arr = builder.finish();
79    ensure_eq!(arr.len(), cardinality);
80
81    Ok(arr.into())
82}
83
84fn read_bool_array(array: &PbArray, cardinality: usize) -> ArrayResult<ArrayImpl> {
85    ensure!(
86        array.get_values().len() == 1,
87        "Must have only 1 buffer in a bool array"
88    );
89
90    let data = (&array.get_values()[0]).into();
91    let bitmap: Bitmap = array.get_null_bitmap()?.into();
92
93    let arr = BoolArray::new(data, bitmap);
94    ensure_eq!(arr.len(), cardinality);
95
96    Ok(arr.into())
97}
98
99fn read_offset(offset_cursor: &mut Cursor<&[u8]>) -> ArrayResult<i64> {
100    let offset = offset_cursor
101        .read_i64::<BigEndian>()
102        .context("failed to read i64 from offset buffer")?;
103
104    Ok(offset)
105}
106
107trait VarSizedValueReader {
108    type AB: ArrayBuilder;
109    fn new_builder(capacity: usize) -> Self::AB;
110    fn read(buf: &[u8], builder: &mut Self::AB) -> ArrayResult<()>;
111}
112
113struct Utf8ValueReader;
114
115impl VarSizedValueReader for Utf8ValueReader {
116    type AB = Utf8ArrayBuilder;
117
118    fn new_builder(capacity: usize) -> Self::AB {
119        Utf8ArrayBuilder::new(capacity)
120    }
121
122    fn read(buf: &[u8], builder: &mut Utf8ArrayBuilder) -> ArrayResult<()> {
123        let s = std::str::from_utf8(buf).context("failed to read utf8 string from bytes")?;
124        builder.append(Some(s));
125        Ok(())
126    }
127}
128
129struct BytesValueReader;
130
131impl VarSizedValueReader for BytesValueReader {
132    type AB = BytesArrayBuilder;
133
134    fn new_builder(capacity: usize) -> Self::AB {
135        BytesArrayBuilder::new(capacity)
136    }
137
138    fn read(buf: &[u8], builder: &mut BytesArrayBuilder) -> ArrayResult<()> {
139        builder.append(Some(buf));
140        Ok(())
141    }
142}
143
144fn read_string_array<R: VarSizedValueReader>(
145    array: &PbArray,
146    cardinality: usize,
147) -> ArrayResult<ArrayImpl> {
148    ensure!(
149        array.get_values().len() == 2,
150        "Must have exactly 2 buffers in a string array"
151    );
152    let offset_buff = array.get_values()[0].get_body().as_slice();
153    let data_buf = array.get_values()[1].get_body().as_slice();
154
155    let mut builder = R::new_builder(cardinality);
156    let bitmap: Bitmap = array.get_null_bitmap()?.into();
157    let mut offset_cursor = Cursor::new(offset_buff);
158    let mut data_cursor = Cursor::new(data_buf);
159    let mut prev_offset: i64 = -1;
160
161    let mut buf = Vec::new();
162    for not_null in bitmap.iter() {
163        if not_null {
164            if prev_offset < 0 {
165                prev_offset = read_offset(&mut offset_cursor)?;
166            }
167            let offset = read_offset(&mut offset_cursor)?;
168            let length = (offset - prev_offset) as usize;
169            prev_offset = offset;
170            buf.resize(length, Default::default());
171            data_cursor
172                .read_exact(buf.as_mut_slice())
173                .with_context(|| {
174                    format!(
175                        "failed to read str from data buffer [length={}, offset={}]",
176                        length, offset
177                    )
178                })?;
179            R::read(buf.as_slice(), &mut builder)?;
180        } else {
181            builder.append(None);
182        }
183    }
184    let arr = builder.finish();
185    ensure_eq!(arr.len(), cardinality);
186
187    Ok(arr.into())
188}
189
#[cfg(test)]
mod tests {
    use super::*;

    // Convert a column to protobuf, then convert it back to column, and ensures the two are
    // identical.
    #[test]
    fn test_column_protobuf_conversion() {
        let cardinality = 2048;
        let mut builder = I32ArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            if i % 2 == 0 {
                builder.append(Some(i as i32));
            } else {
                builder.append(None);
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &I32Array = new_col.as_int32();
        arr.iter().enumerate().for_each(|(i, x)| {
            if i % 2 == 0 {
                assert_eq!(i as i32, x.unwrap());
            } else {
                assert!(x.is_none());
            }
        });
    }

    #[test]
    fn test_bool_column_protobuf_conversion() {
        let cardinality = 2048;
        let mut builder = BoolArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            match i % 3 {
                0 => builder.append(Some(false)),
                1 => builder.append(Some(true)),
                _ => builder.append(None),
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &BoolArray = new_col.as_bool();
        arr.iter().enumerate().for_each(|(i, x)| match i % 3 {
            0 => assert_eq!(Some(false), x),
            1 => assert_eq!(Some(true), x),
            _ => assert_eq!(None, x),
        });
    }

    #[test]
    fn test_utf8_column_conversion() {
        let cardinality = 2048;
        let mut builder = Utf8ArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            if i % 2 == 0 {
                builder.append(Some("abc"));
            } else {
                builder.append(None);
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &Utf8Array = new_col.as_utf8();
        arr.iter().enumerate().for_each(|(i, x)| {
            if i % 2 == 0 {
                assert_eq!("abc", x.unwrap());
            } else {
                assert!(x.is_none());
            }
        });
    }

    // Round-trips a bytea column with values of varying length, including empty values,
    // so the offset arithmetic in `read_string_array` is exercised beyond a fixed width.
    #[test]
    fn test_bytea_column_conversion() {
        const DATA: &[u8] = b"abcdef";
        let cardinality = 2048;
        let mut builder = BytesArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            if i % 3 == 2 {
                builder.append(None);
            } else {
                builder.append(Some(&DATA[..i % 7]));
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &BytesArray = new_col.as_bytea();
        arr.iter().enumerate().for_each(|(i, x)| {
            if i % 3 == 2 {
                assert!(x.is_none());
            } else {
                assert_eq!(Some(&DATA[..i % 7]), x);
            }
        });
    }

    #[test]
    fn test_decimal_protobuf_conversion() {
        let cardinality = 2048;
        let mut builder = DecimalArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            if i % 2 == 0 {
                builder.append(Some(Decimal::from(i)));
            } else {
                builder.append(None);
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &DecimalArray = new_col.as_decimal();
        arr.iter().enumerate().for_each(|(i, x)| {
            if i % 2 == 0 {
                assert_eq!(Decimal::from(i), x.unwrap());
            } else {
                assert!(x.is_none());
            }
        });
    }

    #[test]
    fn test_date_protobuf_conversion() {
        let cardinality = 2048;
        let mut builder = DateArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            if i % 2 == 0 {
                builder.append(Date::with_days_since_ce(i as i32).ok());
            } else {
                builder.append(None);
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &DateArray = new_col.as_date();
        arr.iter().enumerate().for_each(|(i, x)| {
            if i % 2 == 0 {
                assert_eq!(Date::with_days_since_ce(i as i32).ok().unwrap(), x.unwrap());
            } else {
                assert!(x.is_none());
            }
        });
    }

    #[test]
    fn test_time_protobuf_conversion() {
        let cardinality = 2048;
        let mut builder = TimeArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            if i % 2 == 0 {
                builder.append(Time::with_secs_nano(i as u32, i as u32 * 1000).ok());
            } else {
                builder.append(None);
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &TimeArray = new_col.as_time();
        arr.iter().enumerate().for_each(|(i, x)| {
            if i % 2 == 0 {
                assert_eq!(
                    Time::with_secs_nano(i as u32, i as u32 * 1000)
                        .ok()
                        .unwrap(),
                    x.unwrap()
                );
            } else {
                assert!(x.is_none());
            }
        });
    }

    #[test]
    fn test_timestamp_protobuf_conversion() {
        let cardinality = 2048;
        let mut builder = TimestampArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            if i % 2 == 0 {
                builder.append(Timestamp::with_secs_nsecs(i as i64, i as u32 * 1000).ok());
            } else {
                builder.append(None);
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &TimestampArray = new_col.as_timestamp();
        arr.iter().enumerate().for_each(|(i, x)| {
            if i % 2 == 0 {
                assert_eq!(
                    Timestamp::with_secs_nsecs(i as i64, i as u32 * 1000)
                        .ok()
                        .unwrap(),
                    x.unwrap()
                );
            } else {
                assert!(x.is_none());
            }
        });
    }
}
368        });
369    }
370}