risingwave_common/util/value_encoding/
column_aware_row_encoding.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Column-aware row encoding is an encoding format that converts a row into a binary form which
//! remains explainable after schema changes.
//! The current design of the flag contains just one meaningful piece of information: the 2 LSBs
//! represent the size of offsets: `u8`/`u16`/`u32`.
//! We have a `Serializer` and a `Deserializer` for each schema of `Row`, which can be reused
//! until the schema changes.
21
22use std::collections::HashSet;
23use std::sync::Arc;
24
25use ahash::HashMap;
26use bitfield_struct::bitfield;
27use bytes::{Buf, BufMut};
28use rw_iter_util::ZipEqDebug;
29use smallvec::{SmallVec, smallvec};
30
31use super::error::ValueEncodingError;
32use super::{Result, ValueRowDeserializer, ValueRowSerializer};
33use crate::catalog::ColumnId;
34use crate::row::Row;
35use crate::types::{DataType, Datum, ScalarRefImpl, StructType, ToDatumRef};
36use crate::util::value_encoding as plain;
37
38/// Serialize and deserialize functions that recursively use [`ColumnAwareSerde`] for nested fields
39/// of composite types.
40///
41/// Only for column with types that can be altered ([`DataType::can_alter`]).
42mod new_serde {
43    use super::*;
44    use crate::array::{ListRef, ListValue, MapRef, MapValue, StructRef, StructValue};
45    use crate::types::{MapType, ScalarImpl, StructType, data_types};
46
47    // --- serialize ---
48
49    // Copy of `plain` but call `new_` for the scalar.
50    fn new_serialize_datum(
51        data_type: &DataType,
52        datum_ref: impl ToDatumRef,
53        buf: &mut impl BufMut,
54    ) {
55        if let Some(d) = datum_ref.to_datum_ref() {
56            buf.put_u8(1);
57            new_serialize_scalar(data_type, d, buf)
58        } else {
59            buf.put_u8(0);
60        }
61    }
62
63    // Different logic from `plain`:
64    //
65    // Recursively construct a new column-aware `Serializer` for nested fields.
66    fn new_serialize_struct(struct_type: &StructType, value: StructRef<'_>, buf: &mut impl BufMut) {
67        let serializer = super::Serializer::from_struct(struct_type.clone()); // cloning `StructType` is lightweight
68
69        // `StructRef` is interpreted as a `Row` here.
70        let bytes = serializer.serialize(value); // TODO: serialize into the buf directly if we can reserve space accurately
71        buf.put_u32_le(bytes.len() as _);
72        buf.put_slice(&bytes);
73    }
74
75    // Copy of `plain` but call `new_` for the elements.
76    fn new_serialize_list(inner_type: &DataType, value: ListRef<'_>, buf: &mut impl BufMut) {
77        let elems = value.iter();
78        buf.put_u32_le(elems.len() as u32);
79
80        elems.for_each(|field_value| {
81            new_serialize_datum(inner_type, field_value, buf);
82        });
83    }
84
85    // Different logic from `plain`:
86    //
87    // We don't reuse the serialization of `struct<k, v>[]` for map, as it introduces overhead
88    // for column ids of `struct<k, v>`. It's unnecessary because the shape of the map is fixed.
89    fn new_serialize_map(map_type: &MapType, value: MapRef<'_>, buf: &mut impl BufMut) {
90        let elems = value.iter();
91        buf.put_u32_le(elems.len() as u32);
92
93        elems.for_each(|(k, v)| {
94            new_serialize_scalar(map_type.key(), k, buf);
95            new_serialize_datum(map_type.value(), v, buf);
96        });
97    }
98
99    // Copy of `plain` but call `new_` for composite types.
100    pub fn new_serialize_scalar(
101        data_type: &DataType,
102        value: ScalarRefImpl<'_>,
103        buf: &mut impl BufMut,
104    ) {
105        match value {
106            ScalarRefImpl::Struct(s) => new_serialize_struct(data_type.as_struct(), s, buf),
107            ScalarRefImpl::List(l) => new_serialize_list(data_type.as_list_element_type(), l, buf),
108            ScalarRefImpl::Map(m) => new_serialize_map(data_type.as_map(), m, buf),
109
110            _ => plain::serialize_scalar(value, buf),
111        }
112    }
113
114    // --- deserialize ---
115
116    // Copy of `plain` but call `new_` for the scalar.
117    fn new_inner_deserialize_datum(data: &mut &[u8], ty: &DataType) -> Result<Datum> {
118        let null_tag = data.get_u8();
119        match null_tag {
120            0 => Ok(None),
121            1 => Some(new_deserialize_scalar(ty, data)).transpose(),
122            _ => Err(ValueEncodingError::InvalidTagEncoding(null_tag)),
123        }
124    }
125
126    // Different logic from `plain`:
127    //
128    // Recursively construct a new column-aware `Deserializer` for nested fields.
129    fn new_deserialize_struct(struct_def: &StructType, data: &mut &[u8]) -> Result<ScalarImpl> {
130        let deserializer = super::Deserializer::from_struct(struct_def.clone()); // cloning `StructType` is lightweight
131        let encoded_len = data.get_u32_le() as usize;
132
133        let (struct_data, remaining) = data.split_at(encoded_len);
134        *data = remaining;
135        let fields = deserializer.deserialize(struct_data)?;
136
137        Ok(ScalarImpl::Struct(StructValue::new(fields)))
138    }
139
140    // Copy of `plain` but call `new_` for the elements.
141    fn new_deserialize_list(item_type: &DataType, data: &mut &[u8]) -> Result<ScalarImpl> {
142        let len = data.get_u32_le();
143        let mut builder = item_type.create_array_builder(len as usize);
144        for _ in 0..len {
145            builder.append(new_inner_deserialize_datum(data, item_type)?);
146        }
147        Ok(ScalarImpl::List(ListValue::new(builder.finish())))
148    }
149
150    // Different logic from `plain`:
151    //
152    // We don't reuse the deserialization of `struct<k, v>[]` for map, as it introduces overhead
153    // for column ids of `struct<k, v>`. It's unnecessary because the shape of the map is fixed.
154    fn new_deserialize_map(map_type: &MapType, data: &mut &[u8]) -> Result<ScalarImpl> {
155        let len = data.get_u32_le();
156        let mut builder = map_type
157            .clone() // FIXME: clone type everytime here is inefficient
158            .into_struct()
159            .create_array_builder(len as usize);
160        for _ in 0..len {
161            let key = new_deserialize_scalar(map_type.key(), data)?;
162            let value = new_inner_deserialize_datum(data, map_type.value())?;
163            let entry = StructValue::new(vec![Some(key), value]);
164            builder.append(Some(ScalarImpl::Struct(entry)));
165        }
166        Ok(ScalarImpl::Map(MapValue::from_entries(ListValue::new(
167            builder.finish(),
168        ))))
169    }
170
171    // Copy of `plain` but call `new_` for composite types.
172    pub fn new_deserialize_scalar(ty: &DataType, data: &mut &[u8]) -> Result<ScalarImpl> {
173        Ok(match ty {
174            DataType::Struct(struct_def) => new_deserialize_struct(struct_def, data)?,
175            DataType::List(item_type) => new_deserialize_list(item_type, data)?,
176            DataType::Map(map_type) => new_deserialize_map(map_type, data)?,
177            DataType::Vector(_) => plain::deserialize_value(ty, data)?,
178
179            data_types::simple!() => plain::deserialize_value(ty, data)?,
180        })
181    }
182}
183
/// When a row has no more than this number of columns, some intermediate buffers are kept on
/// the stack.
///
/// It is the unit for the inline capacity of the `SmallVec`s in this file, and the threshold
/// between [`ColumnMapping::Small`] and [`ColumnMapping::Large`].
const COLUMN_ON_STACK: usize = 8;
186
/// How many bytes are used to store each offset into the encoded data.
///
/// The discriminants are the bit patterns stored in the row header.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
enum OffsetWidth {
    /// Every offset fits in a `u8`.
    Offset8 = 0b01,
    /// Every offset fits in a `u16`.
    Offset16 = 0b10,
    /// Every offset fits in a `u32`.
    Offset32 = 0b11,
}

