risingwave_common/util/value_encoding/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
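//! Value encoding is an encoding format that converts data into a compact binary form. Unlike
//! key encoding, the encoded bytes are not memcomparable, i.e., comparing them byte-wise does
//! not reflect the order of the original values.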
17
18use bytes::{Buf, BufMut};
19use chrono::{Datelike, Timelike};
20use either::{Either, for_both};
21use enum_as_inner::EnumAsInner;
22use risingwave_pb::data::PbDatum;
23
24use crate::array::ArrayImpl;
25use crate::row::Row;
26use crate::types::*;
27
28pub mod error;
29use error::ValueEncodingError;
30
31use self::column_aware_row_encoding::ColumnAwareSerde;
32pub mod column_aware_row_encoding;
33
34pub use crate::row::RowDeserializer as BasicDeserializer;
35use crate::vector::{decode_vector_payload, encode_vector_payload};
36
37pub type Result<T> = std::result::Result<T, ValueEncodingError>;
38
39/// The kind of all possible `ValueRowSerde`.
40#[derive(EnumAsInner)]
41pub enum ValueRowSerdeKind {
42    /// For `BasicSerde`, the value is encoded with value-encoding.
43    Basic,
44    /// For `ColumnAwareSerde`, the value is encoded with column-aware row encoding.
45    ColumnAware,
46}
47
48/// Part of `ValueRowSerde` that implements `serialize` a `Row` into bytes
49pub trait ValueRowSerializer: Clone {
50    fn serialize(&self, row: impl Row) -> Vec<u8>;
51}
52
53/// Part of `ValueRowSerde` that implements `deserialize` bytes into a `Row`
54pub trait ValueRowDeserializer: Clone {
55    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>>;
56}
57
58/// The type-erased `ValueRowSerde`, used for simplifying the code.
59#[derive(Clone)]
60pub struct EitherSerde(pub Either<BasicSerde, ColumnAwareSerde>);
61
62impl From<BasicSerde> for EitherSerde {
63    fn from(value: BasicSerde) -> Self {
64        Self(Either::Left(value))
65    }
66}
67impl From<ColumnAwareSerde> for EitherSerde {
68    fn from(value: ColumnAwareSerde) -> Self {
69        Self(Either::Right(value))
70    }
71}
72
73impl ValueRowSerializer for EitherSerde {
74    fn serialize(&self, row: impl Row) -> Vec<u8> {
75        for_both!(&self.0, s => s.serialize(row))
76    }
77}
78
79impl ValueRowDeserializer for EitherSerde {
80    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
81        for_both!(&self.0, s => s.deserialize(encoded_bytes))
82    }
83}
84
/// Stateless serializer that encodes every datum of a row with plain value encoding.
#[derive(Clone)]
pub struct BasicSerializer;
88
89impl ValueRowSerializer for BasicSerializer {
90    fn serialize(&self, row: impl Row) -> Vec<u8> {
91        let mut buf = vec![];
92        for datum in row.iter() {
93            serialize_datum_into(datum, &mut buf);
94        }
95        buf
96    }
97}
98
99impl ValueRowDeserializer for BasicDeserializer {
100    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
101        Ok(self.deserialize(encoded_bytes)?.into_inner().into())
102    }
103}
104
105/// Wrap of the original `Row` serializing and deserializing function
106#[derive(Clone)]
107pub struct BasicSerde {
108    pub serializer: BasicSerializer,
109    pub deserializer: BasicDeserializer,
110}
111
112impl ValueRowSerializer for BasicSerde {
113    fn serialize(&self, row: impl Row) -> Vec<u8> {
114        self.serializer.serialize(row)
115    }
116}
117
118impl ValueRowDeserializer for BasicSerde {
119    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
120        Ok(self
121            .deserializer
122            .deserialize(encoded_bytes)?
123            .into_inner()
124            .into())
125    }
126}
127
128pub fn try_get_exact_serialize_datum_size(arr: &ArrayImpl) -> Option<usize> {
129    match arr {
130        ArrayImpl::Int16(_) => Some(2),
131        ArrayImpl::Int32(_) => Some(4),
132        ArrayImpl::Int64(_) => Some(8),
133        ArrayImpl::Serial(_) => Some(8),
134        ArrayImpl::Float32(_) => Some(4),
135        ArrayImpl::Float64(_) => Some(8),
136        ArrayImpl::Bool(_) => Some(1),
137        ArrayImpl::Decimal(_) => Some(estimate_serialize_decimal_size()),
138        ArrayImpl::Interval(_) => Some(estimate_serialize_interval_size()),
139        ArrayImpl::Date(_) => Some(estimate_serialize_date_size()),
140        ArrayImpl::Timestamp(_) => Some(estimate_serialize_timestamp_size()),
141        ArrayImpl::Time(_) => Some(estimate_serialize_time_size()),
142        _ => None,
143    }
144    .map(|x| x + 1)
145}
146
147/// Serialize a datum into bytes and return (Not order guarantee, used in value encoding).
148pub fn serialize_datum(cell: impl ToDatumRef) -> Vec<u8> {
149    let mut buf: Vec<u8> = vec![];
150    serialize_datum_into(cell, &mut buf);
151    buf
152}
153
154/// Serialize a datum into bytes (Not order guarantee, used in value encoding).
155pub fn serialize_datum_into(datum_ref: impl ToDatumRef, buf: &mut impl BufMut) {
156    if let Some(d) = datum_ref.to_datum_ref() {
157        buf.put_u8(1);
158        serialize_scalar(d, buf)
159    } else {
160        buf.put_u8(0);
161    }
162}
163
164pub fn estimate_serialize_datum_size(datum_ref: impl ToDatumRef) -> usize {
165    if let Some(d) = datum_ref.to_datum_ref() {
166        1 + estimate_serialize_scalar_size(d)
167    } else {
168        1
169    }
170}
171
172#[easy_ext::ext(DatumFromProtoExt)]
173impl Datum {
174    /// Create a datum from the protobuf representation with the given data type.
175    pub fn from_protobuf(proto: &PbDatum, data_type: &DataType) -> Result<Datum> {
176        deserialize_datum(proto.body.as_slice(), data_type)
177    }
178}
179
180#[easy_ext::ext(DatumToProtoExt)]
181impl<D: ToDatumRef> D {
182    /// Convert the datum to the protobuf representation.
183    pub fn to_protobuf(&self) -> PbDatum {
184        PbDatum {
185            body: serialize_datum(self),
186        }
187    }
188}
189
190/// Deserialize bytes into a datum (Not order guarantee, used in value encoding).
191pub fn deserialize_datum(mut data: impl Buf, ty: &DataType) -> Result<Datum> {
192    inner_deserialize_datum(&mut data, ty)
193}
194
195// prevent recursive use of &mut
196#[inline(always)]
197fn inner_deserialize_datum(data: &mut impl Buf, ty: &DataType) -> Result<Datum> {
198    let null_tag = data.get_u8();
199    match null_tag {
200        0 => Ok(None),
201        1 => Some(deserialize_value(ty, data)).transpose(),
202        _ => Err(ValueEncodingError::InvalidTagEncoding(null_tag)),
203    }
204}
205
206fn serialize_scalar(value: ScalarRefImpl<'_>, buf: &mut impl BufMut) {
207    match value {
208        ScalarRefImpl::Int16(v) => buf.put_i16_le(v),
209        ScalarRefImpl::Int32(v) => buf.put_i32_le(v),
210        ScalarRefImpl::Int64(v) => buf.put_i64_le(v),
211        ScalarRefImpl::Int256(v) => buf.put_slice(&v.to_le_bytes()),
212        ScalarRefImpl::Serial(v) => buf.put_i64_le(v.into_inner()),
213        ScalarRefImpl::Float32(v) => buf.put_f32_le(v.into_inner()),
214        ScalarRefImpl::Float64(v) => buf.put_f64_le(v.into_inner()),
215        ScalarRefImpl::Utf8(v) => serialize_str(v.as_bytes(), buf),
216        ScalarRefImpl::Bytea(v) => serialize_str(v, buf),
217        ScalarRefImpl::Bool(v) => buf.put_u8(v as u8),
218        ScalarRefImpl::Decimal(v) => serialize_decimal(&v, buf),
219        ScalarRefImpl::Interval(v) => serialize_interval(&v, buf),
220        ScalarRefImpl::Date(v) => serialize_date(v.0.num_days_from_ce(), buf),
221        ScalarRefImpl::Timestamp(v) => serialize_timestamp(
222            v.0.and_utc().timestamp(),
223            v.0.and_utc().timestamp_subsec_nanos(),
224            buf,
225        ),
226        ScalarRefImpl::Timestamptz(v) => buf.put_i64_le(v.timestamp_micros()),
227        ScalarRefImpl::Time(v) => {
228            serialize_time(v.0.num_seconds_from_midnight(), v.0.nanosecond(), buf)
229        }
230        ScalarRefImpl::Jsonb(v) => serialize_str(&v.value_serialize(), buf),
231        ScalarRefImpl::Struct(s) => serialize_struct(s, buf),
232        ScalarRefImpl::List(v) => serialize_list(v, buf),
233        ScalarRefImpl::Map(m) => serialize_list(m.into_inner(), buf),
234        ScalarRefImpl::Vector(v) => serialize_vector(v, buf),
235    }
236}
237
238fn estimate_serialize_scalar_size(value: ScalarRefImpl<'_>) -> usize {
239    match value {
240        ScalarRefImpl::Int16(_) => 2,
241        ScalarRefImpl::Int32(_) => 4,
242        ScalarRefImpl::Int64(_) => 8,
243        ScalarRefImpl::Int256(_) => 32,
244        ScalarRefImpl::Serial(_) => 8,
245        ScalarRefImpl::Float32(_) => 4,
246        ScalarRefImpl::Float64(_) => 8,
247        ScalarRefImpl::Utf8(v) => estimate_serialize_str_size(v.as_bytes()),
248        ScalarRefImpl::Bytea(v) => estimate_serialize_str_size(v),
249        ScalarRefImpl::Bool(_) => 1,
250        ScalarRefImpl::Decimal(_) => estimate_serialize_decimal_size(),
251        ScalarRefImpl::Interval(_) => estimate_serialize_interval_size(),
252        ScalarRefImpl::Date(_) => estimate_serialize_date_size(),
253        ScalarRefImpl::Timestamp(_) => estimate_serialize_timestamp_size(),
254        ScalarRefImpl::Timestamptz(_) => 8,
255        ScalarRefImpl::Time(_) => estimate_serialize_time_size(),
256        // not exact as we use internal encoding size to estimate the json string size
257        ScalarRefImpl::Jsonb(v) => v.capacity(),
258        ScalarRefImpl::Struct(s) => estimate_serialize_struct_size(s),
259        ScalarRefImpl::List(v) => estimate_serialize_list_size(v),
260        ScalarRefImpl::Map(v) => estimate_serialize_list_size(v.into_inner()),
261        ScalarRefImpl::Vector(v) => estimate_serialize_vector_size(v),
262    }
263}
264
265fn serialize_struct(value: StructRef<'_>, buf: &mut impl BufMut) {
266    value.iter_fields_ref().for_each(|field_value| {
267        serialize_datum_into(field_value, buf);
268    });
269}
270
271fn estimate_serialize_struct_size(s: StructRef<'_>) -> usize {
272    s.estimate_serialize_size_inner()
273}
274fn serialize_list(value: ListRef<'_>, buf: &mut impl BufMut) {
275    let elems = value.iter();
276    buf.put_u32_le(elems.len() as u32);
277
278    elems.for_each(|field_value| {
279        serialize_datum_into(field_value, buf);
280    });
281}
282fn estimate_serialize_list_size(list: ListRef<'_>) -> usize {
283    4 + list.estimate_serialize_size_inner()
284}
285
286fn serialize_vector(value: VectorRef<'_>, buf: &mut impl BufMut) {
287    let elems = value.as_slice();
288    encode_vector_payload(elems, buf);
289}
290fn estimate_serialize_vector_size(v: VectorRef<'_>) -> usize {
291    size_of_val(v.as_slice())
292}
293
294fn serialize_str(bytes: &[u8], buf: &mut impl BufMut) {
295    buf.put_u32_le(bytes.len() as u32);
296    buf.put_slice(bytes);
297}
298
/// Returns the encoded size of a byte string: a `u32` length prefix plus the bytes themselves.
fn estimate_serialize_str_size(bytes: &[u8]) -> usize {
    const LEN_PREFIX_SIZE: usize = std::mem::size_of::<u32>();
    LEN_PREFIX_SIZE + bytes.len()
}
302
303fn serialize_interval(interval: &Interval, buf: &mut impl BufMut) {
304    buf.put_i32_le(interval.months());
305    buf.put_i32_le(interval.days());
306    buf.put_i64_le(interval.usecs());
307}
308
/// Returns the encoded size of an interval: two `i32`s (months, days) and one `i64` (usecs).
fn estimate_serialize_interval_size() -> usize {
    2 * std::mem::size_of::<i32>() + std::mem::size_of::<i64>()
}
312
313fn serialize_date(days: i32, buf: &mut impl BufMut) {
314    buf.put_i32_le(days);
315}
316
/// Returns the encoded size of a date: a single `i32` day count.
fn estimate_serialize_date_size() -> usize {
    std::mem::size_of::<i32>()
}
320
321fn serialize_timestamp(secs: i64, nsecs: u32, buf: &mut impl BufMut) {
322    buf.put_i64_le(secs);
323    buf.put_u32_le(nsecs);
324}
325
/// Returns the encoded size of a timestamp: an `i64` of seconds plus a `u32` of nanoseconds.
fn estimate_serialize_timestamp_size() -> usize {
    std::mem::size_of::<i64>() + std::mem::size_of::<u32>()
}
329
330fn serialize_time(secs: u32, nano: u32, buf: &mut impl BufMut) {
331    buf.put_u32_le(secs);
332    buf.put_u32_le(nano);
333}
334
/// Returns the encoded size of a time of day: two `u32`s (seconds and nanoseconds).
fn estimate_serialize_time_size() -> usize {
    2 * std::mem::size_of::<u32>()
}
338
339fn serialize_decimal(decimal: &Decimal, buf: &mut impl BufMut) {
340    buf.put_slice(&decimal.unordered_serialize());
341}
342
/// Returns the encoded size of a decimal, which is always 16 bytes.
fn estimate_serialize_decimal_size() -> usize {
    const DECIMAL_ENCODED_SIZE: usize = 16;
    DECIMAL_ENCODED_SIZE
}
346
347fn deserialize_value(ty: &DataType, data: &mut impl Buf) -> Result<ScalarImpl> {
348    Ok(match ty {
349        DataType::Int16 => ScalarImpl::Int16(data.get_i16_le()),
350        DataType::Int32 => ScalarImpl::Int32(data.get_i32_le()),
351        DataType::Int64 => ScalarImpl::Int64(data.get_i64_le()),
352        DataType::Int256 => ScalarImpl::Int256(deserialize_int256(data)),
353        DataType::Serial => ScalarImpl::Serial(Serial::from(data.get_i64_le())),
354        DataType::Float32 => ScalarImpl::Float32(F32::from(data.get_f32_le())),
355        DataType::Float64 => ScalarImpl::Float64(F64::from(data.get_f64_le())),
356        DataType::Varchar => ScalarImpl::Utf8(deserialize_str(data)?),
357        DataType::Boolean => ScalarImpl::Bool(deserialize_bool(data)?),
358        DataType::Decimal => ScalarImpl::Decimal(deserialize_decimal(data)?),
359        DataType::Interval => ScalarImpl::Interval(deserialize_interval(data)?),
360        DataType::Time => ScalarImpl::Time(deserialize_time(data)?),
361        DataType::Timestamp => ScalarImpl::Timestamp(deserialize_timestamp(data)?),
362        DataType::Timestamptz => {
363            ScalarImpl::Timestamptz(Timestamptz::from_micros(data.get_i64_le()))
364        }
365        DataType::Date => ScalarImpl::Date(deserialize_date(data)?),
366        DataType::Jsonb => ScalarImpl::Jsonb(
367            JsonbVal::value_deserialize(&deserialize_bytea(data))
368                .ok_or(ValueEncodingError::InvalidJsonbEncoding)?,
369        ),
370        DataType::Struct(struct_def) => deserialize_struct(struct_def, data)?,
371        DataType::Bytea => ScalarImpl::Bytea(deserialize_bytea(data).into()),
372        DataType::Vector(dimension) => deserialize_vector(*dimension, data),
373        DataType::List(list_type) => deserialize_list(list_type, data)?,
374        DataType::Map(map_type) => deserialize_map(map_type, data)?,
375    })
376}
377
378fn deserialize_struct(struct_def: &StructType, data: &mut impl Buf) -> Result<ScalarImpl> {
379    let mut field_values = Vec::with_capacity(struct_def.len());
380    for field_type in struct_def.types() {
381        field_values.push(inner_deserialize_datum(data, field_type)?);
382    }
383
384    Ok(ScalarImpl::Struct(StructValue::new(field_values)))
385}
386
387fn deserialize_list(list_type: &ListType, data: &mut impl Buf) -> Result<ScalarImpl> {
388    let elem_type = list_type.elem();
389    let len = data.get_u32_le();
390    let mut builder = elem_type.create_array_builder(len as usize);
391    for _ in 0..len {
392        builder.append(inner_deserialize_datum(data, elem_type)?);
393    }
394    Ok(ScalarImpl::List(ListValue::new(builder.finish())))
395}
396
397fn deserialize_map(map_type: &MapType, data: &mut impl Buf) -> Result<ScalarImpl> {
398    // FIXME: clone type everytime here is inefficient
399    let list = deserialize_list(&map_type.clone().into_list_type(), data)?.into_list();
400    Ok(ScalarImpl::Map(MapValue::from_entries(list)))
401}
402
403fn deserialize_vector(dimension: usize, data: &mut impl Buf) -> ScalarImpl {
404    VectorVal {
405        inner: decode_vector_payload(dimension, data).into_boxed_slice(),
406    }
407    .to_scalar_value()
408}
409
410fn deserialize_str(data: &mut impl Buf) -> Result<Box<str>> {
411    let len = data.get_u32_le();
412    let mut bytes = vec![0; len as usize];
413    data.copy_to_slice(&mut bytes);
414    String::from_utf8(bytes)
415        .map(String::into_boxed_str)
416        .map_err(ValueEncodingError::InvalidUtf8)
417}
418
419fn deserialize_bytea(data: &mut impl Buf) -> Vec<u8> {
420    let len = data.get_u32_le();
421    let mut bytes = vec![0; len as usize];
422    data.copy_to_slice(&mut bytes);
423    bytes
424}
425
426fn deserialize_int256(data: &mut impl Buf) -> Int256 {
427    let mut bytes = [0; Int256::size()];
428    data.copy_to_slice(&mut bytes);
429    Int256::from_le_bytes(bytes)
430}
431
432fn deserialize_bool(data: &mut impl Buf) -> Result<bool> {
433    match data.get_u8() {
434        1 => Ok(true),
435        0 => Ok(false),
436        value => Err(ValueEncodingError::InvalidBoolEncoding(value)),
437    }
438}
439
440fn deserialize_interval(data: &mut impl Buf) -> Result<Interval> {
441    let months = data.get_i32_le();
442    let days = data.get_i32_le();
443    let usecs = data.get_i64_le();
444    Ok(Interval::from_month_day_usec(months, days, usecs))
445}
446
447fn deserialize_time(data: &mut impl Buf) -> Result<Time> {
448    let secs = data.get_u32_le();
449    let nano = data.get_u32_le();
450    Time::with_secs_nano(secs, nano)
451        .map_err(|_e| ValueEncodingError::InvalidTimeEncoding(secs, nano))
452}
453
454fn deserialize_timestamp(data: &mut impl Buf) -> Result<Timestamp> {
455    let secs = data.get_i64_le();
456    let nsecs = data.get_u32_le();
457    Timestamp::with_secs_nsecs(secs, nsecs)
458        .map_err(|_e| ValueEncodingError::InvalidTimestampEncoding(secs, nsecs))
459}
460
461fn deserialize_date(data: &mut impl Buf) -> Result<Date> {
462    let days = data.get_i32_le();
463    Date::with_days_since_ce(days).map_err(|_e| ValueEncodingError::InvalidDateEncoding(days))
464}
465
466fn deserialize_decimal(data: &mut impl Buf) -> Result<Decimal> {
467    let mut bytes = [0; 16];
468    data.copy_to_slice(&mut bytes);
469    Ok(Decimal::unordered_deserialize(bytes))
470}
471
#[cfg(test)]
mod tests {
    use crate::array::{ArrayImpl, ListValue, StructValue};
    use crate::test_utils::rand_chunk;
    use crate::types::{
        DataType, Date, Datum, Decimal, Interval, ScalarImpl, Serial, Time, Timestamp,
    };
    use crate::util::value_encoding::{
        estimate_serialize_datum_size, serialize_datum, try_get_exact_serialize_datum_size,
    };

    /// Checks that the estimated size of `s` equals the length of its actual encoding.
    fn test_estimate_serialize_scalar_size(s: ScalarImpl) {
        let d = Datum::from(s);
        assert_eq!(estimate_serialize_datum_size(&d), serialize_datum(&d).len());
    }

    /// Checks that the exact size reported for a fixed-width array equals the length of the
    /// actual encoding of its datum.
    ///
    /// The size must be `Some`: accepting `None` would let the test pass vacuously if the
    /// function stopped recognizing a fixed-width type.
    fn test_try_get_exact_serialize_datum_size(s: &ArrayImpl) {
        let d = s.to_datum();
        let size = try_get_exact_serialize_datum_size(s)
            .expect("every column type used in this test has a fixed encoded width");
        assert_eq!(size, serialize_datum(&d).len());
    }

    #[test]
    fn test_estimate_size() {
        let d: Datum = None;
        assert_eq!(estimate_serialize_datum_size(&d), serialize_datum(&d).len());

        test_estimate_serialize_scalar_size(ScalarImpl::Bool(true));
        test_estimate_serialize_scalar_size(ScalarImpl::Int16(1));
        test_estimate_serialize_scalar_size(ScalarImpl::Int32(1));
        test_estimate_serialize_scalar_size(ScalarImpl::Int64(1));
        test_estimate_serialize_scalar_size(ScalarImpl::Float32(1.0.into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Float64(1.0.into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Serial(Serial::from(i64::MIN)));

        test_estimate_serialize_scalar_size(ScalarImpl::Utf8("abc".into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Utf8("".into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Decimal(Decimal::NegativeInf));
        test_estimate_serialize_scalar_size(ScalarImpl::Decimal(Decimal::PositiveInf));
        test_estimate_serialize_scalar_size(ScalarImpl::Decimal(Decimal::NaN));
        test_estimate_serialize_scalar_size(ScalarImpl::Decimal(123123.into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Interval(Interval::from_month_day_usec(
            7, 8, 9,
        )));
        test_estimate_serialize_scalar_size(ScalarImpl::Date(Date::from_ymd_uncheck(2333, 3, 3)));
        test_estimate_serialize_scalar_size(ScalarImpl::Bytea("\\x233".as_bytes().into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Time(Time::from_hms_uncheck(2, 3, 3)));
        test_estimate_serialize_scalar_size(ScalarImpl::Timestamp(
            Timestamp::from_timestamp_uncheck(23333333, 2333),
        ));
        test_estimate_serialize_scalar_size(ScalarImpl::Interval(Interval::from_month_day_usec(
            2, 3, 3333,
        )));
        test_estimate_serialize_scalar_size(ScalarImpl::Struct(StructValue::new(vec![
            ScalarImpl::Int64(233).into(),
            ScalarImpl::Float64(23.33.into()).into(),
        ])));
        test_estimate_serialize_scalar_size(ScalarImpl::List(ListValue::from_iter([233i64, 2333])));
    }

    #[test]
    fn test_try_estimate_size() {
        // One column per fixed-width type, generated with a null ratio of 0.0, so every
        // datum is encoded as a tag byte plus a full scalar.
        let chunk = rand_chunk::gen_chunk(
            &[
                DataType::Int16,
                DataType::Int32,
                DataType::Int64,
                DataType::Serial,
                DataType::Float32,
                DataType::Float64,
                DataType::Boolean,
                DataType::Decimal,
                DataType::Interval,
                DataType::Time,
                DataType::Timestamp,
                DataType::Date,
            ],
            1,
            0,
            0.0,
        );
        for column in chunk.columns() {
            test_try_get_exact_serialize_datum_size(column);
        }
    }
}