risingwave_common/array/
proto_reader.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::{Cursor, Read};
16
17use anyhow::Context;
18use byteorder::{BigEndian, ReadBytesExt};
19use risingwave_pb::data::PbArrayType;
20
21use super::*;
22
23impl ArrayImpl {
24    pub fn from_protobuf(array: &PbArray, cardinality: usize) -> ArrayResult<Self> {
25        let array = match array.array_type() {
26            PbArrayType::Unspecified => unreachable!(),
27            PbArrayType::Int16 => read_primitive_array::<i16>(array, cardinality)?,
28            PbArrayType::Int32 => read_primitive_array::<i32>(array, cardinality)?,
29            PbArrayType::Int64 => read_primitive_array::<i64>(array, cardinality)?,
30            PbArrayType::Serial => read_primitive_array::<Serial>(array, cardinality)?,
31            PbArrayType::Float32 => read_primitive_array::<F32>(array, cardinality)?,
32            PbArrayType::Float64 => read_primitive_array::<F64>(array, cardinality)?,
33            PbArrayType::Bool => read_bool_array(array, cardinality)?,
34            PbArrayType::Utf8 => read_string_array::<Utf8ValueReader>(array, cardinality)?,
35            PbArrayType::Decimal => read_primitive_array::<Decimal>(array, cardinality)?,
36            PbArrayType::Date => read_primitive_array::<Date>(array, cardinality)?,
37            PbArrayType::Time => read_primitive_array::<Time>(array, cardinality)?,
38            PbArrayType::Timestamp => read_primitive_array::<Timestamp>(array, cardinality)?,
39            PbArrayType::Timestamptz => read_primitive_array::<Timestamptz>(array, cardinality)?,
40            PbArrayType::Interval => read_primitive_array::<Interval>(array, cardinality)?,
41            PbArrayType::Jsonb => JsonbArray::from_protobuf(array)?,
42            PbArrayType::Struct => StructArray::from_protobuf(array)?,
43            PbArrayType::List => ListArray::from_protobuf(array)?,
44            PbArrayType::Bytea => read_string_array::<BytesValueReader>(array, cardinality)?,
45            PbArrayType::Int256 => Int256Array::from_protobuf(array, cardinality)?,
46            PbArrayType::Map => MapArray::from_protobuf(array)?,
47        };
48        Ok(array)
49    }
50}
51
52// TODO: Use techniques like apache arrow flight RPC to eliminate deserialization.
53// https://arrow.apache.org/docs/format/Flight.html
54
55fn read_primitive_array<T: PrimitiveArrayItemType>(
56    array: &PbArray,
57    cardinality: usize,
58) -> ArrayResult<ArrayImpl> {
59    ensure!(
60        array.get_values().len() == 1,
61        "Must have only 1 buffer in a numeric array"
62    );
63
64    let buf = array.get_values()[0].get_body().as_slice();
65
66    let mut builder = PrimitiveArrayBuilder::<T>::new(cardinality);
67    let bitmap: Bitmap = array.get_null_bitmap()?.into();
68    let mut cursor = Cursor::new(buf);
69    for not_null in bitmap.iter() {
70        if not_null {
71            let v = T::from_protobuf(&mut cursor)?;
72            builder.append(Some(v));
73        } else {
74            builder.append(None);
75        }
76    }
77    let arr = builder.finish();
78    ensure_eq!(arr.len(), cardinality);
79
80    Ok(arr.into())
81}
82
83fn read_bool_array(array: &PbArray, cardinality: usize) -> ArrayResult<ArrayImpl> {
84    ensure!(
85        array.get_values().len() == 1,
86        "Must have only 1 buffer in a bool array"
87    );
88
89    let data = (&array.get_values()[0]).into();
90    let bitmap: Bitmap = array.get_null_bitmap()?.into();
91
92    let arr = BoolArray::new(data, bitmap);
93    ensure_eq!(arr.len(), cardinality);
94
95    Ok(arr.into())
96}
97
98fn read_offset(offset_cursor: &mut Cursor<&[u8]>) -> ArrayResult<i64> {
99    let offset = offset_cursor
100        .read_i64::<BigEndian>()
101        .context("failed to read i64 from offset buffer")?;
102
103    Ok(offset)
104}
105
106trait VarSizedValueReader {
107    type AB: ArrayBuilder;
108    fn new_builder(capacity: usize) -> Self::AB;
109    fn read(buf: &[u8], builder: &mut Self::AB) -> ArrayResult<()>;
110}
111
112struct Utf8ValueReader;
113
114impl VarSizedValueReader for Utf8ValueReader {
115    type AB = Utf8ArrayBuilder;
116
117    fn new_builder(capacity: usize) -> Self::AB {
118        Utf8ArrayBuilder::new(capacity)
119    }
120
121    fn read(buf: &[u8], builder: &mut Utf8ArrayBuilder) -> ArrayResult<()> {
122        let s = std::str::from_utf8(buf).context("failed to read utf8 string from bytes")?;
123        builder.append(Some(s));
124        Ok(())
125    }
126}
127
128struct BytesValueReader;
129
130impl VarSizedValueReader for BytesValueReader {
131    type AB = BytesArrayBuilder;
132
133    fn new_builder(capacity: usize) -> Self::AB {
134        BytesArrayBuilder::new(capacity)
135    }
136
137    fn read(buf: &[u8], builder: &mut BytesArrayBuilder) -> ArrayResult<()> {
138        builder.append(Some(buf));
139        Ok(())
140    }
141}
142
143fn read_string_array<R: VarSizedValueReader>(
144    array: &PbArray,
145    cardinality: usize,
146) -> ArrayResult<ArrayImpl> {
147    ensure!(
148        array.get_values().len() == 2,
149        "Must have exactly 2 buffers in a string array"
150    );
151    let offset_buff = array.get_values()[0].get_body().as_slice();
152    let data_buf = array.get_values()[1].get_body().as_slice();
153
154    let mut builder = R::new_builder(cardinality);
155    let bitmap: Bitmap = array.get_null_bitmap()?.into();
156    let mut offset_cursor = Cursor::new(offset_buff);
157    let mut data_cursor = Cursor::new(data_buf);
158    let mut prev_offset: i64 = -1;
159
160    let mut buf = Vec::new();
161    for not_null in bitmap.iter() {
162        if not_null {
163            if prev_offset < 0 {
164                prev_offset = read_offset(&mut offset_cursor)?;
165            }
166            let offset = read_offset(&mut offset_cursor)?;
167            let length = (offset - prev_offset) as usize;
168            prev_offset = offset;
169            buf.resize(length, Default::default());
170            data_cursor
171                .read_exact(buf.as_mut_slice())
172                .with_context(|| {
173                    format!(
174                        "failed to read str from data buffer [length={}, offset={}]",
175                        length, offset
176                    )
177                })?;
178            R::read(buf.as_slice(), &mut builder)?;
179        } else {
180            builder.append(None);
181        }
182    }
183    let arr = builder.finish();
184    ensure_eq!(arr.len(), cardinality);
185
186    Ok(arr.into())
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    // Convert a column to protobuf, then convert it back to column, and ensures the two are
    // identical.
    #[test]
    fn test_column_protobuf_conversion() {
        let cardinality = 2048;
        let mut builder = I32ArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            if i % 2 == 0 {
                builder.append(Some(i as i32));
            } else {
                builder.append(None);
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &I32Array = new_col.as_int32();
        arr.iter().enumerate().for_each(|(i, x)| {
            if i % 2 == 0 {
                assert_eq!(i as i32, x.unwrap());
            } else {
                assert!(x.is_none());
            }
        });
    }

    #[test]
    fn test_bool_column_protobuf_conversion() {
        let cardinality = 2048;
        let mut builder = BoolArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            match i % 3 {
                0 => builder.append(Some(false)),
                1 => builder.append(Some(true)),
                _ => builder.append(None),
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &BoolArray = new_col.as_bool();
        arr.iter().enumerate().for_each(|(i, x)| match i % 3 {
            0 => assert_eq!(Some(false), x),
            1 => assert_eq!(Some(true), x),
            _ => assert_eq!(None, x),
        });
    }

    #[test]
    fn test_utf8_column_conversion() {
        let cardinality = 2048;
        let mut builder = Utf8ArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            if i % 2 == 0 {
                builder.append(Some("abc"));
            } else {
                builder.append(None);
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &Utf8Array = new_col.as_utf8();
        arr.iter().enumerate().for_each(|(i, x)| {
            if i % 2 == 0 {
                assert_eq!("abc", x.unwrap());
            } else {
                assert!(x.is_none());
            }
        });
    }

    // Values of differing lengths (including empty strings) interleaved with nulls, so each
    // value is only recovered if the offset buffer is decoded correctly.
    #[test]
    fn test_utf8_variable_length_column_conversion() {
        let cardinality = 2048;
        let mut builder = Utf8ArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            if i % 3 == 2 {
                builder.append(None);
            } else {
                builder.append(Some("x".repeat(i % 7).as_str()));
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &Utf8Array = new_col.as_utf8();
        arr.iter().enumerate().for_each(|(i, x)| {
            if i % 3 == 2 {
                assert!(x.is_none());
            } else {
                assert_eq!("x".repeat(i % 7), x.unwrap());
            }
        });
    }

    // Exercises the `Bytea` path (`BytesValueReader`) with variable-length values.
    #[test]
    fn test_bytea_column_conversion() {
        let cardinality = 2048;
        let mut builder = BytesArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            if i % 3 == 2 {
                builder.append(None);
            } else {
                builder.append(Some(vec![i as u8; i % 5].as_slice()));
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &BytesArray = new_col.as_bytea();
        arr.iter().enumerate().for_each(|(i, x)| {
            if i % 3 == 2 {
                assert!(x.is_none());
            } else {
                assert_eq!(vec![i as u8; i % 5].as_slice(), x.unwrap());
            }
        });
    }

    #[test]
    fn test_decimal_protobuf_conversion() {
        let cardinality = 2048;
        let mut builder = DecimalArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            if i % 2 == 0 {
                builder.append(Some(Decimal::from(i)));
            } else {
                builder.append(None);
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &DecimalArray = new_col.as_decimal();
        arr.iter().enumerate().for_each(|(i, x)| {
            if i % 2 == 0 {
                assert_eq!(Decimal::from(i), x.unwrap());
            } else {
                assert!(x.is_none());
            }
        });
    }

    #[test]
    fn test_date_protobuf_conversion() {
        let cardinality = 2048;
        let mut builder = DateArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            if i % 2 == 0 {
                builder.append(Date::with_days_since_ce(i as i32).ok());
            } else {
                builder.append(None);
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &DateArray = new_col.as_date();
        arr.iter().enumerate().for_each(|(i, x)| {
            if i % 2 == 0 {
                assert_eq!(Date::with_days_since_ce(i as i32).ok().unwrap(), x.unwrap());
            } else {
                assert!(x.is_none());
            }
        });
    }

    #[test]
    fn test_time_protobuf_conversion() {
        let cardinality = 2048;
        let mut builder = TimeArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            if i % 2 == 0 {
                builder.append(Time::with_secs_nano(i as u32, i as u32 * 1000).ok());
            } else {
                builder.append(None);
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &TimeArray = new_col.as_time();
        arr.iter().enumerate().for_each(|(i, x)| {
            if i % 2 == 0 {
                assert_eq!(
                    Time::with_secs_nano(i as u32, i as u32 * 1000)
                        .ok()
                        .unwrap(),
                    x.unwrap()
                );
            } else {
                assert!(x.is_none());
            }
        });
    }

    #[test]
    fn test_timestamp_protobuf_conversion() {
        let cardinality = 2048;
        let mut builder = TimestampArrayBuilder::new(cardinality);
        for i in 0..cardinality {
            if i % 2 == 0 {
                builder.append(Timestamp::with_secs_nsecs(i as i64, i as u32 * 1000).ok());
            } else {
                builder.append(None);
            }
        }
        let col: ArrayImpl = builder.finish().into();
        let new_col = ArrayImpl::from_protobuf(&col.to_protobuf(), cardinality).unwrap();
        assert_eq!(new_col.len(), cardinality);
        let arr: &TimestampArray = new_col.as_timestamp();
        arr.iter().enumerate().for_each(|(i, x)| {
            if i % 2 == 0 {
                assert_eq!(
                    Timestamp::with_secs_nsecs(i as i64, i as u32 * 1000)
                        .ok()
                        .unwrap(),
                    x.unwrap()
                );
            } else {
                assert!(x.is_none());
            }
        });
    }
}
369}