risingwave_common/util/value_encoding/
column_aware_row_encoding.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Column-aware row encoding is an encoding format which converts a row into a binary form that
//! remains explainable after schema changes.
//!
//! The current design of the flag contains just one piece of meaningful information besides the
//! magic bit: the 2 LSBs represent the size of offsets: `u8`/`u16`/`u32`.
//!
//! We have a `Serializer` and a `Deserializer` for each schema of `Row`, which can be reused
//! until the schema changes.
21
22use std::collections::HashSet;
23use std::sync::Arc;
24
25use ahash::HashMap;
26use bitfield_struct::bitfield;
27use bytes::{Buf, BufMut};
28use rw_iter_util::ZipEqDebug;
29use smallvec::{SmallVec, smallvec};
30
31use super::error::ValueEncodingError;
32use super::{Result, ValueRowDeserializer, ValueRowSerializer};
33use crate::catalog::ColumnId;
34use crate::row::Row;
35use crate::types::{DataType, Datum, ScalarRefImpl, StructType, ToDatumRef};
36use crate::util::value_encoding as plain;
37
38/// Serialize and deserialize functions that recursively use [`ColumnAwareSerde`] for nested fields
39/// of composite types.
40///
41/// Only for column with types that can be altered ([`DataType::can_alter`]).
42mod new_serde {
43    use super::*;
44    use crate::array::{ListRef, ListValue, MapRef, MapValue, StructRef, StructValue};
45    use crate::types::{MapType, ScalarImpl, StructType, data_types};
46
47    // --- serialize ---
48
49    // Copy of `plain` but call `new_` for the scalar.
50    fn new_serialize_datum(
51        data_type: &DataType,
52        datum_ref: impl ToDatumRef,
53        buf: &mut impl BufMut,
54    ) {
55        if let Some(d) = datum_ref.to_datum_ref() {
56            buf.put_u8(1);
57            new_serialize_scalar(data_type, d, buf)
58        } else {
59            buf.put_u8(0);
60        }
61    }
62
63    // Different logic from `plain`:
64    //
65    // Recursively construct a new column-aware `Serializer` for nested fields.
66    fn new_serialize_struct(struct_type: &StructType, value: StructRef<'_>, buf: &mut impl BufMut) {
67        let serializer = super::Serializer::from_struct(struct_type.clone()); // cloning `StructType` is lightweight
68
69        // `StructRef` is interpreted as a `Row` here.
70        let bytes = serializer.serialize(value); // TODO: serialize into the buf directly if we can reserve space accurately
71        buf.put_u32_le(bytes.len() as _);
72        buf.put_slice(&bytes);
73    }
74
75    // Copy of `plain` but call `new_` for the elements.
76    fn new_serialize_list(inner_type: &DataType, value: ListRef<'_>, buf: &mut impl BufMut) {
77        let elems = value.iter();
78        buf.put_u32_le(elems.len() as u32);
79
80        elems.for_each(|field_value| {
81            new_serialize_datum(inner_type, field_value, buf);
82        });
83    }
84
85    // Different logic from `plain`:
86    //
87    // We don't reuse the serialization of `struct<k, v>[]` for map, as it introduces overhead
88    // for column ids of `struct<k, v>`. It's unnecessary because the shape of the map is fixed.
89    fn new_serialize_map(map_type: &MapType, value: MapRef<'_>, buf: &mut impl BufMut) {
90        let elems = value.iter();
91        buf.put_u32_le(elems.len() as u32);
92
93        elems.for_each(|(k, v)| {
94            new_serialize_scalar(map_type.key(), k, buf);
95            new_serialize_datum(map_type.value(), v, buf);
96        });
97    }
98
99    // Copy of `plain` but call `new_` for composite types.
100    pub fn new_serialize_scalar(
101        data_type: &DataType,
102        value: ScalarRefImpl<'_>,
103        buf: &mut impl BufMut,
104    ) {
105        match value {
106            ScalarRefImpl::Struct(s) => new_serialize_struct(data_type.as_struct(), s, buf),
107            ScalarRefImpl::List(l) => new_serialize_list(data_type.as_list_element_type(), l, buf),
108            ScalarRefImpl::Map(m) => new_serialize_map(data_type.as_map(), m, buf),
109
110            _ => plain::serialize_scalar(value, buf),
111        }
112    }
113
114    // --- deserialize ---
115
116    // Copy of `plain` but call `new_` for the scalar.
117    fn new_inner_deserialize_datum(data: &mut &[u8], ty: &DataType) -> Result<Datum> {
118        let null_tag = data.get_u8();
119        match null_tag {
120            0 => Ok(None),
121            1 => Some(new_deserialize_scalar(ty, data)).transpose(),
122            _ => Err(ValueEncodingError::InvalidTagEncoding(null_tag)),
123        }
124    }
125
126    // Different logic from `plain`:
127    //
128    // Recursively construct a new column-aware `Deserializer` for nested fields.
129    fn new_deserialize_struct(struct_def: &StructType, data: &mut &[u8]) -> Result<ScalarImpl> {
130        let deserializer = super::Deserializer::from_struct(struct_def.clone()); // cloning `StructType` is lightweight
131        let encoded_len = data.get_u32_le() as usize;
132
133        let (struct_data, remaining) = data.split_at(encoded_len);
134        *data = remaining;
135        let fields = deserializer.deserialize(struct_data)?;
136
137        Ok(ScalarImpl::Struct(StructValue::new(fields)))
138    }
139
140    // Copy of `plain` but call `new_` for the elements.
141    fn new_deserialize_list(item_type: &DataType, data: &mut &[u8]) -> Result<ScalarImpl> {
142        let len = data.get_u32_le();
143        let mut builder = item_type.create_array_builder(len as usize);
144        for _ in 0..len {
145            builder.append(new_inner_deserialize_datum(data, item_type)?);
146        }
147        Ok(ScalarImpl::List(ListValue::new(builder.finish())))
148    }
149
150    // Different logic from `plain`:
151    //
152    // We don't reuse the deserialization of `struct<k, v>[]` for map, as it introduces overhead
153    // for column ids of `struct<k, v>`. It's unnecessary because the shape of the map is fixed.
154    fn new_deserialize_map(map_type: &MapType, data: &mut &[u8]) -> Result<ScalarImpl> {
155        let len = data.get_u32_le();
156        let mut builder = map_type
157            .clone() // FIXME: clone type everytime here is inefficient
158            .into_struct()
159            .create_array_builder(len as usize);
160        for _ in 0..len {
161            let key = new_deserialize_scalar(map_type.key(), data)?;
162            let value = new_inner_deserialize_datum(data, map_type.value())?;
163            let entry = StructValue::new(vec![Some(key), value]);
164            builder.append(Some(ScalarImpl::Struct(entry)));
165        }
166        Ok(ScalarImpl::Map(MapValue::from_entries(ListValue::new(
167            builder.finish(),
168        ))))
169    }
170
171    // Copy of `plain` but call `new_` for composite types.
172    pub fn new_deserialize_scalar(ty: &DataType, data: &mut &[u8]) -> Result<ScalarImpl> {
173        Ok(match ty {
174            DataType::Struct(struct_def) => new_deserialize_struct(struct_def, data)?,
175            DataType::List(item_type) => new_deserialize_list(item_type, data)?,
176            DataType::Map(map_type) => new_deserialize_map(map_type, data)?,
177            data_types::simple!() => plain::deserialize_value(ty, data)?,
178        })
179    }
180}
181
/// When a row has no more than this number of columns, we will use the stack for some
/// intermediate buffers.
///
/// It is used to size the inline storage of the `SmallVec`s in this file.
const COLUMN_ON_STACK: usize = 8;
184
/// How many bytes are used to represent each offset into the encoded data.
///
/// The discriminant of each variant is its 2-bit representation as returned by
/// [`OffsetWidth::into_bits`]. The pattern `0b00` is not a valid offset width.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
enum OffsetWidth {
    /// Offsets fit in a `u8`.
    Offset8 = 0b01,
    /// Offsets fit in a `u16`.
    Offset16 = 0b10,
    /// Offsets fit in a `u32`.
    Offset32 = 0b11,
}

