risingwave_common/util/value_encoding/
column_aware_row_encoding.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Column-aware row encoding is an encoding format which converts a row into a binary form that
//! remains explainable after schema changes.
//! The current design of the flag contains just one meaningful piece of information: the 2 LSBs
//! represent the size of offsets: `u8`/`u16`/`u32`.
//! We have a `Serializer` and a `Deserializer` for each schema of `Row`, which can be reused
//! until the schema changes.
21
22use std::collections::HashSet;
23use std::sync::Arc;
24
25use ahash::HashMap;
26use bitfield_struct::bitfield;
27use bytes::{Buf, BufMut};
28use rw_iter_util::ZipEqDebug;
29use smallvec::{SmallVec, smallvec};
30
31use super::error::ValueEncodingError;
32use super::{Result, ValueRowDeserializer, ValueRowSerializer};
33use crate::catalog::ColumnId;
34use crate::row::Row;
35use crate::types::{DataType, Datum, ScalarRefImpl, StructType, ToDatumRef};
36use crate::util::value_encoding as plain;
37
38/// Serialize and deserialize functions that recursively use [`ColumnAwareSerde`] for nested fields
39/// of composite types.
40///
41/// Only for column with types that can be altered ([`DataType::can_alter`]).
42mod new_serde {
43    use super::*;
44    use crate::array::{ListRef, ListValue, MapRef, MapValue, StructRef, StructValue};
45    use crate::types::{ListType, MapType, ScalarImpl, StructType, data_types};
46
47    // --- serialize ---
48
49    // Copy of `plain` but call `new_` for the scalar.
50    fn new_serialize_datum(
51        data_type: &DataType,
52        datum_ref: impl ToDatumRef,
53        buf: &mut impl BufMut,
54    ) {
55        if let Some(d) = datum_ref.to_datum_ref() {
56            buf.put_u8(1);
57            new_serialize_scalar(data_type, d, buf)
58        } else {
59            buf.put_u8(0);
60        }
61    }
62
63    // Different logic from `plain`:
64    //
65    // Recursively construct a new column-aware `Serializer` for nested fields.
66    fn new_serialize_struct(struct_type: &StructType, value: StructRef<'_>, buf: &mut impl BufMut) {
67        let serializer = super::Serializer::from_struct(struct_type.clone()); // cloning `StructType` is lightweight
68
69        // `StructRef` is interpreted as a `Row` here.
70        let bytes = serializer.serialize(value); // TODO: serialize into the buf directly if we can reserve space accurately
71        buf.put_u32_le(bytes.len() as _);
72        buf.put_slice(&bytes);
73    }
74
75    // Copy of `plain` but call `new_` for the elements.
76    fn new_serialize_list(list_type: &ListType, value: ListRef<'_>, buf: &mut impl BufMut) {
77        let elems = value.iter();
78        buf.put_u32_le(elems.len() as u32);
79
80        elems.for_each(|field_value| {
81            new_serialize_datum(list_type.elem(), field_value, buf);
82        });
83    }
84
85    // Different logic from `plain`:
86    //
87    // We don't reuse the serialization of `struct<k, v>[]` for map, as it introduces overhead
88    // for column ids of `struct<k, v>`. It's unnecessary because the shape of the map is fixed.
89    fn new_serialize_map(map_type: &MapType, value: MapRef<'_>, buf: &mut impl BufMut) {
90        let elems = value.iter();
91        buf.put_u32_le(elems.len() as u32);
92
93        elems.for_each(|(k, v)| {
94            new_serialize_scalar(map_type.key(), k, buf);
95            new_serialize_datum(map_type.value(), v, buf);
96        });
97    }
98
99    // Copy of `plain` but call `new_` for composite types.
100    pub fn new_serialize_scalar(
101        data_type: &DataType,
102        value: ScalarRefImpl<'_>,
103        buf: &mut impl BufMut,
104    ) {
105        match value {
106            ScalarRefImpl::Struct(s) => new_serialize_struct(data_type.as_struct(), s, buf),
107            ScalarRefImpl::List(l) => new_serialize_list(data_type.as_list(), l, buf),
108            ScalarRefImpl::Map(m) => new_serialize_map(data_type.as_map(), m, buf),
109
110            _ => plain::serialize_scalar(value, buf),
111        }
112    }
113
114    // --- deserialize ---
115
116    // Copy of `plain` but call `new_` for the scalar.
117    fn new_inner_deserialize_datum(data: &mut &[u8], ty: &DataType) -> Result<Datum> {
118        let null_tag = data.get_u8();
119        match null_tag {
120            0 => Ok(None),
121            1 => Some(new_deserialize_scalar(ty, data)).transpose(),
122            _ => Err(ValueEncodingError::InvalidTagEncoding(null_tag)),
123        }
124    }
125
126    // Different logic from `plain`:
127    //
128    // Recursively construct a new column-aware `Deserializer` for nested fields.
129    fn new_deserialize_struct(struct_def: &StructType, data: &mut &[u8]) -> Result<ScalarImpl> {
130        let deserializer = super::Deserializer::from_struct(struct_def.clone()); // cloning `StructType` is lightweight
131        let encoded_len = data.get_u32_le() as usize;
132
133        let (struct_data, remaining) = data.split_at(encoded_len);
134        *data = remaining;
135        let fields = deserializer.deserialize(struct_data)?;
136
137        Ok(ScalarImpl::Struct(StructValue::new(fields)))
138    }
139
140    // Copy of `plain` but call `new_` for the elements.
141    fn new_deserialize_list(list_type: &ListType, data: &mut &[u8]) -> Result<ScalarImpl> {
142        let elem_type = list_type.elem();
143        let len = data.get_u32_le();
144        let mut builder = elem_type.create_array_builder(len as usize);
145        for _ in 0..len {
146            builder.append(new_inner_deserialize_datum(data, elem_type)?);
147        }
148        Ok(ScalarImpl::List(ListValue::new(builder.finish())))
149    }
150
151    // Different logic from `plain`:
152    //
153    // We don't reuse the deserialization of `struct<k, v>[]` for map, as it introduces overhead
154    // for column ids of `struct<k, v>`. It's unnecessary because the shape of the map is fixed.
155    fn new_deserialize_map(map_type: &MapType, data: &mut &[u8]) -> Result<ScalarImpl> {
156        let len = data.get_u32_le();
157        let mut builder = map_type
158            .clone() // FIXME: clone type everytime here is inefficient
159            .into_struct()
160            .create_array_builder(len as usize);
161        for _ in 0..len {
162            let key = new_deserialize_scalar(map_type.key(), data)?;
163            let value = new_inner_deserialize_datum(data, map_type.value())?;
164            let entry = StructValue::new(vec![Some(key), value]);
165            builder.append(Some(ScalarImpl::Struct(entry)));
166        }
167        Ok(ScalarImpl::Map(MapValue::from_entries(ListValue::new(
168            builder.finish(),
169        ))))
170    }
171
172    // Copy of `plain` but call `new_` for composite types.
173    pub fn new_deserialize_scalar(ty: &DataType, data: &mut &[u8]) -> Result<ScalarImpl> {
174        Ok(match ty {
175            DataType::Struct(struct_def) => new_deserialize_struct(struct_def, data)?,
176            DataType::List(list_type) => new_deserialize_list(list_type, data)?,
177            DataType::Map(map_type) => new_deserialize_map(map_type, data)?,
178            data_types::simple!() => plain::deserialize_value(ty, data)?,
179        })
180    }
181}
182
/// When a row has no more than this number of columns, we will use the stack for some
/// intermediate buffers.
///
/// It sizes the inline capacity of the `SmallVec`s in this file: the per-row offsets, the
/// encoded column ids, and the small variant of the column mapping.
const COLUMN_ON_STACK: usize = 8;
185
/// How many bytes are used to store each offset into the encoded data.
///
/// The discriminants are the raw values kept in the 2-bit `offset` field of the header.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
enum OffsetWidth {
    /// Every offset fits in a `u8`.
    Offset8 = 0b01,
    /// Every offset fits in a `u16`.
    Offset16 = 0b10,
    /// Every offset fits in a `u32`.
    Offset32 = 0b11,
}