impl OffsetWidth {
    /// Returns the number of bytes occupied by one offset.
    const fn width(self) -> usize {
        match self {
            Self::Offset8 => std::mem::size_of::<u8>(),
            Self::Offset16 => std::mem::size_of::<u16>(),
            Self::Offset32 => std::mem::size_of::<u32>(),
        }
    }

    /// Returns the bit pattern of this width, i.e. its discriminant.
    const fn into_bits(self) -> u8 {
        self as u8
    }

    /// Decodes a bit pattern back into an `OffsetWidth`.
    ///
    /// # Panics
    ///
    /// Panics if `bits` is not one of `0b01`, `0b10` or `0b11`.
    const fn from_bits(bits: u8) -> Self {
        match bits {
            0b01 => Self::Offset8,
            0b10 => Self::Offset16,
            0b11 => Self::Offset32,
            _ => panic!("invalid offset width bits"),
        }
    }
}
222
/// Header (metadata) of the encoded row. It occupies a single byte.
///
/// Layout (most to least significant bits):
///
/// ```text
/// | magic | reserved | offset |
/// |   1   |    5     |   2    |
/// ```
#[bitfield(u8, order = Msb)]
#[derive(PartialEq, Eq)]
struct Header {
    /// Magic bit to indicate it's column-aware encoding. Set by default and read-only.
    ///
    /// Note that in plain value encoding, the first byte is always 0 or 1 for nullability,
    /// of which the most significant bit is always 0.
    #[bits(1, default = true, access = RO)]
    magic: bool,