impl OffsetWidth {
    /// Number of bytes occupied by a single offset.
    const fn width(self) -> usize {
        match self {
            Self::Offset8 => std::mem::size_of::<u8>(),
            Self::Offset16 => std::mem::size_of::<u16>(),
            Self::Offset32 => std::mem::size_of::<u32>(),
        }
    }

    /// Converts the width into its 2-bit representation.
    const fn into_bits(self) -> u8 {
        self as u8
    }

    /// Converts a 2-bit representation back into a width.
    ///
    /// # Panics
    ///
    /// Panics if `bits` is not the representation of any variant.
    const fn from_bits(bits: u8) -> Self {
        if bits == Self::Offset8 as u8 {
            Self::Offset8
        } else if bits == Self::Offset16 as u8 {
            Self::Offset16
        } else if bits == Self::Offset32 as u8 {
            Self::Offset32
        } else {
            panic!("invalid offset width bits")
        }
    }
}
220
/// Header (metadata) of the encoded row.
///
/// The header is a single byte and is the first byte of the encoded row.
///
/// Layout (most to least significant bits):
///
/// ```text
/// | magic | reserved | offset |
/// |   1   |    5     |   2    |
/// ```
#[bitfield(u8, order = Msb)]
#[derive(PartialEq, Eq)]
struct Header {
    /// Magic bit to indicate it's column-aware encoding.
    /// Note that in plain value encoding, the first byte is always 0 or 1 for nullability,
    /// of which the most significant bit is always 0.
    #[bits(1, default = true, access = RO)]
    magic: bool,