impl OffsetWidth {
    /// Returns the number of bytes a single offset occupies.
    const fn width(self) -> usize {
        match self {
            Self::Offset8 => std::mem::size_of::<u8>(),
            Self::Offset16 => std::mem::size_of::<u16>(),
            Self::Offset32 => std::mem::size_of::<u32>(),
        }
    }

    /// Returns the raw bit pattern of this width.
    const fn into_bits(self) -> u8 {
        self as u8
    }

    /// Decodes a raw bit pattern back into a width.
    ///
    /// # Panics
    ///
    /// Panics if `bits` is not one of `0b01`, `0b10` or `0b11`.
    const fn from_bits(bits: u8) -> Self {
        if bits == Self::Offset8 as u8 {
            Self::Offset8
        } else if bits == Self::Offset16 as u8 {
            Self::Offset16
        } else if bits == Self::Offset32 as u8 {
            Self::Offset32
        } else {
            panic!("invalid offset width bits")
        }
    }
}
221
/// Header (metadata) of the encoded row. It is the first byte of every encoded row.
///
/// Layout (most to least significant bits):
///
/// ```text
/// | magic | reserved | offset |
/// |   1   |    5     |   2    |
/// ```
#[bitfield(u8, order = Msb)]
#[derive(PartialEq, Eq)]
struct Header {
    /// Magic bit to indicate it's column-aware encoding. Defaults to `true` and is read-only.
    ///
    /// Note that in plain value encoding, the first byte is always 0 or 1 for nullability,
    /// of which the most significant bit is always 0. A cleared magic bit therefore means the
    /// bytes are not column-aware encoded.
    #[bits(1, default = true, access = RO)]
    magic: bool,

