risingwave_common/util/value_encoding/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Value encoding is a format that converts data into a binary form. Unlike the memcomparable
//! encoding used for keys, the encoded bytes carry no ordering guarantee.
17
18use bytes::{Buf, BufMut};
19use chrono::{Datelike, Timelike};
20use either::{Either, for_both};
21use enum_as_inner::EnumAsInner;
22use risingwave_pb::data::PbDatum;
23
24use crate::array::ArrayImpl;
25use crate::row::Row;
26use crate::types::*;
27
28pub mod error;
29use error::ValueEncodingError;
30
31use self::column_aware_row_encoding::ColumnAwareSerde;
32pub mod column_aware_row_encoding;
33
34pub use crate::row::RowDeserializer as BasicDeserializer;
35use crate::vector::{decode_vector_payload, encode_vector_payload};
36
37pub type Result<T> = std::result::Result<T, ValueEncodingError>;
38
/// The kind of all possible `ValueRowSerde`.
///
/// Each variant names one row encoding; the serde types handling them are `BasicSerde` and
/// `ColumnAwareSerde` respectively.
#[derive(EnumAsInner)]
pub enum ValueRowSerdeKind {
    /// For `BasicSerde`, the value is encoded with value-encoding.
    Basic,
    /// For `ColumnAwareSerde`, the value is encoded with column-aware row encoding.
    ColumnAware,
}
47
/// Part of `ValueRowSerde` that implements `serialize` a `Row` into bytes.
///
/// Implementors must also be [`Clone`].
pub trait ValueRowSerializer: Clone {
    /// Encodes `row` and returns the encoded bytes in a newly created buffer.
    fn serialize(&self, row: impl Row) -> Vec<u8>;
}
52
/// Part of `ValueRowSerde` that implements `deserialize` bytes into a `Row`.
///
/// Implementors must also be [`Clone`].
pub trait ValueRowDeserializer: Clone {
    /// Decodes `encoded_bytes` into the datums of one row.
    ///
    /// # Errors
    /// Returns a [`ValueEncodingError`] when the implementation rejects the input.
    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>>;
}
57
/// The type-erased `ValueRowSerde`, used for simplifying the code.
///
/// Holds either a [`BasicSerde`] (left) or a [`ColumnAwareSerde`] (right); the trait
/// implementations on this type forward to whichever one is stored.
#[derive(Clone)]
pub struct EitherSerde(pub Either<BasicSerde, ColumnAwareSerde>);
61
62impl From<BasicSerde> for EitherSerde {
63    fn from(value: BasicSerde) -> Self {
64        Self(Either::Left(value))
65    }
66}
67impl From<ColumnAwareSerde> for EitherSerde {
68    fn from(value: ColumnAwareSerde) -> Self {
69        Self(Either::Right(value))
70    }
71}
72
73impl ValueRowSerializer for EitherSerde {
74    fn serialize(&self, row: impl Row) -> Vec<u8> {
75        for_both!(&self.0, s => s.serialize(row))
76    }
77}
78
79impl ValueRowDeserializer for EitherSerde {
80    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
81        for_both!(&self.0, s => s.deserialize(encoded_bytes))
82    }
83}
84
/// Wrap of the original `Row` serializing function.
///
/// A stateless unit type: its `ValueRowSerializer` implementation encodes the datums of a row
/// one after another with `serialize_datum_into`.
#[derive(Clone)]
pub struct BasicSerializer;
88
89impl ValueRowSerializer for BasicSerializer {
90    fn serialize(&self, row: impl Row) -> Vec<u8> {
91        let mut buf = vec![];
92        for datum in row.iter() {
93            serialize_datum_into(datum, &mut buf);
94        }
95        buf
96    }
97}
98
99impl ValueRowDeserializer for BasicDeserializer {
100    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
101        Ok(self.deserialize(encoded_bytes)?.into_inner().into())
102    }
103}
104
/// Wrap of the original `Row` serializing and deserializing function.
///
/// Bundles a serializer and a deserializer so that one value can implement both
/// `ValueRowSerializer` and `ValueRowDeserializer`.
#[derive(Clone)]
pub struct BasicSerde {
    /// Used by the `ValueRowSerializer` implementation.
    pub serializer: BasicSerializer,
    /// Used by the `ValueRowDeserializer` implementation.
    pub deserializer: BasicDeserializer,
}
111
112impl ValueRowSerializer for BasicSerde {
113    fn serialize(&self, row: impl Row) -> Vec<u8> {
114        self.serializer.serialize(row)
115    }
116}
117
118impl ValueRowDeserializer for BasicSerde {
119    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
120        Ok(self
121            .deserializer
122            .deserialize(encoded_bytes)?
123            .into_inner()
124            .into())
125    }
126}
127
128pub fn try_get_exact_serialize_datum_size(arr: &ArrayImpl) -> Option<usize> {
129    match arr {
130        ArrayImpl::Int16(_) => Some(2),
131        ArrayImpl::Int32(_) => Some(4),
132        ArrayImpl::Int64(_) => Some(8),
133        ArrayImpl::Serial(_) => Some(8),
134        ArrayImpl::Float32(_) => Some(4),
135        ArrayImpl::Float64(_) => Some(8),
136        ArrayImpl::Bool(_) => Some(1),
137        ArrayImpl::Decimal(_) => Some(estimate_serialize_decimal_size()),
138        ArrayImpl::Interval(_) => Some(estimate_serialize_interval_size()),
139        ArrayImpl::Date(_) => Some(estimate_serialize_date_size()),
140        ArrayImpl::Timestamp(_) => Some(estimate_serialize_timestamp_size()),
141        ArrayImpl::Time(_) => Some(estimate_serialize_time_size()),
142        _ => None,
143    }
144    .map(|x| x + 1)
145}
146
147/// Serialize a datum into bytes and return (Not order guarantee, used in value encoding).
148pub fn serialize_datum(cell: impl ToDatumRef) -> Vec<u8> {
149    let mut buf: Vec<u8> = vec![];
150    serialize_datum_into(cell, &mut buf);
151    buf
152}
153
154/// Serialize a datum into bytes (Not order guarantee, used in value encoding).
155pub fn serialize_datum_into(datum_ref: impl ToDatumRef, buf: &mut impl BufMut) {
156    if let Some(d) = datum_ref.to_datum_ref() {
157        buf.put_u8(1);
158        serialize_scalar(d, buf)
159    } else {
160        buf.put_u8(0);
161    }
162}
163
164pub fn estimate_serialize_datum_size(datum_ref: impl ToDatumRef) -> usize {
165    if let Some(d) = datum_ref.to_datum_ref() {
166        1 + estimate_serialize_scalar_size(d)
167    } else {
168        1
169    }
170}
171
172#[easy_ext::ext(DatumFromProtoExt)]
173impl Datum {
174    /// Create a datum from the protobuf representation with the given data type.
175    pub fn from_protobuf(proto: &PbDatum, data_type: &DataType) -> Result<Datum> {
176        deserialize_datum(proto.body.as_slice(), data_type)
177    }
178}
179
180#[easy_ext::ext(DatumToProtoExt)]
181impl<D: ToDatumRef> D {
182    /// Convert the datum to the protobuf representation.
183    pub fn to_protobuf(&self) -> PbDatum {
184        PbDatum {
185            body: serialize_datum(self),
186        }
187    }
188}
189
/// Deserialize bytes into a datum (Not order guarantee, used in value encoding).
///
/// Reads a one-byte null tag from `data` and, for a non-null datum, a payload decoded
/// according to `ty`.
///
/// # Errors
/// Returns a [`ValueEncodingError`] if the tag is neither `0` nor `1`, or if the
/// type-specific decoder rejects the payload.
///
/// # Panics
/// The `Buf` getters used for decoding panic when `data` holds fewer bytes than required.
pub fn deserialize_datum(mut data: impl Buf, ty: &DataType) -> Result<Datum> {
    inner_deserialize_datum(&mut data, ty)
}
194
195// prevent recursive use of &mut
196#[inline(always)]
197fn inner_deserialize_datum(data: &mut impl Buf, ty: &DataType) -> Result<Datum> {
198    let null_tag = data.get_u8();
199    match null_tag {
200        0 => Ok(None),
201        1 => Some(deserialize_value(ty, data)).transpose(),
202        _ => Err(ValueEncodingError::InvalidTagEncoding(null_tag)),
203    }
204}
205
206fn serialize_scalar(value: ScalarRefImpl<'_>, buf: &mut impl BufMut) {
207    match value {
208        ScalarRefImpl::Int16(v) => buf.put_i16_le(v),
209        ScalarRefImpl::Int32(v) => buf.put_i32_le(v),
210        ScalarRefImpl::Int64(v) => buf.put_i64_le(v),
211        ScalarRefImpl::Int256(v) => buf.put_slice(&v.to_le_bytes()),
212        ScalarRefImpl::Serial(v) => buf.put_i64_le(v.into_inner()),
213        ScalarRefImpl::Float32(v) => buf.put_f32_le(v.into_inner()),
214        ScalarRefImpl::Float64(v) => buf.put_f64_le(v.into_inner()),
215        ScalarRefImpl::Utf8(v) => serialize_str(v.as_bytes(), buf),
216        ScalarRefImpl::Bytea(v) => serialize_str(v, buf),
217        ScalarRefImpl::Bool(v) => buf.put_u8(v as u8),
218        ScalarRefImpl::Decimal(v) => serialize_decimal(&v, buf),
219        ScalarRefImpl::Interval(v) => serialize_interval(&v, buf),
220        ScalarRefImpl::Date(v) => serialize_date(v.0.num_days_from_ce(), buf),
221        ScalarRefImpl::Timestamp(v) => serialize_timestamp(
222            v.0.and_utc().timestamp(),
223            v.0.and_utc().timestamp_subsec_nanos(),
224            buf,
225        ),
226        ScalarRefImpl::Timestamptz(v) => buf.put_i64_le(v.timestamp_micros()),
227        ScalarRefImpl::Time(v) => {
228            serialize_time(v.0.num_seconds_from_midnight(), v.0.nanosecond(), buf)
229        }
230        ScalarRefImpl::Jsonb(v) => serialize_str(&v.value_serialize(), buf),
231        ScalarRefImpl::Struct(s) => serialize_struct(s, buf),
232        ScalarRefImpl::List(v) => serialize_list(v, buf),
233        ScalarRefImpl::Map(m) => serialize_list(m.into_inner(), buf),
234        ScalarRefImpl::Vector(v) => serialize_vector(v, buf),
235    }
236}
237
238fn estimate_serialize_scalar_size(value: ScalarRefImpl<'_>) -> usize {
239    match value {
240        ScalarRefImpl::Int16(_) => 2,
241        ScalarRefImpl::Int32(_) => 4,
242        ScalarRefImpl::Int64(_) => 8,
243        ScalarRefImpl::Int256(_) => 32,
244        ScalarRefImpl::Serial(_) => 8,
245        ScalarRefImpl::Float32(_) => 4,
246        ScalarRefImpl::Float64(_) => 8,
247        ScalarRefImpl::Utf8(v) => estimate_serialize_str_size(v.as_bytes()),
248        ScalarRefImpl::Bytea(v) => estimate_serialize_str_size(v),
249        ScalarRefImpl::Bool(_) => 1,
250        ScalarRefImpl::Decimal(_) => estimate_serialize_decimal_size(),
251        ScalarRefImpl::Interval(_) => estimate_serialize_interval_size(),
252        ScalarRefImpl::Date(_) => estimate_serialize_date_size(),
253        ScalarRefImpl::Timestamp(_) => estimate_serialize_timestamp_size(),
254        ScalarRefImpl::Timestamptz(_) => 8,
255        ScalarRefImpl::Time(_) => estimate_serialize_time_size(),
256        // not exact as we use internal encoding size to estimate the json string size
257        ScalarRefImpl::Jsonb(v) => v.capacity(),
258        ScalarRefImpl::Struct(s) => estimate_serialize_struct_size(s),
259        ScalarRefImpl::List(v) => estimate_serialize_list_size(v),
260        ScalarRefImpl::Map(v) => estimate_serialize_list_size(v.into_inner()),
261        ScalarRefImpl::Vector(v) => estimate_serialize_vector_size(v),
262    }
263}
264
265fn serialize_struct(value: StructRef<'_>, buf: &mut impl BufMut) {
266    value.iter_fields_ref().for_each(|field_value| {
267        serialize_datum_into(field_value, buf);
268    });
269}
270
/// Estimates the encoded size of a struct by delegating to
/// `StructRef::estimate_serialize_size_inner`.
fn estimate_serialize_struct_size(s: StructRef<'_>) -> usize {
    s.estimate_serialize_size_inner()
}
274fn serialize_list(value: ListRef<'_>, buf: &mut impl BufMut) {
275    let elems = value.iter();
276    buf.put_u32_le(elems.len() as u32);
277
278    elems.for_each(|field_value| {
279        serialize_datum_into(field_value, buf);
280    });
281}
282fn estimate_serialize_list_size(list: ListRef<'_>) -> usize {
283    4 + list.estimate_serialize_size_inner()
284}
285
286fn serialize_vector(value: VectorRef<'_>, buf: &mut impl BufMut) {
287    let elems = value.as_slice();
288    encode_vector_payload(elems, buf);
289}
290fn estimate_serialize_vector_size(v: VectorRef<'_>) -> usize {
291    size_of_val(v.as_slice())
292}
293
294fn serialize_str(bytes: &[u8], buf: &mut impl BufMut) {
295    buf.put_u32_le(bytes.len() as u32);
296    buf.put_slice(bytes);
297}
298
/// Size of a length-prefixed byte string: a `u32` length plus the bytes themselves.
fn estimate_serialize_str_size(bytes: &[u8]) -> usize {
    let len_prefix = std::mem::size_of::<u32>();
    len_prefix + bytes.len()
}
302
303fn serialize_interval(interval: &Interval, buf: &mut impl BufMut) {
304    buf.put_i32_le(interval.months());
305    buf.put_i32_le(interval.days());
306    buf.put_i64_le(interval.usecs());
307}
308
/// Encoded size of an interval: two `i32` fields (months, days) and one `i64` (microseconds).
fn estimate_serialize_interval_size() -> usize {
    2 * std::mem::size_of::<i32>() + std::mem::size_of::<i64>()
}
312
/// Writes a date as a little-endian `i32` day count.
///
/// `serialize_scalar` passes the number of days from the Common Era
/// (`num_days_from_ce`) as `days`.
fn serialize_date(days: i32, buf: &mut impl BufMut) {
    buf.put_i32_le(days);
}
316
/// Encoded size of a date: a single `i32` day count.
fn estimate_serialize_date_size() -> usize {
    std::mem::size_of::<i32>()
}
320
/// Writes a timestamp as little-endian seconds (`i64`) followed by the sub-second
/// nanoseconds (`u32`).
fn serialize_timestamp(secs: i64, nsecs: u32, buf: &mut impl BufMut) {
    buf.put_i64_le(secs);
    buf.put_u32_le(nsecs);
}
325
/// Encoded size of a timestamp: `i64` seconds plus `u32` nanoseconds.
fn estimate_serialize_timestamp_size() -> usize {
    std::mem::size_of::<i64>() + std::mem::size_of::<u32>()
}
329
/// Writes a time of day as little-endian seconds since midnight (`u32`) followed by the
/// nanosecond part (`u32`).
fn serialize_time(secs: u32, nano: u32, buf: &mut impl BufMut) {
    buf.put_u32_le(secs);
    buf.put_u32_le(nano);
}
334
/// Encoded size of a time of day: two `u32` fields (seconds and nanoseconds).
fn estimate_serialize_time_size() -> usize {
    2 * std::mem::size_of::<u32>()
}
338
339fn serialize_decimal(decimal: &Decimal, buf: &mut impl BufMut) {
340    buf.put_slice(&decimal.unordered_serialize());
341}
342
/// Encoded size of a decimal: the 16-byte array that `deserialize_decimal` reads back.
fn estimate_serialize_decimal_size() -> usize {
    std::mem::size_of::<[u8; 16]>()
}
346
347fn deserialize_value(ty: &DataType, data: &mut impl Buf) -> Result<ScalarImpl> {
348    Ok(match ty {
349        DataType::Int16 => ScalarImpl::Int16(data.get_i16_le()),
350        DataType::Int32 => ScalarImpl::Int32(data.get_i32_le()),
351        DataType::Int64 => ScalarImpl::Int64(data.get_i64_le()),
352        DataType::Int256 => ScalarImpl::Int256(deserialize_int256(data)),
353        DataType::Serial => ScalarImpl::Serial(Serial::from(data.get_i64_le())),
354        DataType::Float32 => ScalarImpl::Float32(F32::from(data.get_f32_le())),
355        DataType::Float64 => ScalarImpl::Float64(F64::from(data.get_f64_le())),
356        DataType::Varchar => ScalarImpl::Utf8(deserialize_str(data)?),
357        DataType::Boolean => ScalarImpl::Bool(deserialize_bool(data)?),
358        DataType::Decimal => ScalarImpl::Decimal(deserialize_decimal(data)?),
359        DataType::Interval => ScalarImpl::Interval(deserialize_interval(data)?),
360        DataType::Time => ScalarImpl::Time(deserialize_time(data)?),
361        DataType::Timestamp => ScalarImpl::Timestamp(deserialize_timestamp(data)?),
362        DataType::Timestamptz => {
363            ScalarImpl::Timestamptz(Timestamptz::from_micros(data.get_i64_le()))
364        }
365        DataType::Date => ScalarImpl::Date(deserialize_date(data)?),
366        DataType::Jsonb => ScalarImpl::Jsonb(
367            JsonbVal::value_deserialize(&deserialize_bytea(data))
368                .ok_or(ValueEncodingError::InvalidJsonbEncoding)?,
369        ),
370        DataType::Struct(struct_def) => deserialize_struct(struct_def, data)?,
371        DataType::Bytea => ScalarImpl::Bytea(deserialize_bytea(data).into()),
372        DataType::Vector(dimension) => deserialize_vector(*dimension, data),
373        DataType::List(item_type) => deserialize_list(item_type, data)?,
374        DataType::Map(map_type) => {
375            // FIXME: clone type everytime here is inefficient
376            let list = deserialize_list(&map_type.clone().into_struct(), data)?.into_list();
377            ScalarImpl::Map(MapValue::from_entries(list))
378        }
379    })
380}
381
382fn deserialize_struct(struct_def: &StructType, data: &mut impl Buf) -> Result<ScalarImpl> {
383    let mut field_values = Vec::with_capacity(struct_def.len());
384    for field_type in struct_def.types() {
385        field_values.push(inner_deserialize_datum(data, field_type)?);
386    }
387
388    Ok(ScalarImpl::Struct(StructValue::new(field_values)))
389}
390
391fn deserialize_list(item_type: &DataType, data: &mut impl Buf) -> Result<ScalarImpl> {
392    let len = data.get_u32_le();
393    let mut builder = item_type.create_array_builder(len as usize);
394    for _ in 0..len {
395        builder.append(inner_deserialize_datum(data, item_type)?);
396    }
397    Ok(ScalarImpl::List(ListValue::new(builder.finish())))
398}
399
400fn deserialize_vector(dimension: usize, data: &mut impl Buf) -> ScalarImpl {
401    VectorVal {
402        inner: decode_vector_payload(dimension, data).into_boxed_slice(),
403    }
404    .to_scalar_value()
405}
406
407fn deserialize_str(data: &mut impl Buf) -> Result<Box<str>> {
408    let len = data.get_u32_le();
409    let mut bytes = vec![0; len as usize];
410    data.copy_to_slice(&mut bytes);
411    String::from_utf8(bytes)
412        .map(String::into_boxed_str)
413        .map_err(ValueEncodingError::InvalidUtf8)
414}
415
416fn deserialize_bytea(data: &mut impl Buf) -> Vec<u8> {
417    let len = data.get_u32_le();
418    let mut bytes = vec![0; len as usize];
419    data.copy_to_slice(&mut bytes);
420    bytes
421}
422
423fn deserialize_int256(data: &mut impl Buf) -> Int256 {
424    let mut bytes = [0; Int256::size()];
425    data.copy_to_slice(&mut bytes);
426    Int256::from_le_bytes(bytes)
427}
428
429fn deserialize_bool(data: &mut impl Buf) -> Result<bool> {
430    match data.get_u8() {
431        1 => Ok(true),
432        0 => Ok(false),
433        value => Err(ValueEncodingError::InvalidBoolEncoding(value)),
434    }
435}
436
437fn deserialize_interval(data: &mut impl Buf) -> Result<Interval> {
438    let months = data.get_i32_le();
439    let days = data.get_i32_le();
440    let usecs = data.get_i64_le();
441    Ok(Interval::from_month_day_usec(months, days, usecs))
442}
443
444fn deserialize_time(data: &mut impl Buf) -> Result<Time> {
445    let secs = data.get_u32_le();
446    let nano = data.get_u32_le();
447    Time::with_secs_nano(secs, nano)
448        .map_err(|_e| ValueEncodingError::InvalidTimeEncoding(secs, nano))
449}
450
451fn deserialize_timestamp(data: &mut impl Buf) -> Result<Timestamp> {
452    let secs = data.get_i64_le();
453    let nsecs = data.get_u32_le();
454    Timestamp::with_secs_nsecs(secs, nsecs)
455        .map_err(|_e| ValueEncodingError::InvalidTimestampEncoding(secs, nsecs))
456}
457
458fn deserialize_date(data: &mut impl Buf) -> Result<Date> {
459    let days = data.get_i32_le();
460    Date::with_days_since_ce(days).map_err(|_e| ValueEncodingError::InvalidDateEncoding(days))
461}
462
463fn deserialize_decimal(data: &mut impl Buf) -> Result<Decimal> {
464    let mut bytes = [0; 16];
465    data.copy_to_slice(&mut bytes);
466    Ok(Decimal::unordered_deserialize(bytes))
467}
468
#[cfg(test)]
mod tests {
    use crate::array::{ArrayImpl, ListValue, StructValue};
    use crate::test_utils::rand_chunk;
    use crate::types::{
        DataType, Date, Datum, Decimal, Interval, ScalarImpl, Serial, Time, Timestamp,
        Timestamptz,
    };
    use crate::util::value_encoding::{
        estimate_serialize_datum_size, serialize_datum, try_get_exact_serialize_datum_size,
    };