    // Reserved bits; nothing in this file reads or writes them.
    #[bits(5)]
    _reserved: u8,

    /// Indicate the offset width of the encoded data.
    /// Decoded through `OffsetWidth::from_bits`, which panics on the bit pattern `0b00`.
    #[bits(2, default = OffsetWidth::Offset8)]
    offset: OffsetWidth,
}
245
/// `RowEncoding` holds row-specific information for Column-Aware Encoding
struct RowEncoding {
    /// Header byte of the row; its offset width is chosen in `set_offsets`.
    header: Header,
    /// Start offset of every datum within `data`, each stored little-endian using the offset
    /// width recorded in `header`.
    offsets: SmallVec<[u8; COLUMN_ON_STACK * 2]>,
    /// Concatenation of the encoded datums. A NULL datum contributes no bytes.
    data: Vec<u8>,
}
252
/// A trait unifying [`ToDatumRef`] and already encoded bytes.
trait Encode {
    /// Appends the encoded form of `self`, a value of type `data_type`, to `data`.
    ///
    /// Both implementations in this file append nothing for a NULL (`None`) value.
    fn encode_to(self, data_type: &DataType, data: &mut Vec<u8>);
}
257
258impl<T> Encode for T
259where
260    T: ToDatumRef,
261{
262    fn encode_to(self, data_type: &DataType, data: &mut Vec<u8>) {
263        if let Some(v) = self.to_datum_ref() {
264            let curr_len = data.len();
265
266            // Use different encoding logic for alterable types.
267            if data_type.can_alter() == Some(true) {
268                new_serde::new_serialize_scalar(data_type, v, data);
269            } else {
270                plain::serialize_scalar(v, data);
271            }
272
273            // See `Deserializer::deserialize` for the decoding logic to understand this check.
274            // See also: https://github.com/risingwavelabs/risingwave/issues/23050
275            debug_assert_ne!(
276                data.len(),
277                curr_len,
278                "scalar ({v:?}) should not be encoded to empty bytes, as it will be indistinguishable from NULL",
279            );
280        }
281    }
282}
283
284impl Encode for Option<&[u8]> {
285    fn encode_to(self, _data_type: &DataType, data: &mut Vec<u8>) {
286        // Already encoded, just copy the bytes.
287        if let Some(v) = self {
288            data.extend(v);
289        }
290    }
291}
292
293impl RowEncoding {
294    fn new() -> Self {
295        RowEncoding {
296            header: Header::new(),
297            offsets: Default::default(),
298            data: Default::default(),
299        }
300    }
301
302    fn set_offsets(&mut self, usize_offsets: &[usize]) {
303        debug_assert!(
304            self.offsets.is_empty(),
305            "should not set offsets multiple times"
306        );
307
308        // Use 0 if there's no data.
309        let max_offset = usize_offsets.last().copied().unwrap_or(0);
310
311        const U8_MAX: usize = u8::MAX as usize;
312        const U16_MAX: usize = u16::MAX as usize;
313        const U32_MAX: usize = u32::MAX as usize;
314
315        let offset_width = match max_offset {
316            _n @ ..=U8_MAX => OffsetWidth::Offset8,
317            _n @ ..=U16_MAX => OffsetWidth::Offset16,
318            _n @ ..=U32_MAX => OffsetWidth::Offset32,
319            _ => panic!("encoding length {} exceeds u32", max_offset),
320        };
321        self.header.set_offset(offset_width);
322
323        self.offsets
324            .resize(usize_offsets.len() * offset_width.width(), 0);
325
326        let mut offsets_buf = &mut self.offsets[..];
327        for &offset in usize_offsets {
328            offsets_buf.put_uint_le(offset as u64, offset_width.width());
329        }
330    }
331
332    fn encode<T: Encode>(
333        &mut self,
334        datums: impl IntoIterator<Item = T>,
335        data_types: impl IntoIterator<Item = &DataType>,
336    ) {
337        debug_assert!(
338            self.data.is_empty(),
339            "should not encode one RowEncoding object multiple times."
340        );
341        let datums = datums.into_iter();
342        let mut offset_usize =
343            SmallVec::<[usize; COLUMN_ON_STACK]>::with_capacity(datums.size_hint().0);
344        for (datum, data_type) in datums.zip_eq_debug(data_types) {
345            offset_usize.push(self.data.len());
346            datum.encode_to(data_type, &mut self.data);
347        }
348        self.set_offsets(&offset_usize);
349    }
350}
351
352mod data_types {
353    use crate::types::{DataType, StructType};
354
355    /// A trait unifying data types of a row and field types of a struct.
356    pub trait DataTypes: Clone {
357        fn iter(&self) -> impl ExactSizeIterator<Item = &DataType>;
358        fn at(&self, index: usize) -> &DataType;
359    }
360
361    impl<T> DataTypes for T
362    where
363        T: AsRef<[DataType]> + Clone,
364    {
365        fn iter(&self) -> impl ExactSizeIterator<Item = &DataType> {
366            self.as_ref().iter()
367        }
368
369        fn at(&self, index: usize) -> &DataType {
370            &self.as_ref()[index]
371        }
372    }
373
374    impl DataTypes for StructType {
375        fn iter(&self) -> impl ExactSizeIterator<Item = &DataType> {
376            self.types()
377        }
378
379        fn at(&self, index: usize) -> &DataType {
380            self.type_at(index)
381        }
382    }
383}
384use data_types::DataTypes;
385
/// Column-Aware `Serializer` holds schema related information, and shall be
/// created again once the schema changes
#[derive(Clone)]
pub struct Serializer<D: DataTypes = Vec<DataType>> {
    /// Column ids of the schema, pre-encoded as consecutive little-endian `i32`s so they can
    /// be copied verbatim into every serialized row.
    encoded_column_ids: EncodedColumnIds,
    /// Data types of the columns, passed to the encoder alongside the datums of a row.
    data_types: D,
}