    // Reserved bits; nothing in this file reads or writes them.
    #[bits(5)]
    _reserved: u8,

    /// Indicate the offset width of the encoded data, i.e. how many bytes each offset takes.
    /// Defaults to [`OffsetWidth::Offset8`].
    #[bits(2, default = OffsetWidth::Offset8)]
    offset: OffsetWidth,
}
246
/// `RowEncoding` holds row-specific information for Column-Aware Encoding, i.e. the parts of
/// an encoded row that depend on the row's values: the header, the offsets and the data.
struct RowEncoding {
    /// Header byte; records the offset width chosen for this row.
    header: Header,
    /// One offset per datum, each stored little-endian using the width recorded in `header`.
    offsets: SmallVec<[u8; COLUMN_ON_STACK * 2]>,
    /// Concatenated encoded datums. A NULL datum contributes no bytes.
    data: Vec<u8>,
}
253
254/// A trait unifying [`ToDatumRef`] and already encoded bytes.
255trait Encode {
256    fn encode_to(self, data_type: &DataType, data: &mut Vec<u8>);
257}
258
259impl<T> Encode for T
260where
261    T: ToDatumRef,
262{
263    fn encode_to(self, data_type: &DataType, data: &mut Vec<u8>) {
264        if let Some(v) = self.to_datum_ref() {
265            let curr_len = data.len();
266
267            // Use different encoding logic for alterable types.
268            if data_type.can_alter() == Some(true) {
269                new_serde::new_serialize_scalar(data_type, v, data);
270            } else {
271                plain::serialize_scalar(v, data);
272            }
273
274            // See `Deserializer::deserialize` for the decoding logic to understand this check.
275            // See also: https://github.com/risingwavelabs/risingwave/issues/23050
276            debug_assert_ne!(
277                data.len(),
278                curr_len,
279                "scalar ({v:?}) should not be encoded to empty bytes, as it will be indistinguishable from NULL",
280            );
281        }
282    }
283}
284
285impl Encode for Option<&[u8]> {
286    fn encode_to(self, _data_type: &DataType, data: &mut Vec<u8>) {
287        // Already encoded, just copy the bytes.
288        if let Some(v) = self {
289            data.extend(v);
290        }
291    }
292}
293
294impl RowEncoding {
295    fn new() -> Self {
296        RowEncoding {
297            header: Header::new(),
298            offsets: Default::default(),
299            data: Default::default(),
300        }
301    }
302
303    fn set_offsets(&mut self, usize_offsets: &[usize]) {
304        debug_assert!(
305            self.offsets.is_empty(),
306            "should not set offsets multiple times"
307        );
308
309        // Use 0 if there's no data.
310        let max_offset = usize_offsets.last().copied().unwrap_or(0);
311
312        const U8_MAX: usize = u8::MAX as usize;
313        const U16_MAX: usize = u16::MAX as usize;
314        const U32_MAX: usize = u32::MAX as usize;
315
316        let offset_width = match max_offset {
317            _n @ ..=U8_MAX => OffsetWidth::Offset8,
318            _n @ ..=U16_MAX => OffsetWidth::Offset16,
319            _n @ ..=U32_MAX => OffsetWidth::Offset32,
320            _ => panic!("encoding length {} exceeds u32", max_offset),
321        };
322        self.header.set_offset(offset_width);
323
324        self.offsets
325            .resize(usize_offsets.len() * offset_width.width(), 0);
326
327        let mut offsets_buf = &mut self.offsets[..];
328        for &offset in usize_offsets {
329            offsets_buf.put_uint_le(offset as u64, offset_width.width());
330        }
331    }
332
333    fn encode<T: Encode>(
334        &mut self,
335        datums: impl IntoIterator<Item = T>,
336        data_types: impl IntoIterator<Item = &DataType>,
337    ) {
338        debug_assert!(
339            self.data.is_empty(),
340            "should not encode one RowEncoding object multiple times."
341        );
342        let datums = datums.into_iter();
343        let mut offset_usize =
344            SmallVec::<[usize; COLUMN_ON_STACK]>::with_capacity(datums.size_hint().0);
345        for (datum, data_type) in datums.zip_eq_debug(data_types) {
346            offset_usize.push(self.data.len());
347            datum.encode_to(data_type, &mut self.data);
348        }
349        self.set_offsets(&offset_usize);
350    }
351}
352
353mod data_types {
354    use crate::types::{DataType, StructType};
355
356    /// A trait unifying data types of a row and field types of a struct.
357    pub trait DataTypes: Clone {
358        fn iter(&self) -> impl ExactSizeIterator<Item = &DataType>;
359        fn at(&self, index: usize) -> &DataType;
360    }
361
362    impl<T> DataTypes for T
363    where
364        T: AsRef<[DataType]> + Clone,
365    {
366        fn iter(&self) -> impl ExactSizeIterator<Item = &DataType> {
367            self.as_ref().iter()
368        }
369
370        fn at(&self, index: usize) -> &DataType {
371            &self.as_ref()[index]
372        }
373    }
374
375    impl DataTypes for StructType {
376        fn iter(&self) -> impl ExactSizeIterator<Item = &DataType> {
377            self.types()
378        }
379
380        fn at(&self, index: usize) -> &DataType {
381            self.type_at(index)
382        }
383    }
384}
385use data_types::DataTypes;
386
/// Column-Aware `Serializer` holds schema related information, and shall be
/// created again once the schema changes
///
/// `D` is the container of the column data types: `Vec<DataType>` by default, or a
/// `StructType` when serializing the fields of a struct.
#[derive(Clone)]
pub struct Serializer<D: DataTypes = Vec<DataType>> {
    /// Column ids pre-encoded as little-endian `i32`s (4 bytes each), copied into every row.
    encoded_column_ids: EncodedColumnIds,
    /// Data types of the columns, paired positionally with the datums being serialized.
    data_types: D,
}
394
395type EncodedColumnIds = SmallVec<[u8; COLUMN_ON_STACK * 4]>;
396
397fn encode_column_ids(column_ids: impl ExactSizeIterator<Item = ColumnId>) -> EncodedColumnIds {
398    // currently we hard-code ColumnId as i32
399    let mut encoded_column_ids = smallvec![0; column_ids.len() * 4];
400    let mut buf = &mut encoded_column_ids[..];
401    for id in column_ids {
402        buf.put_i32_le(id.get_id());
403    }
404    encoded_column_ids
405}
406
407impl Serializer {
408    /// Create a new `Serializer` with given `column_ids` and `data_types`.
409    pub fn new(column_ids: &[ColumnId], data_types: impl IntoIterator<Item = DataType>) -> Self {
410        Self {
411            encoded_column_ids: encode_column_ids(column_ids.iter().copied()),
412            data_types: data_types.into_iter().collect(),
413        }
414    }
415}
416
417impl Serializer<StructType> {
418    /// Create a new `Serializer` for the fields of the given struct.
419    ///
420    /// Panic if the struct type does not have field ids.
421    pub fn from_struct(struct_type: StructType) -> Self {
422        Self {
423            encoded_column_ids: encode_column_ids(struct_type.ids().unwrap()),
424            data_types: struct_type,
425        }
426    }
427}
428
429impl<D: DataTypes> Serializer<D> {
430    fn datum_num(&self) -> usize {
431        self.encoded_column_ids.len() / 4
432    }
433
434    fn serialize_raw<T: Encode>(&self, datums: impl IntoIterator<Item = T>) -> Vec<u8> {
435        let mut encoding = RowEncoding::new();
436        encoding.encode(datums, self.data_types.iter());
437        self.finish(encoding)
438    }
439
440    fn finish(&self, encoding: RowEncoding) -> Vec<u8> {
441        let mut row_bytes = Vec::with_capacity(
442            5 + self.encoded_column_ids.len() + encoding.offsets.len() + encoding.data.len(), /* 5 comes from u8+u32 */
443        );
444        row_bytes.put_u8(encoding.header.into_bits());
445        row_bytes.put_u32_le(self.datum_num() as u32);
446        row_bytes.extend(&self.encoded_column_ids);
447        row_bytes.extend(&encoding.offsets);
448        row_bytes.extend(&encoding.data);
449
450        row_bytes
451    }
452}
453
454impl<D: DataTypes> ValueRowSerializer for Serializer<D> {
455    /// Serialize a row under the schema of the Serializer
456    fn serialize(&self, row: impl Row) -> Vec<u8> {
457        assert_eq!(row.len(), self.datum_num());
458        self.serialize_raw(row.iter())
459    }
460}
461
462/// A view of the encoded bytes, which can be iterated over to get the column id and data.
463/// Used for deserialization.
464// TODO: can we unify this with `RowEncoding`, which is for serialization?
465#[derive(Clone)]
466struct EncodedBytes<'a> {
467    header: Header,
468
469    // When iterating, we will consume `column_ids` and `offsets` while keep `data` unchanged.
470    // This is because we record absolute values in `offsets` to index into `data`.
471    column_ids: &'a [u8],
472    offsets: &'a [u8],
473    data: &'a [u8],
474}
475
476impl<'a> EncodedBytes<'a> {
477    fn new(mut encoded_bytes: &'a [u8]) -> Result<Self> {
478        let header = Header::from_bits(encoded_bytes.get_u8());
479        if !header.magic() {
480            return Err(ValueEncodingError::InvalidFlag(header.into_bits()));
481        }
482        let offset_bytes = header.offset().width();
483
484        let datum_num = encoded_bytes.get_u32_le() as usize;
485        let offsets_start_idx = 4 * datum_num;
486        let data_start_idx = offsets_start_idx + datum_num * offset_bytes;
487
488        Ok(EncodedBytes {
489            header,
490            column_ids: &encoded_bytes[..offsets_start_idx],
491            offsets: &encoded_bytes[offsets_start_idx..data_start_idx],
492            data: &encoded_bytes[data_start_idx..],
493        })
494    }
495}
496
497impl<'a> Iterator for EncodedBytes<'a> {
498    type Item = (i32, &'a [u8]);
499
500    fn next(&mut self) -> Option<Self::Item> {
501        if self.column_ids.is_empty() {
502            assert!(self.offsets.is_empty());
503            return None;
504        }
505
506        let id = self.column_ids.get_i32_le();
507
508        let offset_width = self.header.offset().width();
509        let get_offset = |offsets: &mut &[u8]| offsets.get_uint_le(offset_width) as usize;
510
511        let this_offset = get_offset(&mut self.offsets);
512        let next_offset = if self.offsets.is_empty() {
513            self.data.len()
514        } else {
515            let mut peek_offsets = self.offsets; // copy the reference to the slice to avoid mutating the buf position
516            get_offset(&mut peek_offsets)
517        };
518
519        let data = &self.data[this_offset..next_offset];
520
521        Some((id, data))
522    }
523}
524
/// Column-Aware `Deserializer` holds needed `ColumnIds` and their corresponding schema.
///
/// Columns of the schema that are absent from the encoded bytes keep their value from
/// `default_row`, or `NULL` when there is no default row.
#[derive(Clone)]
pub struct Deserializer<D: DataTypes = Arc<[DataType]>> {
    /// Maps a column id found in the encoded bytes to its index in the output row.
    mapping: ColumnMapping,
    /// Data types of the output columns, indexed like the output row.
    data_types: D,

    /// A row with default values for each column.
    ///
    /// `None` if all default values are `NULL`, typically for struct fields.
    default_row: Option<Vec<Datum>>,
}
537
538/// A mapping from column id to the index of the column in the schema.
539#[derive(Clone)]
540enum ColumnMapping {
541    /// For small number of columns, use linear search with `SmallVec`. This ensures no heap allocation.
542    Small(SmallVec<[i32; COLUMN_ON_STACK]>),
543    /// For larger number of columns, build a `HashMap` for faster lookup.
544    Large(HashMap<i32, usize>),
545}
546
547impl ColumnMapping {
548    fn new(column_ids: impl ExactSizeIterator<Item = ColumnId>) -> Self {
549        if column_ids.len() <= COLUMN_ON_STACK {
550            Self::Small(column_ids.map(|c| c.get_id()).collect())
551        } else {
552            Self::Large(
553                column_ids
554                    .enumerate()
555                    .map(|(i, c)| (c.get_id(), i))
556                    .collect(),
557            )
558        }
559    }
560
561    fn get(&self, id: i32) -> Option<usize> {
562        match self {
563            ColumnMapping::Small(vec) => vec.iter().position(|&x| x == id),
564            ColumnMapping::Large(map) => map.get(&id).copied(),
565        }
566    }
567}
568
569impl Deserializer {
570    pub fn new(
571        column_ids: &[ColumnId],
572        schema: Arc<[DataType]>,
573        column_with_default: impl Iterator<Item = (usize, Datum)>,
574    ) -> Self {
575        assert_eq!(column_ids.len(), schema.len());
576        let mut default_row: Vec<Datum> = vec![None; schema.len()];
577        for (i, datum) in column_with_default {
578            default_row[i] = datum;
579        }
580        Self {
581            mapping: ColumnMapping::new(column_ids.iter().copied()),
582            data_types: schema,
583            default_row: Some(default_row),
584        }
585    }
586}
587
588impl Deserializer<StructType> {
589    /// Create a new `Deserializer` for the fields of the given struct.
590    ///
591    /// Panic if the struct type does not have field ids.
592    pub fn from_struct(struct_type: StructType) -> Self {
593        Self {
594            mapping: ColumnMapping::new(struct_type.ids().unwrap()),
595            data_types: struct_type,
596            default_row: None,
597        }
598    }
599}
600
601impl<D: DataTypes> ValueRowDeserializer for Deserializer<D> {
602    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
603        let encoded_bytes = EncodedBytes::new(encoded_bytes)?;
604
605        let mut row =
606            (self.default_row.clone()).unwrap_or_else(|| vec![None; self.data_types.iter().len()]);
607
608        for (id, mut data) in encoded_bytes {
609            let Some(decoded_idx) = self.mapping.get(id) else {
610                continue;
611            };
612            let data_type = self.data_types.at(decoded_idx);
613
614            let datum = if data.is_empty() {
615                None
616            } else if data_type.can_alter() == Some(true) {
617                Some(new_serde::new_deserialize_scalar(data_type, &mut data)?)
618            } else {
619                Some(plain::deserialize_value(data_type, &mut data)?)
620            };
621
622            row[decoded_idx] = datum;
623        }
624
625        Ok(row)
626    }
627}
628
629/// Combined column-aware `Serializer` and `Deserializer` given the same
630/// `column_ids` and `schema`
631#[derive(Clone)]
632pub struct ColumnAwareSerde {
633    pub serializer: Serializer,
634    pub deserializer: Deserializer,
635}
636
637impl ValueRowSerializer for ColumnAwareSerde {
638    fn serialize(&self, row: impl Row) -> Vec<u8> {
639        self.serializer.serialize(row)
640    }
641}
642
643impl ValueRowDeserializer for ColumnAwareSerde {
644    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
645        self.deserializer.deserialize(encoded_bytes)
646    }
647}
648
649/// Deserializes row `encoded_bytes`, drops columns not in `valid_column_ids`, serializes and returns.
650/// If no column is dropped, returns None.
651// TODO: we only support trimming dropped top-level columns here; also support nested fields.
652pub fn try_drop_invalid_columns(
653    encoded_bytes: &[u8],
654    valid_column_ids: &HashSet<i32>,
655) -> Option<Vec<u8>> {
656    let encoded_bytes = EncodedBytes::new(encoded_bytes).ok()?;
657
658    let has_invalid_column = (encoded_bytes.clone()).any(|(id, _)| !valid_column_ids.contains(&id));
659    if !has_invalid_column {
660        return None;
661    }
662
663    // Slow path that drops columns. Should be rare.
664
665    let mut datums = Vec::with_capacity(valid_column_ids.len());
666    let mut column_ids = Vec::with_capacity(valid_column_ids.len());
667
668    for (id, data) in encoded_bytes {
669        if valid_column_ids.contains(&id) {
670            column_ids.push(ColumnId::new(id));
671            datums.push(if data.is_empty() { None } else { Some(data) });
672        }
673    }
674
675    // Data types are only needed when we are actually serializing. But we have encoded data here.
676    // Simple pass dummy data types.
677    let dummy_data_types = vec![DataType::Boolean; column_ids.len()];
678
679    let row_bytes = Serializer::new(&column_ids, dummy_data_types).serialize_raw(datums);
680    Some(row_bytes)
681}