risingwave_common/util/value_encoding/
column_aware_row_encoding.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Column-aware row encoding is an encoding format that converts a row into a binary form which
//! remains explainable after schema changes.
//!
//! Every encoded row starts with a one-byte header: the most significant bit is a magic bit that
//! distinguishes it from plain value encoding, and the 2 least significant bits represent the
//! width of the offsets: `u8`/`u16`/`u32`.
//!
//! We have a `Serializer` and a `Deserializer` for each schema of `Row`, which can be reused
//! until the schema changes.
21
22use std::collections::HashSet;
23use std::sync::Arc;
24
25use ahash::HashMap;
26use bitfield_struct::bitfield;
27use bytes::{Buf, BufMut};
28use rw_iter_util::ZipEqDebug;
29use smallvec::{SmallVec, smallvec};
30
31use super::error::ValueEncodingError;
32use super::{Result, ValueRowDeserializer, ValueRowSerializer};
33use crate::catalog::ColumnId;
34use crate::row::Row;
35use crate::types::{DataType, Datum, ScalarRefImpl, StructType, ToDatumRef};
36use crate::util::value_encoding as plain;
37
38/// Serialize and deserialize functions that recursively use [`ColumnAwareSerde`] for nested fields
39/// of composite types.
40///
41/// Only for column with types that can be altered ([`DataType::can_alter`]).
42mod new_serde {
43    use super::*;
44    use crate::array::{ListRef, ListValue, MapRef, MapValue, StructRef, StructValue};
45    use crate::types::{MapType, ScalarImpl, StructType, data_types};
46
47    // --- serialize ---
48
49    // Copy of `plain` but call `new_` for the scalar.
50    fn new_serialize_datum(
51        data_type: &DataType,
52        datum_ref: impl ToDatumRef,
53        buf: &mut impl BufMut,
54    ) {
55        if let Some(d) = datum_ref.to_datum_ref() {
56            buf.put_u8(1);
57            new_serialize_scalar(data_type, d, buf)
58        } else {
59            buf.put_u8(0);
60        }
61    }
62
63    // Different logic from `plain`:
64    //
65    // Recursively construct a new column-aware `Serializer` for nested fields.
66    fn new_serialize_struct(struct_type: &StructType, value: StructRef<'_>, buf: &mut impl BufMut) {
67        let serializer = super::Serializer::from_struct(struct_type.clone()); // cloning `StructType` is lightweight
68
69        // `StructRef` is interpreted as a `Row` here.
70        let bytes = serializer.serialize(value); // TODO: serialize into the buf directly if we can reserve space accurately
71        buf.put_u32_le(bytes.len() as _);
72        buf.put_slice(&bytes);
73    }
74
75    // Copy of `plain` but call `new_` for the elements.
76    fn new_serialize_list(inner_type: &DataType, value: ListRef<'_>, buf: &mut impl BufMut) {
77        let elems = value.iter();
78        buf.put_u32_le(elems.len() as u32);
79
80        elems.for_each(|field_value| {
81            new_serialize_datum(inner_type, field_value, buf);
82        });
83    }
84
85    // Different logic from `plain`:
86    //
87    // We don't reuse the serialization of `struct<k, v>[]` for map, as it introduces overhead
88    // for column ids of `struct<k, v>`. It's unnecessary because the shape of the map is fixed.
89    fn new_serialize_map(map_type: &MapType, value: MapRef<'_>, buf: &mut impl BufMut) {
90        let elems = value.iter();
91        buf.put_u32_le(elems.len() as u32);
92
93        elems.for_each(|(k, v)| {
94            new_serialize_scalar(map_type.key(), k, buf);
95            new_serialize_datum(map_type.value(), v, buf);
96        });
97    }
98
99    // Copy of `plain` but call `new_` for composite types.
100    pub fn new_serialize_scalar(
101        data_type: &DataType,
102        value: ScalarRefImpl<'_>,
103        buf: &mut impl BufMut,
104    ) {
105        match value {
106            ScalarRefImpl::Struct(s) => new_serialize_struct(data_type.as_struct(), s, buf),
107            ScalarRefImpl::List(l) => new_serialize_list(data_type.as_list(), l, buf),
108            ScalarRefImpl::Map(m) => new_serialize_map(data_type.as_map(), m, buf),
109
110            _ => plain::serialize_scalar(value, buf),
111        }
112    }
113
114    // --- deserialize ---
115
116    // Copy of `plain` but call `new_` for the scalar.
117    fn new_inner_deserialize_datum(data: &mut &[u8], ty: &DataType) -> Result<Datum> {
118        let null_tag = data.get_u8();
119        match null_tag {
120            0 => Ok(None),
121            1 => Some(new_deserialize_scalar(ty, data)).transpose(),
122            _ => Err(ValueEncodingError::InvalidTagEncoding(null_tag)),
123        }
124    }
125
126    // Different logic from `plain`:
127    //
128    // Recursively construct a new column-aware `Deserializer` for nested fields.
129    fn new_deserialize_struct(struct_def: &StructType, data: &mut &[u8]) -> Result<ScalarImpl> {
130        let deserializer = super::Deserializer::from_struct(struct_def.clone()); // cloning `StructType` is lightweight
131        let encoded_len = data.get_u32_le() as usize;
132
133        let (struct_data, remaining) = data.split_at(encoded_len);
134        *data = remaining;
135        let fields = deserializer.deserialize(struct_data)?;
136
137        Ok(ScalarImpl::Struct(StructValue::new(fields)))
138    }
139
140    // Copy of `plain` but call `new_` for the elements.
141    fn new_deserialize_list(item_type: &DataType, data: &mut &[u8]) -> Result<ScalarImpl> {
142        let len = data.get_u32_le();
143        let mut builder = item_type.create_array_builder(len as usize);
144        for _ in 0..len {
145            builder.append(new_inner_deserialize_datum(data, item_type)?);
146        }
147        Ok(ScalarImpl::List(ListValue::new(builder.finish())))
148    }
149
150    // Different logic from `plain`:
151    //
152    // We don't reuse the deserialization of `struct<k, v>[]` for map, as it introduces overhead
153    // for column ids of `struct<k, v>`. It's unnecessary because the shape of the map is fixed.
154    fn new_deserialize_map(map_type: &MapType, data: &mut &[u8]) -> Result<ScalarImpl> {
155        let len = data.get_u32_le();
156        let mut builder = map_type
157            .clone() // FIXME: clone type everytime here is inefficient
158            .into_struct()
159            .create_array_builder(len as usize);
160        for _ in 0..len {
161            let key = new_deserialize_scalar(map_type.key(), data)?;
162            let value = new_inner_deserialize_datum(data, map_type.value())?;
163            let entry = StructValue::new(vec![Some(key), value]);
164            builder.append(Some(ScalarImpl::Struct(entry)));
165        }
166        Ok(ScalarImpl::Map(MapValue::from_entries(ListValue::new(
167            builder.finish(),
168        ))))
169    }
170
171    // Copy of `plain` but call `new_` for composite types.
172    pub fn new_deserialize_scalar(ty: &DataType, data: &mut &[u8]) -> Result<ScalarImpl> {
173        Ok(match ty {
174            DataType::Struct(struct_def) => new_deserialize_struct(struct_def, data)?,
175            DataType::List(item_type) => new_deserialize_list(item_type, data)?,
176            DataType::Map(map_type) => new_deserialize_map(map_type, data)?,
177
178            data_types::simple!() => plain::deserialize_value(ty, data)?,
179        })
180    }
181}
182
183/// When a row has columns no more than this number, we will use stack for some intermediate buffers.
184const COLUMN_ON_STACK: usize = 8;
185
186/// The width of the offset of the encoded data, i.e., how many bytes are used to represent the offset.
187#[derive(Debug, Clone, Copy, PartialEq, Eq)]
188#[repr(u8)]
189enum OffsetWidth {
190    /// The offset of encoded data can be represented by u8.
191    Offset8 = 0b01,
192    /// The offset of encoded data can be represented by u16.
193    Offset16 = 0b10,
194    /// The offset of encoded data can be represented by u32.
195    Offset32 = 0b11,
196}
197
198impl OffsetWidth {
199    /// Get the width of the offset in bytes.
200    const fn width(self) -> usize {
201        match self {
202            OffsetWidth::Offset8 => 1,
203            OffsetWidth::Offset16 => 2,
204            OffsetWidth::Offset32 => 4,
205        }
206    }
207
208    const fn into_bits(self) -> u8 {
209        self as u8
210    }
211
212    const fn from_bits(bits: u8) -> Self {
213        match bits {
214            0b01 => OffsetWidth::Offset8,
215            0b10 => OffsetWidth::Offset16,
216            0b11 => OffsetWidth::Offset32,
217            _ => panic!("invalid offset width bits"),
218        }
219    }
220}
221
222/// Header (metadata) of the encoded row.
223///
224/// Layout (most to least significant bits):
225///
226/// ```text
227/// | magic | reserved | offset |
228/// |   1   |    5     |   2    |
229/// ```
230#[bitfield(u8, order = Msb)]
231#[derive(PartialEq, Eq)]
232struct Header {
233    /// Magic bit to indicate it's column-aware encoding.
234    /// Note that in plain value encoding, the first byte is always 0 or 1 for nullability,
235    /// of which the most significant bit is always 0.
236    #[bits(1, default = true, access = RO)]
237    magic: bool,
238
239    #[bits(5)]
240    _reserved: u8,
241
242    /// Indicate the offset width of the encoded data.
243    #[bits(2, default = OffsetWidth::Offset8)]
244    offset: OffsetWidth,
245}
246
247/// `RowEncoding` holds row-specific information for Column-Aware Encoding
248struct RowEncoding {
249    header: Header,
250    offsets: SmallVec<[u8; COLUMN_ON_STACK * 2]>,
251    data: Vec<u8>,
252}
253
254/// A trait unifying [`ToDatumRef`] and already encoded bytes.
255trait Encode {
256    fn encode_to(self, data_type: &DataType, data: &mut Vec<u8>);
257}
258
259impl<T> Encode for T
260where
261    T: ToDatumRef,
262{
263    fn encode_to(self, data_type: &DataType, data: &mut Vec<u8>) {
264        if let Some(v) = self.to_datum_ref() {
265            // Use different encoding logic for alterable types.
266            if data_type.can_alter() == Some(true) {
267                new_serde::new_serialize_scalar(data_type, v, data);
268            } else {
269                plain::serialize_scalar(v, data);
270            }
271        }
272    }
273}
274
275impl Encode for Option<&[u8]> {
276    fn encode_to(self, _data_type: &DataType, data: &mut Vec<u8>) {
277        // Already encoded, just copy the bytes.
278        if let Some(v) = self {
279            data.extend(v);
280        }
281    }
282}
283
284impl RowEncoding {
285    fn new() -> Self {
286        RowEncoding {
287            header: Header::new(),
288            offsets: Default::default(),
289            data: Default::default(),
290        }
291    }
292
293    fn set_offsets(&mut self, usize_offsets: &[usize]) {
294        debug_assert!(
295            self.offsets.is_empty(),
296            "should not set offsets multiple times"
297        );
298
299        // Use 0 if there's no data.
300        let max_offset = usize_offsets.last().copied().unwrap_or(0);
301
302        let offset_width = match max_offset {
303            _n @ ..=const { u8::MAX as usize } => OffsetWidth::Offset8,
304            _n @ ..=const { u16::MAX as usize } => OffsetWidth::Offset16,
305            _n @ ..=const { u32::MAX as usize } => OffsetWidth::Offset32,
306            _ => panic!("encoding length {} exceeds u32", max_offset),
307        };
308        self.header.set_offset(offset_width);
309
310        self.offsets
311            .resize(usize_offsets.len() * offset_width.width(), 0);
312
313        let mut offsets_buf = &mut self.offsets[..];
314        for &offset in usize_offsets {
315            offsets_buf.put_uint_le(offset as u64, offset_width.width());
316        }
317    }
318
319    fn encode<T: Encode>(
320        &mut self,
321        datums: impl IntoIterator<Item = T>,
322        data_types: impl IntoIterator<Item = &DataType>,
323    ) {
324        debug_assert!(
325            self.data.is_empty(),
326            "should not encode one RowEncoding object multiple times."
327        );
328        let datums = datums.into_iter();
329        let mut offset_usize =
330            SmallVec::<[usize; COLUMN_ON_STACK]>::with_capacity(datums.size_hint().0);
331        for (datum, data_type) in datums.zip_eq_debug(data_types) {
332            offset_usize.push(self.data.len());
333            datum.encode_to(data_type, &mut self.data);
334        }
335        self.set_offsets(&offset_usize);
336    }
337}
338
339mod data_types {
340    use crate::types::{DataType, StructType};
341
342    /// A trait unifying data types of a row and field types of a struct.
343    pub trait DataTypes: Clone {
344        fn iter(&self) -> impl ExactSizeIterator<Item = &DataType>;
345        fn at(&self, index: usize) -> &DataType;
346    }
347
348    impl<T> DataTypes for T
349    where
350        T: AsRef<[DataType]> + Clone,
351    {
352        fn iter(&self) -> impl ExactSizeIterator<Item = &DataType> {
353            self.as_ref().iter()
354        }
355
356        fn at(&self, index: usize) -> &DataType {
357            &self.as_ref()[index]
358        }
359    }
360
361    impl DataTypes for StructType {
362        fn iter(&self) -> impl ExactSizeIterator<Item = &DataType> {
363            self.types()
364        }
365
366        fn at(&self, index: usize) -> &DataType {
367            self.type_at(index)
368        }
369    }
370}
371use data_types::DataTypes;
372
373/// Column-Aware `Serializer` holds schema related information, and shall be
374/// created again once the schema changes
375#[derive(Clone)]
376pub struct Serializer<D: DataTypes = Vec<DataType>> {
377    encoded_column_ids: EncodedColumnIds,
378    data_types: D,
379}
380
381type EncodedColumnIds = SmallVec<[u8; COLUMN_ON_STACK * 4]>;
382
383fn encode_column_ids(column_ids: impl ExactSizeIterator<Item = ColumnId>) -> EncodedColumnIds {
384    // currently we hard-code ColumnId as i32
385    let mut encoded_column_ids = smallvec![0; column_ids.len() * 4];
386    let mut buf = &mut encoded_column_ids[..];
387    for id in column_ids {
388        buf.put_i32_le(id.get_id());
389    }
390    encoded_column_ids
391}
392
393impl Serializer {
394    /// Create a new `Serializer` with given `column_ids` and `data_types`.
395    pub fn new(column_ids: &[ColumnId], data_types: impl IntoIterator<Item = DataType>) -> Self {
396        Self {
397            encoded_column_ids: encode_column_ids(column_ids.iter().copied()),
398            data_types: data_types.into_iter().collect(),
399        }
400    }
401}
402
403impl Serializer<StructType> {
404    /// Create a new `Serializer` for the fields of the given struct.
405    ///
406    /// Panic if the struct type does not have field ids.
407    pub fn from_struct(struct_type: StructType) -> Self {
408        Self {
409            encoded_column_ids: encode_column_ids(struct_type.ids().unwrap()),
410            data_types: struct_type,
411        }
412    }
413}
414
415impl<D: DataTypes> Serializer<D> {
416    fn datum_num(&self) -> usize {
417        self.encoded_column_ids.len() / 4
418    }
419
420    fn serialize_raw<T: Encode>(&self, datums: impl IntoIterator<Item = T>) -> Vec<u8> {
421        let mut encoding = RowEncoding::new();
422        encoding.encode(datums, self.data_types.iter());
423        self.finish(encoding)
424    }
425
426    fn finish(&self, encoding: RowEncoding) -> Vec<u8> {
427        let mut row_bytes = Vec::with_capacity(
428            5 + self.encoded_column_ids.len() + encoding.offsets.len() + encoding.data.len(), /* 5 comes from u8+u32 */
429        );
430        row_bytes.put_u8(encoding.header.into_bits());
431        row_bytes.put_u32_le(self.datum_num() as u32);
432        row_bytes.extend(&self.encoded_column_ids);
433        row_bytes.extend(&encoding.offsets);
434        row_bytes.extend(&encoding.data);
435
436        row_bytes
437    }
438}
439
440impl<D: DataTypes> ValueRowSerializer for Serializer<D> {
441    /// Serialize a row under the schema of the Serializer
442    fn serialize(&self, row: impl Row) -> Vec<u8> {
443        assert_eq!(row.len(), self.datum_num());
444        self.serialize_raw(row.iter())
445    }
446}
447
448/// A view of the encoded bytes, which can be iterated over to get the column id and data.
449/// Used for deserialization.
450// TODO: can we unify this with `RowEncoding`, which is for serialization?
451#[derive(Clone)]
452struct EncodedBytes<'a> {
453    header: Header,
454
455    // When iterating, we will consume `column_ids` and `offsets` while keep `data` unchanged.
456    // This is because we record absolute values in `offsets` to index into `data`.
457    column_ids: &'a [u8],
458    offsets: &'a [u8],
459    data: &'a [u8],
460}
461
462impl<'a> EncodedBytes<'a> {
463    fn new(mut encoded_bytes: &'a [u8]) -> Result<Self> {
464        let header = Header::from_bits(encoded_bytes.get_u8());
465        if !header.magic() {
466            return Err(ValueEncodingError::InvalidFlag(header.into_bits()));
467        }
468        let offset_bytes = header.offset().width();
469
470        let datum_num = encoded_bytes.get_u32_le() as usize;
471        let offsets_start_idx = 4 * datum_num;
472        let data_start_idx = offsets_start_idx + datum_num * offset_bytes;
473
474        Ok(EncodedBytes {
475            header,
476            column_ids: &encoded_bytes[..offsets_start_idx],
477            offsets: &encoded_bytes[offsets_start_idx..data_start_idx],
478            data: &encoded_bytes[data_start_idx..],
479        })
480    }
481}
482
483impl<'a> Iterator for EncodedBytes<'a> {
484    type Item = (i32, &'a [u8]);
485
486    fn next(&mut self) -> Option<Self::Item> {
487        if self.column_ids.is_empty() {
488            assert!(self.offsets.is_empty());
489            return None;
490        }
491
492        let id = self.column_ids.get_i32_le();
493
494        let offset_width = self.header.offset().width();
495        let get_offset = |offsets: &mut &[u8]| offsets.get_uint_le(offset_width) as usize;
496
497        let this_offset = get_offset(&mut self.offsets);
498        let next_offset = if self.offsets.is_empty() {
499            self.data.len()
500        } else {
501            let mut peek_offsets = self.offsets; // copy the reference to the slice to avoid mutating the buf position
502            get_offset(&mut peek_offsets)
503        };
504
505        let data = &self.data[this_offset..next_offset];
506
507        Some((id, data))
508    }
509}
510
511/// Column-Aware `Deserializer` holds needed `ColumnIds` and their corresponding schema
512/// Should non-null default values be specified, a new field could be added to Deserializer
513#[derive(Clone)]
514pub struct Deserializer<D: DataTypes = Arc<[DataType]>> {
515    mapping: ColumnMapping,
516    data_types: D,
517
518    /// A row with default values for each column.
519    ///
520    /// `None` if all default values are `NULL`, typically for struct fields.
521    default_row: Option<Vec<Datum>>,
522}
523
524/// A mapping from column id to the index of the column in the schema.
525#[derive(Clone)]
526enum ColumnMapping {
527    /// For small number of columns, use linear search with `SmallVec`. This ensures no heap allocation.
528    Small(SmallVec<[i32; COLUMN_ON_STACK]>),
529    /// For larger number of columns, build a `HashMap` for faster lookup.
530    Large(HashMap<i32, usize>),
531}
532
533impl ColumnMapping {
534    fn new(column_ids: impl ExactSizeIterator<Item = ColumnId>) -> Self {
535        if column_ids.len() <= COLUMN_ON_STACK {
536            Self::Small(column_ids.map(|c| c.get_id()).collect())
537        } else {
538            Self::Large(
539                column_ids
540                    .enumerate()
541                    .map(|(i, c)| (c.get_id(), i))
542                    .collect(),
543            )
544        }
545    }
546
547    fn get(&self, id: i32) -> Option<usize> {
548        match self {
549            ColumnMapping::Small(vec) => vec.iter().position(|&x| x == id),
550            ColumnMapping::Large(map) => map.get(&id).copied(),
551        }
552    }
553}
554
555impl Deserializer {
556    pub fn new(
557        column_ids: &[ColumnId],
558        schema: Arc<[DataType]>,
559        column_with_default: impl Iterator<Item = (usize, Datum)>,
560    ) -> Self {
561        assert_eq!(column_ids.len(), schema.len());
562        let mut default_row: Vec<Datum> = vec![None; schema.len()];
563        for (i, datum) in column_with_default {
564            default_row[i] = datum;
565        }
566        Self {
567            mapping: ColumnMapping::new(column_ids.iter().copied()),
568            data_types: schema,
569            default_row: Some(default_row),
570        }
571    }
572}
573
574impl Deserializer<StructType> {
575    /// Create a new `Deserializer` for the fields of the given struct.
576    ///
577    /// Panic if the struct type does not have field ids.
578    pub fn from_struct(struct_type: StructType) -> Self {
579        Self {
580            mapping: ColumnMapping::new(struct_type.ids().unwrap()),
581            data_types: struct_type,
582            default_row: None,
583        }
584    }
585}
586
587impl<D: DataTypes> ValueRowDeserializer for Deserializer<D> {
588    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
589        let encoded_bytes = EncodedBytes::new(encoded_bytes)?;
590
591        let mut row =
592            (self.default_row.clone()).unwrap_or_else(|| vec![None; self.data_types.iter().len()]);
593
594        for (id, mut data) in encoded_bytes {
595            let Some(decoded_idx) = self.mapping.get(id) else {
596                continue;
597            };
598            let data_type = self.data_types.at(decoded_idx);
599
600            let datum = if data.is_empty() {
601                None
602            } else if data_type.can_alter() == Some(true) {
603                Some(new_serde::new_deserialize_scalar(data_type, &mut data)?)
604            } else {
605                Some(plain::deserialize_value(data_type, &mut data)?)
606            };
607
608            row[decoded_idx] = datum;
609        }
610
611        Ok(row)
612    }
613}
614
615/// Combined column-aware `Serializer` and `Deserializer` given the same
616/// `column_ids` and `schema`
617#[derive(Clone)]
618pub struct ColumnAwareSerde {
619    pub serializer: Serializer,
620    pub deserializer: Deserializer,
621}
622
623impl ValueRowSerializer for ColumnAwareSerde {
624    fn serialize(&self, row: impl Row) -> Vec<u8> {
625        self.serializer.serialize(row)
626    }
627}
628
629impl ValueRowDeserializer for ColumnAwareSerde {
630    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
631        self.deserializer.deserialize(encoded_bytes)
632    }
633}
634
635/// Deserializes row `encoded_bytes`, drops columns not in `valid_column_ids`, serializes and returns.
636/// If no column is dropped, returns None.
637// TODO: we only support trimming dropped top-level columns here; also support nested fields.
638pub fn try_drop_invalid_columns(
639    encoded_bytes: &[u8],
640    valid_column_ids: &HashSet<i32>,
641) -> Option<Vec<u8>> {
642    let encoded_bytes = EncodedBytes::new(encoded_bytes).ok()?;
643
644    let has_invalid_column = (encoded_bytes.clone()).any(|(id, _)| !valid_column_ids.contains(&id));
645    if !has_invalid_column {
646        return None;
647    }
648
649    // Slow path that drops columns. Should be rare.
650
651    let mut datums = Vec::with_capacity(valid_column_ids.len());
652    let mut column_ids = Vec::with_capacity(valid_column_ids.len());
653
654    for (id, data) in encoded_bytes {
655        if valid_column_ids.contains(&id) {
656            column_ids.push(ColumnId::new(id));
657            datums.push(if data.is_empty() { None } else { Some(data) });
658        }
659    }
660
661    // Data types are only needed when we are actually serializing. But we have encoded data here.
662    // Simple pass dummy data types.
663    let dummy_data_types = vec![DataType::Boolean; column_ids.len()];
664
665    let row_bytes = Serializer::new(&column_ids, dummy_data_types).serialize_raw(datums);
666    Some(row_bytes)
667}