/// Buffer for encoded column ids (4 bytes per id), with inline storage sized for
/// `COLUMN_ON_STACK` ids.
type EncodedColumnIds = SmallVec<[u8; COLUMN_ON_STACK * 4]>;
395
396fn encode_column_ids(column_ids: impl ExactSizeIterator<Item = ColumnId>) -> EncodedColumnIds {
397    // currently we hard-code ColumnId as i32
398    let mut encoded_column_ids = smallvec![0; column_ids.len() * 4];
399    let mut buf = &mut encoded_column_ids[..];
400    for id in column_ids {
401        buf.put_i32_le(id.get_id());
402    }
403    encoded_column_ids
404}
405
406impl Serializer {
407    /// Create a new `Serializer` with given `column_ids` and `data_types`.
408    pub fn new(column_ids: &[ColumnId], data_types: impl IntoIterator<Item = DataType>) -> Self {
409        Self {
410            encoded_column_ids: encode_column_ids(column_ids.iter().copied()),
411            data_types: data_types.into_iter().collect(),
412        }
413    }
414}
415
416impl Serializer<StructType> {
417    /// Create a new `Serializer` for the fields of the given struct.
418    ///
419    /// Panic if the struct type does not have field ids.
420    pub fn from_struct(struct_type: StructType) -> Self {
421        Self {
422            encoded_column_ids: encode_column_ids(struct_type.ids().unwrap()),
423            data_types: struct_type,
424        }
425    }
426}
427
428impl<D: DataTypes> Serializer<D> {
429    fn datum_num(&self) -> usize {
430        self.encoded_column_ids.len() / 4
431    }
432
433    fn serialize_raw<T: Encode>(&self, datums: impl IntoIterator<Item = T>) -> Vec<u8> {
434        let mut encoding = RowEncoding::new();
435        encoding.encode(datums, self.data_types.iter());
436        self.finish(encoding)
437    }
438
439    fn finish(&self, encoding: RowEncoding) -> Vec<u8> {
440        let mut row_bytes = Vec::with_capacity(
441            5 + self.encoded_column_ids.len() + encoding.offsets.len() + encoding.data.len(), /* 5 comes from u8+u32 */
442        );
443        row_bytes.put_u8(encoding.header.into_bits());
444        row_bytes.put_u32_le(self.datum_num() as u32);
445        row_bytes.extend(&self.encoded_column_ids);
446        row_bytes.extend(&encoding.offsets);
447        row_bytes.extend(&encoding.data);
448
449        row_bytes
450    }
451}
452
453impl<D: DataTypes> ValueRowSerializer for Serializer<D> {
454    /// Serialize a row under the schema of the Serializer
455    fn serialize(&self, row: impl Row) -> Vec<u8> {
456        assert_eq!(row.len(), self.datum_num());
457        self.serialize_raw(row.iter())
458    }
459}
460
/// A view of the encoded bytes, which can be iterated over to get the column id and data.
/// Used for deserialization.
// TODO: can we unify this with `RowEncoding`, which is for serialization?
#[derive(Clone)]
struct EncodedBytes<'a> {
    /// Header byte of the row; provides the offset width used to read `offsets`.
    header: Header,

    // When iterating, we will consume `column_ids` and `offsets` while keep `data` unchanged.
    // This is because we record absolute values in `offsets` to index into `data`.
    /// Remaining encoded column ids, 4 bytes (little-endian `i32`) each.
    column_ids: &'a [u8],
    /// Remaining encoded offsets, one per remaining column id.
    offsets: &'a [u8],
    /// The whole data section of the row.
    data: &'a [u8],
}
474
475impl<'a> EncodedBytes<'a> {
476    fn new(mut encoded_bytes: &'a [u8]) -> Result<Self> {
477        let header = Header::from_bits(encoded_bytes.get_u8());
478        if !header.magic() {
479            return Err(ValueEncodingError::InvalidFlag(header.into_bits()));
480        }
481        let offset_bytes = header.offset().width();
482
483        let datum_num = encoded_bytes.get_u32_le() as usize;
484        let offsets_start_idx = 4 * datum_num;
485        let data_start_idx = offsets_start_idx + datum_num * offset_bytes;
486
487        Ok(EncodedBytes {
488            header,
489            column_ids: &encoded_bytes[..offsets_start_idx],
490            offsets: &encoded_bytes[offsets_start_idx..data_start_idx],
491            data: &encoded_bytes[data_start_idx..],
492        })
493    }
494}
495
496impl<'a> Iterator for EncodedBytes<'a> {
497    type Item = (i32, &'a [u8]);
498
499    fn next(&mut self) -> Option<Self::Item> {
500        if self.column_ids.is_empty() {
501            assert!(self.offsets.is_empty());
502            return None;
503        }
504
505        let id = self.column_ids.get_i32_le();
506
507        let offset_width = self.header.offset().width();
508        let get_offset = |offsets: &mut &[u8]| offsets.get_uint_le(offset_width) as usize;
509
510        let this_offset = get_offset(&mut self.offsets);
511        let next_offset = if self.offsets.is_empty() {
512            self.data.len()
513        } else {
514            let mut peek_offsets = self.offsets; // copy the reference to the slice to avoid mutating the buf position
515            get_offset(&mut peek_offsets)
516        };
517
518        let data = &self.data[this_offset..next_offset];
519
520        Some((id, data))
521    }
522}
523
/// Column-Aware `Deserializer` holds needed `ColumnIds` and their corresponding schema
/// Should non-null default values be specified, a new field could be added to Deserializer
#[derive(Clone)]
pub struct Deserializer<D: DataTypes = Arc<[DataType]>> {
    /// Maps the id of a needed column to its index in the output row.
    mapping: ColumnMapping,
    /// Data types of the needed columns, indexed by position in the output row.
    data_types: D,

    /// A row with default values for each column.
    ///
    /// `None` if all default values are `NULL`, typically for struct fields.
    default_row: Option<Vec<Datum>>,
}
536
537/// A mapping from column id to the index of the column in the schema.
538#[derive(Clone)]
539enum ColumnMapping {
540    /// For small number of columns, use linear search with `SmallVec`. This ensures no heap allocation.
541    Small(SmallVec<[i32; COLUMN_ON_STACK]>),
542    /// For larger number of columns, build a `HashMap` for faster lookup.
543    Large(HashMap<i32, usize>),
544}
545
546impl ColumnMapping {
547    fn new(column_ids: impl ExactSizeIterator<Item = ColumnId>) -> Self {
548        if column_ids.len() <= COLUMN_ON_STACK {
549            Self::Small(column_ids.map(|c| c.get_id()).collect())
550        } else {
551            Self::Large(
552                column_ids
553                    .enumerate()
554                    .map(|(i, c)| (c.get_id(), i))
555                    .collect(),
556            )
557        }
558    }
559
560    fn get(&self, id: i32) -> Option<usize> {
561        match self {
562            ColumnMapping::Small(vec) => vec.iter().position(|&x| x == id),
563            ColumnMapping::Large(map) => map.get(&id).copied(),
564        }
565    }
566}
567
568impl Deserializer {
569    pub fn new(
570        column_ids: &[ColumnId],
571        schema: Arc<[DataType]>,
572        column_with_default: impl Iterator<Item = (usize, Datum)>,
573    ) -> Self {
574        assert_eq!(column_ids.len(), schema.len());
575        let mut default_row: Vec<Datum> = vec![None; schema.len()];
576        for (i, datum) in column_with_default {
577            default_row[i] = datum;
578        }
579        Self {
580            mapping: ColumnMapping::new(column_ids.iter().copied()),
581            data_types: schema,
582            default_row: Some(default_row),
583        }
584    }
585}
586
587impl Deserializer<StructType> {
588    /// Create a new `Deserializer` for the fields of the given struct.
589    ///
590    /// Panic if the struct type does not have field ids.
591    pub fn from_struct(struct_type: StructType) -> Self {
592        Self {
593            mapping: ColumnMapping::new(struct_type.ids().unwrap()),
594            data_types: struct_type,
595            default_row: None,
596        }
597    }
598}
599
600impl<D: DataTypes> ValueRowDeserializer for Deserializer<D> {
601    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
602        let encoded_bytes = EncodedBytes::new(encoded_bytes)?;
603
604        let mut row =
605            (self.default_row.clone()).unwrap_or_else(|| vec![None; self.data_types.iter().len()]);
606
607        for (id, mut data) in encoded_bytes {
608            let Some(decoded_idx) = self.mapping.get(id) else {
609                continue;
610            };
611            let data_type = self.data_types.at(decoded_idx);
612
613            let datum = if data.is_empty() {
614                None
615            } else if data_type.can_alter() == Some(true) {
616                Some(new_serde::new_deserialize_scalar(data_type, &mut data)?)
617            } else {
618                Some(plain::deserialize_value(data_type, &mut data)?)
619            };
620
621            row[decoded_idx] = datum;
622        }
623
624        Ok(row)
625    }
626}
627
628/// Combined column-aware `Serializer` and `Deserializer` given the same
629/// `column_ids` and `schema`
630#[derive(Clone)]
631pub struct ColumnAwareSerde {
632    pub serializer: Serializer,
633    pub deserializer: Deserializer,
634}
635
636impl ValueRowSerializer for ColumnAwareSerde {
637    fn serialize(&self, row: impl Row) -> Vec<u8> {
638        self.serializer.serialize(row)
639    }
640}
641
642impl ValueRowDeserializer for ColumnAwareSerde {
643    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
644        self.deserializer.deserialize(encoded_bytes)
645    }
646}
647
648/// Deserializes row `encoded_bytes`, drops columns not in `valid_column_ids`, serializes and returns.
649/// If no column is dropped, returns None.
650// TODO: we only support trimming dropped top-level columns here; also support nested fields.
651pub fn try_drop_invalid_columns(
652    encoded_bytes: &[u8],
653    valid_column_ids: &HashSet<i32>,
654) -> Option<Vec<u8>> {
655    let encoded_bytes = EncodedBytes::new(encoded_bytes).ok()?;
656
657    let has_invalid_column = (encoded_bytes.clone()).any(|(id, _)| !valid_column_ids.contains(&id));
658    if !has_invalid_column {
659        return None;
660    }
661
662    // Slow path that drops columns. Should be rare.
663
664    let mut datums = Vec::with_capacity(valid_column_ids.len());
665    let mut column_ids = Vec::with_capacity(valid_column_ids.len());
666
667    for (id, data) in encoded_bytes {
668        if valid_column_ids.contains(&id) {
669            column_ids.push(ColumnId::new(id));
670            datums.push(if data.is_empty() { None } else { Some(data) });
671        }
672    }
673
674    // Data types are only needed when we are actually serializing. But we have encoded data here.
675    // Simple pass dummy data types.
676    let dummy_data_types = vec![DataType::Boolean; column_ids.len()];
677
678    let row_bytes = Serializer::new(&column_ids, dummy_data_types).serialize_raw(datums);
679    Some(row_bytes)
680}