    /// Reserved bits, currently unused.
    #[bits(5)]
    _reserved: u8,

    /// Indicate the offset width of the encoded data. Defaults to [`OffsetWidth::Offset8`].
    #[bits(2, default = OffsetWidth::Offset8)]
    offset: OffsetWidth,
}
247
/// `RowEncoding` holds row-specific information for Column-Aware Encoding, i.e. the parts of
/// the encoded row that are produced while serializing a single row.
struct RowEncoding {
    /// Header byte; its offset width is chosen in `set_offsets`.
    header: Header,
    /// Start offset of each datum within `data`, written in little-endian with the width
    /// recorded in `header`.
    offsets: SmallVec<[u8; COLUMN_ON_STACK * 2]>,
    /// Encoded bytes of all datums, concatenated in column order.
    data: Vec<u8>,
}
254
/// A trait unifying [`ToDatumRef`] and already encoded bytes.
trait Encode {
    /// Appends the encoded form of `self`, if any, to `data`.
    ///
    /// `data_type` is the type of the column being encoded. Implementations for already
    /// encoded bytes may ignore it.
    fn encode_to(self, data_type: &DataType, data: &mut Vec<u8>);
}
259
260impl<T> Encode for T
261where
262    T: ToDatumRef,
263{
264    fn encode_to(self, data_type: &DataType, data: &mut Vec<u8>) {
265        if let Some(v) = self.to_datum_ref() {
266            // Use different encoding logic for alterable types.
267            if data_type.can_alter() == Some(true) {
268                new_serde::new_serialize_scalar(data_type, v, data);
269            } else {
270                plain::serialize_scalar(v, data);
271            }
272        }
273    }
274}
275
276impl Encode for Option<&[u8]> {
277    fn encode_to(self, _data_type: &DataType, data: &mut Vec<u8>) {
278        // Already encoded, just copy the bytes.
279        if let Some(v) = self {
280            data.extend(v);
281        }
282    }
283}
284
285impl RowEncoding {
286    fn new() -> Self {
287        RowEncoding {
288            header: Header::new(),
289            offsets: Default::default(),
290            data: Default::default(),
291        }
292    }
293
294    fn set_offsets(&mut self, usize_offsets: &[usize]) {
295        debug_assert!(
296            self.offsets.is_empty(),
297            "should not set offsets multiple times"
298        );
299
300        // Use 0 if there's no data.
301        let max_offset = usize_offsets.last().copied().unwrap_or(0);
302
303        const U8_MAX: usize = u8::MAX as usize;
304        const U16_MAX: usize = u16::MAX as usize;
305        const U32_MAX: usize = u32::MAX as usize;
306
307        let offset_width = match max_offset {
308            _n @ ..=U8_MAX => OffsetWidth::Offset8,
309            _n @ ..=U16_MAX => OffsetWidth::Offset16,
310            _n @ ..=U32_MAX => OffsetWidth::Offset32,
311            _ => panic!("encoding length {} exceeds u32", max_offset),
312        };
313        self.header.set_offset(offset_width);
314
315        self.offsets
316            .resize(usize_offsets.len() * offset_width.width(), 0);
317
318        let mut offsets_buf = &mut self.offsets[..];
319        for &offset in usize_offsets {
320            offsets_buf.put_uint_le(offset as u64, offset_width.width());
321        }
322    }
323
324    fn encode<T: Encode>(
325        &mut self,
326        datums: impl IntoIterator<Item = T>,
327        data_types: impl IntoIterator<Item = &DataType>,
328    ) {
329        debug_assert!(
330            self.data.is_empty(),
331            "should not encode one RowEncoding object multiple times."
332        );
333        let datums = datums.into_iter();
334        let mut offset_usize =
335            SmallVec::<[usize; COLUMN_ON_STACK]>::with_capacity(datums.size_hint().0);
336        for (datum, data_type) in datums.zip_eq_debug(data_types) {
337            offset_usize.push(self.data.len());
338            datum.encode_to(data_type, &mut self.data);
339        }
340        self.set_offsets(&offset_usize);
341    }
342}
343
344mod data_types {
345    use crate::types::{DataType, StructType};
346
347    /// A trait unifying data types of a row and field types of a struct.
348    pub trait DataTypes: Clone {
349        fn iter(&self) -> impl ExactSizeIterator<Item = &DataType>;
350        fn at(&self, index: usize) -> &DataType;
351    }
352
353    impl<T> DataTypes for T
354    where
355        T: AsRef<[DataType]> + Clone,
356    {
357        fn iter(&self) -> impl ExactSizeIterator<Item = &DataType> {
358            self.as_ref().iter()
359        }
360
361        fn at(&self, index: usize) -> &DataType {
362            &self.as_ref()[index]
363        }
364    }
365
366    impl DataTypes for StructType {
367        fn iter(&self) -> impl ExactSizeIterator<Item = &DataType> {
368            self.types()
369        }
370
371        fn at(&self, index: usize) -> &DataType {
372            self.type_at(index)
373        }
374    }
375}
376use data_types::DataTypes;
377
/// Column-Aware `Serializer` holds schema related information, and shall be
/// created again once the schema changes
#[derive(Clone)]
pub struct Serializer<D: DataTypes = Vec<DataType>> {
    /// Column ids encoded as little-endian `i32`s (4 bytes each), copied into every row.
    encoded_column_ids: EncodedColumnIds,
    /// Data type of each column, in the same order as the column ids.
    data_types: D,
}
385
386type EncodedColumnIds = SmallVec<[u8; COLUMN_ON_STACK * 4]>;
387
388fn encode_column_ids(column_ids: impl ExactSizeIterator<Item = ColumnId>) -> EncodedColumnIds {
389    // currently we hard-code ColumnId as i32
390    let mut encoded_column_ids = smallvec![0; column_ids.len() * 4];
391    let mut buf = &mut encoded_column_ids[..];
392    for id in column_ids {
393        buf.put_i32_le(id.get_id());
394    }
395    encoded_column_ids
396}
397
398impl Serializer {
399    /// Create a new `Serializer` with given `column_ids` and `data_types`.
400    pub fn new(column_ids: &[ColumnId], data_types: impl IntoIterator<Item = DataType>) -> Self {
401        Self {
402            encoded_column_ids: encode_column_ids(column_ids.iter().copied()),
403            data_types: data_types.into_iter().collect(),
404        }
405    }
406}
407
408impl Serializer<StructType> {
409    /// Create a new `Serializer` for the fields of the given struct.
410    ///
411    /// Panic if the struct type does not have field ids.
412    pub fn from_struct(struct_type: StructType) -> Self {
413        Self {
414            encoded_column_ids: encode_column_ids(struct_type.ids().unwrap()),
415            data_types: struct_type,
416        }
417    }
418}
419
420impl<D: DataTypes> Serializer<D> {
421    fn datum_num(&self) -> usize {
422        self.encoded_column_ids.len() / 4
423    }
424
425    fn serialize_raw<T: Encode>(&self, datums: impl IntoIterator<Item = T>) -> Vec<u8> {
426        let mut encoding = RowEncoding::new();
427        encoding.encode(datums, self.data_types.iter());
428        self.finish(encoding)
429    }
430
431    fn finish(&self, encoding: RowEncoding) -> Vec<u8> {
432        let mut row_bytes = Vec::with_capacity(
433            5 + self.encoded_column_ids.len() + encoding.offsets.len() + encoding.data.len(), /* 5 comes from u8+u32 */
434        );
435        row_bytes.put_u8(encoding.header.into_bits());
436        row_bytes.put_u32_le(self.datum_num() as u32);
437        row_bytes.extend(&self.encoded_column_ids);
438        row_bytes.extend(&encoding.offsets);
439        row_bytes.extend(&encoding.data);
440
441        row_bytes
442    }
443}
444
445impl<D: DataTypes> ValueRowSerializer for Serializer<D> {
446    /// Serialize a row under the schema of the Serializer
447    fn serialize(&self, row: impl Row) -> Vec<u8> {
448        assert_eq!(row.len(), self.datum_num());
449        self.serialize_raw(row.iter())
450    }
451}
452
/// A view of the encoded bytes, which can be iterated over to get the column id and data.
/// Used for deserialization.
///
/// Cloning is cheap: the view only holds the header and borrowed slices.
// TODO: can we unify this with `RowEncoding`, which is for serialization?
#[derive(Clone)]
struct EncodedBytes<'a> {
    /// Header byte of the row; tells the width of each entry in `offsets`.
    header: Header,

    // When iterating, we will consume `column_ids` and `offsets` while keeping `data` unchanged.
    // This is because we record absolute values in `offsets` to index into `data`.
    /// Remaining column ids, 4 bytes (little-endian `i32`) each.
    column_ids: &'a [u8],
    /// Remaining offsets, one per remaining column id.
    offsets: &'a [u8],
    /// Encoded data of all columns.
    data: &'a [u8],
}
466
467impl<'a> EncodedBytes<'a> {
468    fn new(mut encoded_bytes: &'a [u8]) -> Result<Self> {
469        let header = Header::from_bits(encoded_bytes.get_u8());
470        if !header.magic() {
471            return Err(ValueEncodingError::InvalidFlag(header.into_bits()));
472        }
473        let offset_bytes = header.offset().width();
474
475        let datum_num = encoded_bytes.get_u32_le() as usize;
476        let offsets_start_idx = 4 * datum_num;
477        let data_start_idx = offsets_start_idx + datum_num * offset_bytes;
478
479        Ok(EncodedBytes {
480            header,
481            column_ids: &encoded_bytes[..offsets_start_idx],
482            offsets: &encoded_bytes[offsets_start_idx..data_start_idx],
483            data: &encoded_bytes[data_start_idx..],
484        })
485    }
486}
487
488impl<'a> Iterator for EncodedBytes<'a> {
489    type Item = (i32, &'a [u8]);
490
491    fn next(&mut self) -> Option<Self::Item> {
492        if self.column_ids.is_empty() {
493            assert!(self.offsets.is_empty());
494            return None;
495        }
496
497        let id = self.column_ids.get_i32_le();
498
499        let offset_width = self.header.offset().width();
500        let get_offset = |offsets: &mut &[u8]| offsets.get_uint_le(offset_width) as usize;
501
502        let this_offset = get_offset(&mut self.offsets);
503        let next_offset = if self.offsets.is_empty() {
504            self.data.len()
505        } else {
506            let mut peek_offsets = self.offsets; // copy the reference to the slice to avoid mutating the buf position
507            get_offset(&mut peek_offsets)
508        };
509
510        let data = &self.data[this_offset..next_offset];
511
512        Some((id, data))
513    }
514}
515
/// Column-Aware `Deserializer` holds needed `ColumnIds` and their corresponding schema
/// Should non-null default values be specified, a new field could be added to Deserializer
#[derive(Clone)]
pub struct Deserializer<D: DataTypes = Arc<[DataType]>> {
    /// Maps a column id found in the encoded row to the index of the column in the output.
    mapping: ColumnMapping,
    /// Data type of each output column.
    data_types: D,

    /// A row with default values for each column.
    ///
    /// `None` if all default values are `NULL`, typically for struct fields.
    default_row: Option<Vec<Datum>>,
}
528
529/// A mapping from column id to the index of the column in the schema.
530#[derive(Clone)]
531enum ColumnMapping {
532    /// For small number of columns, use linear search with `SmallVec`. This ensures no heap allocation.
533    Small(SmallVec<[i32; COLUMN_ON_STACK]>),
534    /// For larger number of columns, build a `HashMap` for faster lookup.
535    Large(HashMap<i32, usize>),
536}
537
538impl ColumnMapping {
539    fn new(column_ids: impl ExactSizeIterator<Item = ColumnId>) -> Self {
540        if column_ids.len() <= COLUMN_ON_STACK {
541            Self::Small(column_ids.map(|c| c.get_id()).collect())
542        } else {
543            Self::Large(
544                column_ids
545                    .enumerate()
546                    .map(|(i, c)| (c.get_id(), i))
547                    .collect(),
548            )
549        }
550    }
551
552    fn get(&self, id: i32) -> Option<usize> {
553        match self {
554            ColumnMapping::Small(vec) => vec.iter().position(|&x| x == id),
555            ColumnMapping::Large(map) => map.get(&id).copied(),
556        }
557    }
558}
559
560impl Deserializer {
561    pub fn new(
562        column_ids: &[ColumnId],
563        schema: Arc<[DataType]>,
564        column_with_default: impl Iterator<Item = (usize, Datum)>,
565    ) -> Self {
566        assert_eq!(column_ids.len(), schema.len());
567        let mut default_row: Vec<Datum> = vec![None; schema.len()];
568        for (i, datum) in column_with_default {
569            default_row[i] = datum;
570        }
571        Self {
572            mapping: ColumnMapping::new(column_ids.iter().copied()),
573            data_types: schema,
574            default_row: Some(default_row),
575        }
576    }
577}
578
579impl Deserializer<StructType> {
580    /// Create a new `Deserializer` for the fields of the given struct.
581    ///
582    /// Panic if the struct type does not have field ids.
583    pub fn from_struct(struct_type: StructType) -> Self {
584        Self {
585            mapping: ColumnMapping::new(struct_type.ids().unwrap()),
586            data_types: struct_type,
587            default_row: None,
588        }
589    }
590}
591
592impl<D: DataTypes> ValueRowDeserializer for Deserializer<D> {
593    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
594        let encoded_bytes = EncodedBytes::new(encoded_bytes)?;
595
596        let mut row =
597            (self.default_row.clone()).unwrap_or_else(|| vec![None; self.data_types.iter().len()]);
598
599        for (id, mut data) in encoded_bytes {
600            let Some(decoded_idx) = self.mapping.get(id) else {
601                continue;
602            };
603            let data_type = self.data_types.at(decoded_idx);
604
605            let datum = if data.is_empty() {
606                None
607            } else if data_type.can_alter() == Some(true) {
608                Some(new_serde::new_deserialize_scalar(data_type, &mut data)?)
609            } else {
610                Some(plain::deserialize_value(data_type, &mut data)?)
611            };
612
613            row[decoded_idx] = datum;
614        }
615
616        Ok(row)
617    }
618}
619
620/// Combined column-aware `Serializer` and `Deserializer` given the same
621/// `column_ids` and `schema`
622#[derive(Clone)]
623pub struct ColumnAwareSerde {
624    pub serializer: Serializer,
625    pub deserializer: Deserializer,
626}
627
628impl ValueRowSerializer for ColumnAwareSerde {
629    fn serialize(&self, row: impl Row) -> Vec<u8> {
630        self.serializer.serialize(row)
631    }
632}
633
634impl ValueRowDeserializer for ColumnAwareSerde {
635    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
636        self.deserializer.deserialize(encoded_bytes)
637    }
638}
639
640/// Deserializes row `encoded_bytes`, drops columns not in `valid_column_ids`, serializes and returns.
641/// If no column is dropped, returns None.
642// TODO: we only support trimming dropped top-level columns here; also support nested fields.
643pub fn try_drop_invalid_columns(
644    encoded_bytes: &[u8],
645    valid_column_ids: &HashSet<i32>,
646) -> Option<Vec<u8>> {
647    let encoded_bytes = EncodedBytes::new(encoded_bytes).ok()?;
648
649    let has_invalid_column = (encoded_bytes.clone()).any(|(id, _)| !valid_column_ids.contains(&id));
650    if !has_invalid_column {
651        return None;
652    }
653
654    // Slow path that drops columns. Should be rare.
655
656    let mut datums = Vec::with_capacity(valid_column_ids.len());
657    let mut column_ids = Vec::with_capacity(valid_column_ids.len());
658
659    for (id, data) in encoded_bytes {
660        if valid_column_ids.contains(&id) {
661            column_ids.push(ColumnId::new(id));
662            datums.push(if data.is_empty() { None } else { Some(data) });
663        }
664    }
665
666    // Data types are only needed when we are actually serializing. But we have encoded data here.
667    // Simple pass dummy data types.
668    let dummy_data_types = vec![DataType::Boolean; column_ids.len()];
669
670    let row_bytes = Serializer::new(&column_ids, dummy_data_types).serialize_raw(datums);
671    Some(row_bytes)
672}