    /// Asserts that the estimated size of the datum wrapping `s` equals the length of its
    /// actual serialization.
    fn test_estimate_serialize_scalar_size(s: ScalarImpl) {
        let d = Datum::from(s);
        assert_eq!(estimate_serialize_datum_size(&d), serialize_datum(&d).len());
    }

    /// Asserts that, when an exact size is reported for the array's type, it equals the
    /// length of the serialized datum taken from the array.
    fn test_try_get_exact_serialize_datum_size(s: &ArrayImpl) {
        let d = s.to_datum();
        if let Some(ret) = try_get_exact_serialize_datum_size(s) {
            assert_eq!(ret, serialize_datum(&d).len());
        }
    }

    #[test]
    fn test_estimate_size() {
        // A null datum is just the tag byte.
        let d: Datum = None;
        assert_eq!(estimate_serialize_datum_size(&d), serialize_datum(&d).len());

        test_estimate_serialize_scalar_size(ScalarImpl::Bool(true));
        test_estimate_serialize_scalar_size(ScalarImpl::Bool(false));
        test_estimate_serialize_scalar_size(ScalarImpl::Int16(1));
        test_estimate_serialize_scalar_size(ScalarImpl::Int32(1));
        test_estimate_serialize_scalar_size(ScalarImpl::Int64(1));
        test_estimate_serialize_scalar_size(ScalarImpl::Float32(1.0.into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Float64(1.0.into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Serial(Serial::from(i64::MIN)));

        test_estimate_serialize_scalar_size(ScalarImpl::Utf8("abc".into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Utf8("".into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Decimal(Decimal::NegativeInf));
        test_estimate_serialize_scalar_size(ScalarImpl::Decimal(Decimal::PositiveInf));
        test_estimate_serialize_scalar_size(ScalarImpl::Decimal(Decimal::NaN));
        test_estimate_serialize_scalar_size(ScalarImpl::Decimal(123123.into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Interval(Interval::from_month_day_usec(
            7, 8, 9,
        )));
        test_estimate_serialize_scalar_size(ScalarImpl::Date(Date::from_ymd_uncheck(2333, 3, 3)));
        test_estimate_serialize_scalar_size(ScalarImpl::Bytea("\\x233".as_bytes().into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Time(Time::from_hms_uncheck(2, 3, 3)));
        test_estimate_serialize_scalar_size(ScalarImpl::Timestamp(
            Timestamp::from_timestamp_uncheck(23333333, 2333),
        ));
        // `Timestamptz` is encoded as a single `i64`; its estimate was previously untested.
        test_estimate_serialize_scalar_size(ScalarImpl::Timestamptz(Timestamptz::from_micros(
            23333333,
        )));
        test_estimate_serialize_scalar_size(ScalarImpl::Interval(Interval::from_month_day_usec(
            2, 3, 3333,
        )));
        test_estimate_serialize_scalar_size(ScalarImpl::Struct(StructValue::new(vec![
            ScalarImpl::Int64(233).into(),
            ScalarImpl::Float64(23.33.into()).into(),
        ])));
        test_estimate_serialize_scalar_size(ScalarImpl::List(ListValue::from_iter([233i64, 2333])));
    }

    #[test]
    fn test_try_estimate_size() {
        // One generated column per type for which an exact size is expected.
        let chunk = rand_chunk::gen_chunk(
            &[
                DataType::Int16,
                DataType::Int32,
                DataType::Int64,
                DataType::Serial,
                DataType::Float32,
                DataType::Float64,
                DataType::Boolean,
                DataType::Decimal,
                DataType::Interval,
                DataType::Time,
                DataType::Timestamp,
                DataType::Date,
            ],
            1,
            0,
            0.0,
        );
        for column in chunk.columns() {
            test_try_get_exact_serialize_datum_size(column);
        }
    }
}