risingwave_common/util/value_encoding/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Value encoding is an encoding format which converts data into a binary form.
//! Unlike key encoding, it is not memcomparable: comparing encoded bytes does not
//! reflect the ordering of the original values.
17
18use bytes::{Buf, BufMut};
19use chrono::{Datelike, Timelike};
20use either::{Either, for_both};
21use enum_as_inner::EnumAsInner;
22use risingwave_pb::data::PbDatum;
23
24use crate::array::ArrayImpl;
25use crate::row::Row;
26use crate::types::*;
27
28pub mod error;
29use error::ValueEncodingError;
30
31use self::column_aware_row_encoding::ColumnAwareSerde;
32pub mod column_aware_row_encoding;
33
34pub use crate::row::RowDeserializer as BasicDeserializer;
35
36pub type Result<T> = std::result::Result<T, ValueEncodingError>;
37
38/// The kind of all possible `ValueRowSerde`.
39#[derive(EnumAsInner)]
40pub enum ValueRowSerdeKind {
41    /// For `BasicSerde`, the value is encoded with value-encoding.
42    Basic,
43    /// For `ColumnAwareSerde`, the value is encoded with column-aware row encoding.
44    ColumnAware,
45}
46
47/// Part of `ValueRowSerde` that implements `serialize` a `Row` into bytes
48pub trait ValueRowSerializer: Clone {
49    fn serialize(&self, row: impl Row) -> Vec<u8>;
50}
51
52/// Part of `ValueRowSerde` that implements `deserialize` bytes into a `Row`
53pub trait ValueRowDeserializer: Clone {
54    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>>;
55}
56
57/// The type-erased `ValueRowSerde`, used for simplifying the code.
58#[derive(Clone)]
59pub struct EitherSerde(pub Either<BasicSerde, ColumnAwareSerde>);
60
61impl From<BasicSerde> for EitherSerde {
62    fn from(value: BasicSerde) -> Self {
63        Self(Either::Left(value))
64    }
65}
66impl From<ColumnAwareSerde> for EitherSerde {
67    fn from(value: ColumnAwareSerde) -> Self {
68        Self(Either::Right(value))
69    }
70}
71
72impl ValueRowSerializer for EitherSerde {
73    fn serialize(&self, row: impl Row) -> Vec<u8> {
74        for_both!(&self.0, s => s.serialize(row))
75    }
76}
77
78impl ValueRowDeserializer for EitherSerde {
79    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
80        for_both!(&self.0, s => s.deserialize(encoded_bytes))
81    }
82}
83
/// Stateless serializer that writes each datum of a row with plain value encoding.
#[derive(Clone)]
pub struct BasicSerializer;
87
88impl ValueRowSerializer for BasicSerializer {
89    fn serialize(&self, row: impl Row) -> Vec<u8> {
90        let mut buf = vec![];
91        for datum in row.iter() {
92            serialize_datum_into(datum, &mut buf);
93        }
94        buf
95    }
96}
97
98impl ValueRowDeserializer for BasicDeserializer {
99    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
100        Ok(self.deserialize(encoded_bytes)?.into_inner().into())
101    }
102}
103
104/// Wrap of the original `Row` serializing and deserializing function
105#[derive(Clone)]
106pub struct BasicSerde {
107    pub serializer: BasicSerializer,
108    pub deserializer: BasicDeserializer,
109}
110
111impl ValueRowSerializer for BasicSerde {
112    fn serialize(&self, row: impl Row) -> Vec<u8> {
113        self.serializer.serialize(row)
114    }
115}
116
117impl ValueRowDeserializer for BasicSerde {
118    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
119        Ok(self
120            .deserializer
121            .deserialize(encoded_bytes)?
122            .into_inner()
123            .into())
124    }
125}
126
127pub fn try_get_exact_serialize_datum_size(arr: &ArrayImpl) -> Option<usize> {
128    match arr {
129        ArrayImpl::Int16(_) => Some(2),
130        ArrayImpl::Int32(_) => Some(4),
131        ArrayImpl::Int64(_) => Some(8),
132        ArrayImpl::Serial(_) => Some(8),
133        ArrayImpl::Float32(_) => Some(4),
134        ArrayImpl::Float64(_) => Some(8),
135        ArrayImpl::Bool(_) => Some(1),
136        ArrayImpl::Decimal(_) => Some(estimate_serialize_decimal_size()),
137        ArrayImpl::Interval(_) => Some(estimate_serialize_interval_size()),
138        ArrayImpl::Date(_) => Some(estimate_serialize_date_size()),
139        ArrayImpl::Timestamp(_) => Some(estimate_serialize_timestamp_size()),
140        ArrayImpl::Time(_) => Some(estimate_serialize_time_size()),
141        _ => None,
142    }
143    .map(|x| x + 1)
144}
145
146/// Serialize a datum into bytes and return (Not order guarantee, used in value encoding).
147pub fn serialize_datum(cell: impl ToDatumRef) -> Vec<u8> {
148    let mut buf: Vec<u8> = vec![];
149    serialize_datum_into(cell, &mut buf);
150    buf
151}
152
153/// Serialize a datum into bytes (Not order guarantee, used in value encoding).
154pub fn serialize_datum_into(datum_ref: impl ToDatumRef, buf: &mut impl BufMut) {
155    if let Some(d) = datum_ref.to_datum_ref() {
156        buf.put_u8(1);
157        serialize_scalar(d, buf)
158    } else {
159        buf.put_u8(0);
160    }
161}
162
163pub fn estimate_serialize_datum_size(datum_ref: impl ToDatumRef) -> usize {
164    if let Some(d) = datum_ref.to_datum_ref() {
165        1 + estimate_serialize_scalar_size(d)
166    } else {
167        1
168    }
169}
170
171#[easy_ext::ext(DatumFromProtoExt)]
172impl Datum {
173    /// Create a datum from the protobuf representation with the given data type.
174    pub fn from_protobuf(proto: &PbDatum, data_type: &DataType) -> Result<Datum> {
175        deserialize_datum(proto.body.as_slice(), data_type)
176    }
177}
178
179#[easy_ext::ext(DatumToProtoExt)]
180impl<D: ToDatumRef> D {
181    /// Convert the datum to the protobuf representation.
182    pub fn to_protobuf(&self) -> PbDatum {
183        PbDatum {
184            body: serialize_datum(self),
185        }
186    }
187}
188
189/// Deserialize bytes into a datum (Not order guarantee, used in value encoding).
190pub fn deserialize_datum(mut data: impl Buf, ty: &DataType) -> Result<Datum> {
191    inner_deserialize_datum(&mut data, ty)
192}
193
194// prevent recursive use of &mut
195#[inline(always)]
196fn inner_deserialize_datum(data: &mut impl Buf, ty: &DataType) -> Result<Datum> {
197    let null_tag = data.get_u8();
198    match null_tag {
199        0 => Ok(None),
200        1 => Some(deserialize_value(ty, data)).transpose(),
201        _ => Err(ValueEncodingError::InvalidTagEncoding(null_tag)),
202    }
203}
204
205fn serialize_scalar(value: ScalarRefImpl<'_>, buf: &mut impl BufMut) {
206    match value {
207        ScalarRefImpl::Int16(v) => buf.put_i16_le(v),
208        ScalarRefImpl::Int32(v) => buf.put_i32_le(v),
209        ScalarRefImpl::Int64(v) => buf.put_i64_le(v),
210        ScalarRefImpl::Int256(v) => buf.put_slice(&v.to_le_bytes()),
211        ScalarRefImpl::Serial(v) => buf.put_i64_le(v.into_inner()),
212        ScalarRefImpl::Float32(v) => buf.put_f32_le(v.into_inner()),
213        ScalarRefImpl::Float64(v) => buf.put_f64_le(v.into_inner()),
214        ScalarRefImpl::Utf8(v) => serialize_str(v.as_bytes(), buf),
215        ScalarRefImpl::Bytea(v) => serialize_str(v, buf),
216        ScalarRefImpl::Bool(v) => buf.put_u8(v as u8),
217        ScalarRefImpl::Decimal(v) => serialize_decimal(&v, buf),
218        ScalarRefImpl::Interval(v) => serialize_interval(&v, buf),
219        ScalarRefImpl::Date(v) => serialize_date(v.0.num_days_from_ce(), buf),
220        ScalarRefImpl::Timestamp(v) => serialize_timestamp(
221            v.0.and_utc().timestamp(),
222            v.0.and_utc().timestamp_subsec_nanos(),
223            buf,
224        ),
225        ScalarRefImpl::Timestamptz(v) => buf.put_i64_le(v.timestamp_micros()),
226        ScalarRefImpl::Time(v) => {
227            serialize_time(v.0.num_seconds_from_midnight(), v.0.nanosecond(), buf)
228        }
229        ScalarRefImpl::Jsonb(v) => serialize_str(&v.value_serialize(), buf),
230        ScalarRefImpl::Struct(s) => serialize_struct(s, buf),
231        ScalarRefImpl::List(v) => serialize_list(v, buf),
232        ScalarRefImpl::Map(m) => serialize_list(m.into_inner(), buf),
233        ScalarRefImpl::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
234    }
235}
236
237fn estimate_serialize_scalar_size(value: ScalarRefImpl<'_>) -> usize {
238    match value {
239        ScalarRefImpl::Int16(_) => 2,
240        ScalarRefImpl::Int32(_) => 4,
241        ScalarRefImpl::Int64(_) => 8,
242        ScalarRefImpl::Int256(_) => 32,
243        ScalarRefImpl::Serial(_) => 8,
244        ScalarRefImpl::Float32(_) => 4,
245        ScalarRefImpl::Float64(_) => 8,
246        ScalarRefImpl::Utf8(v) => estimate_serialize_str_size(v.as_bytes()),
247        ScalarRefImpl::Bytea(v) => estimate_serialize_str_size(v),
248        ScalarRefImpl::Bool(_) => 1,
249        ScalarRefImpl::Decimal(_) => estimate_serialize_decimal_size(),
250        ScalarRefImpl::Interval(_) => estimate_serialize_interval_size(),
251        ScalarRefImpl::Date(_) => estimate_serialize_date_size(),
252        ScalarRefImpl::Timestamp(_) => estimate_serialize_timestamp_size(),
253        ScalarRefImpl::Timestamptz(_) => 8,
254        ScalarRefImpl::Time(_) => estimate_serialize_time_size(),
255        // not exact as we use internal encoding size to estimate the json string size
256        ScalarRefImpl::Jsonb(v) => v.capacity(),
257        ScalarRefImpl::Struct(s) => estimate_serialize_struct_size(s),
258        ScalarRefImpl::List(v) => estimate_serialize_list_size(v),
259        ScalarRefImpl::Map(v) => estimate_serialize_list_size(v.into_inner()),
260        ScalarRefImpl::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
261    }
262}
263
264fn serialize_struct(value: StructRef<'_>, buf: &mut impl BufMut) {
265    value.iter_fields_ref().for_each(|field_value| {
266        serialize_datum_into(field_value, buf);
267    });
268}
269
270fn estimate_serialize_struct_size(s: StructRef<'_>) -> usize {
271    s.estimate_serialize_size_inner()
272}
273fn serialize_list(value: ListRef<'_>, buf: &mut impl BufMut) {
274    let elems = value.iter();
275    buf.put_u32_le(elems.len() as u32);
276
277    elems.for_each(|field_value| {
278        serialize_datum_into(field_value, buf);
279    });
280}
281fn estimate_serialize_list_size(list: ListRef<'_>) -> usize {
282    4 + list.estimate_serialize_size_inner()
283}
284
285fn serialize_str(bytes: &[u8], buf: &mut impl BufMut) {
286    buf.put_u32_le(bytes.len() as u32);
287    buf.put_slice(bytes);
288}
289
/// Exact size of a length-prefixed string: the `u32` prefix plus the payload bytes.
fn estimate_serialize_str_size(bytes: &[u8]) -> usize {
    bytes.len() + std::mem::size_of::<u32>()
}
293
294fn serialize_interval(interval: &Interval, buf: &mut impl BufMut) {
295    buf.put_i32_le(interval.months());
296    buf.put_i32_le(interval.days());
297    buf.put_i64_le(interval.usecs());
298}
299
/// Fixed encoded size of an interval: two `i32` fields plus one `i64` field.
fn estimate_serialize_interval_size() -> usize {
    2 * std::mem::size_of::<i32>() + std::mem::size_of::<i64>()
}
303
304fn serialize_date(days: i32, buf: &mut impl BufMut) {
305    buf.put_i32_le(days);
306}
307
/// Fixed encoded size of a date: a single `i32`.
fn estimate_serialize_date_size() -> usize {
    std::mem::size_of::<i32>()
}
311
312fn serialize_timestamp(secs: i64, nsecs: u32, buf: &mut impl BufMut) {
313    buf.put_i64_le(secs);
314    buf.put_u32_le(nsecs);
315}
316
/// Fixed encoded size of a timestamp: an `i64` of seconds plus a `u32` of nanoseconds.
fn estimate_serialize_timestamp_size() -> usize {
    std::mem::size_of::<i64>() + std::mem::size_of::<u32>()
}
320
321fn serialize_time(secs: u32, nano: u32, buf: &mut impl BufMut) {
322    buf.put_u32_le(secs);
323    buf.put_u32_le(nano);
324}
325
/// Fixed encoded size of a time of day: two `u32` fields.
fn estimate_serialize_time_size() -> usize {
    2 * std::mem::size_of::<u32>()
}
329
330fn serialize_decimal(decimal: &Decimal, buf: &mut impl BufMut) {
331    buf.put_slice(&decimal.unordered_serialize());
332}
333
/// Fixed encoded size of a decimal; matches the 16-byte buffer read by `deserialize_decimal`.
fn estimate_serialize_decimal_size() -> usize {
    const DECIMAL_ENCODED_LEN: usize = 16;
    DECIMAL_ENCODED_LEN
}
337
338fn deserialize_value(ty: &DataType, data: &mut impl Buf) -> Result<ScalarImpl> {
339    Ok(match ty {
340        DataType::Int16 => ScalarImpl::Int16(data.get_i16_le()),
341        DataType::Int32 => ScalarImpl::Int32(data.get_i32_le()),
342        DataType::Int64 => ScalarImpl::Int64(data.get_i64_le()),
343        DataType::Int256 => ScalarImpl::Int256(deserialize_int256(data)),
344        DataType::Serial => ScalarImpl::Serial(Serial::from(data.get_i64_le())),
345        DataType::Float32 => ScalarImpl::Float32(F32::from(data.get_f32_le())),
346        DataType::Float64 => ScalarImpl::Float64(F64::from(data.get_f64_le())),
347        DataType::Varchar => ScalarImpl::Utf8(deserialize_str(data)?),
348        DataType::Boolean => ScalarImpl::Bool(deserialize_bool(data)?),
349        DataType::Decimal => ScalarImpl::Decimal(deserialize_decimal(data)?),
350        DataType::Interval => ScalarImpl::Interval(deserialize_interval(data)?),
351        DataType::Time => ScalarImpl::Time(deserialize_time(data)?),
352        DataType::Timestamp => ScalarImpl::Timestamp(deserialize_timestamp(data)?),
353        DataType::Timestamptz => {
354            ScalarImpl::Timestamptz(Timestamptz::from_micros(data.get_i64_le()))
355        }
356        DataType::Date => ScalarImpl::Date(deserialize_date(data)?),
357        DataType::Jsonb => ScalarImpl::Jsonb(
358            JsonbVal::value_deserialize(&deserialize_bytea(data))
359                .ok_or(ValueEncodingError::InvalidJsonbEncoding)?,
360        ),
361        DataType::Struct(struct_def) => deserialize_struct(struct_def, data)?,
362        DataType::Bytea => ScalarImpl::Bytea(deserialize_bytea(data).into()),
363        DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
364        DataType::List(item_type) => deserialize_list(item_type, data)?,
365        DataType::Map(map_type) => {
366            // FIXME: clone type everytime here is inefficient
367            let list = deserialize_list(&map_type.clone().into_struct(), data)?.into_list();
368            ScalarImpl::Map(MapValue::from_entries(list))
369        }
370    })
371}
372
373fn deserialize_struct(struct_def: &StructType, data: &mut impl Buf) -> Result<ScalarImpl> {
374    let mut field_values = Vec::with_capacity(struct_def.len());
375    for field_type in struct_def.types() {
376        field_values.push(inner_deserialize_datum(data, field_type)?);
377    }
378
379    Ok(ScalarImpl::Struct(StructValue::new(field_values)))
380}
381
382fn deserialize_list(item_type: &DataType, data: &mut impl Buf) -> Result<ScalarImpl> {
383    let len = data.get_u32_le();
384    let mut builder = item_type.create_array_builder(len as usize);
385    for _ in 0..len {
386        builder.append(inner_deserialize_datum(data, item_type)?);
387    }
388    Ok(ScalarImpl::List(ListValue::new(builder.finish())))
389}
390
391fn deserialize_str(data: &mut impl Buf) -> Result<Box<str>> {
392    let len = data.get_u32_le();
393    let mut bytes = vec![0; len as usize];
394    data.copy_to_slice(&mut bytes);
395    String::from_utf8(bytes)
396        .map(String::into_boxed_str)
397        .map_err(ValueEncodingError::InvalidUtf8)
398}
399
400fn deserialize_bytea(data: &mut impl Buf) -> Vec<u8> {
401    let len = data.get_u32_le();
402    let mut bytes = vec![0; len as usize];
403    data.copy_to_slice(&mut bytes);
404    bytes
405}
406
407fn deserialize_int256(data: &mut impl Buf) -> Int256 {
408    let mut bytes = [0; Int256::size()];
409    data.copy_to_slice(&mut bytes);
410    Int256::from_le_bytes(bytes)
411}
412
413fn deserialize_bool(data: &mut impl Buf) -> Result<bool> {
414    match data.get_u8() {
415        1 => Ok(true),
416        0 => Ok(false),
417        value => Err(ValueEncodingError::InvalidBoolEncoding(value)),
418    }
419}
420
421fn deserialize_interval(data: &mut impl Buf) -> Result<Interval> {
422    let months = data.get_i32_le();
423    let days = data.get_i32_le();
424    let usecs = data.get_i64_le();
425    Ok(Interval::from_month_day_usec(months, days, usecs))
426}
427
428fn deserialize_time(data: &mut impl Buf) -> Result<Time> {
429    let secs = data.get_u32_le();
430    let nano = data.get_u32_le();
431    Time::with_secs_nano(secs, nano)
432        .map_err(|_e| ValueEncodingError::InvalidTimeEncoding(secs, nano))
433}
434
435fn deserialize_timestamp(data: &mut impl Buf) -> Result<Timestamp> {
436    let secs = data.get_i64_le();
437    let nsecs = data.get_u32_le();
438    Timestamp::with_secs_nsecs(secs, nsecs)
439        .map_err(|_e| ValueEncodingError::InvalidTimestampEncoding(secs, nsecs))
440}
441
442fn deserialize_date(data: &mut impl Buf) -> Result<Date> {
443    let days = data.get_i32_le();
444    Date::with_days_since_ce(days).map_err(|_e| ValueEncodingError::InvalidDateEncoding(days))
445}
446
447fn deserialize_decimal(data: &mut impl Buf) -> Result<Decimal> {
448    let mut bytes = [0; 16];
449    data.copy_to_slice(&mut bytes);
450    Ok(Decimal::unordered_deserialize(bytes))
451}
452
#[cfg(test)]
mod tests {
    use crate::array::{ArrayImpl, ListValue, StructValue};
    use crate::test_utils::rand_chunk;
    use crate::types::{
        DataType, Date, Datum, Decimal, Interval, ScalarImpl, Serial, Time, Timestamp,
        Timestamptz,
    };
    use crate::util::value_encoding::{
        estimate_serialize_datum_size, serialize_datum, try_get_exact_serialize_datum_size,
    };

    /// Asserts that the size estimate for a non-null scalar equals its actual encoded length.
    fn test_estimate_serialize_scalar_size(s: ScalarImpl) {
        let d = Datum::from(s);
        assert_eq!(estimate_serialize_datum_size(&d), serialize_datum(&d).len());
    }

    /// When the array type reports a fixed size, asserts that it equals the encoded length
    /// of `s.to_datum()`.
    fn test_try_get_exact_serialize_datum_size(s: &ArrayImpl) {
        let d = s.to_datum();
        if let Some(ret) = try_get_exact_serialize_datum_size(s) {
            assert_eq!(ret, serialize_datum(&d).len());
        }
    }

    #[test]
    fn test_estimate_size() {
        let d: Datum = None;
        assert_eq!(estimate_serialize_datum_size(&d), serialize_datum(&d).len());

        test_estimate_serialize_scalar_size(ScalarImpl::Bool(true));
        test_estimate_serialize_scalar_size(ScalarImpl::Int16(1));
        test_estimate_serialize_scalar_size(ScalarImpl::Int32(1));
        test_estimate_serialize_scalar_size(ScalarImpl::Int64(1));
        test_estimate_serialize_scalar_size(ScalarImpl::Float32(1.0.into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Float64(1.0.into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Serial(Serial::from(i64::MIN)));

        test_estimate_serialize_scalar_size(ScalarImpl::Utf8("abc".into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Utf8("".into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Decimal(Decimal::NegativeInf));
        test_estimate_serialize_scalar_size(ScalarImpl::Decimal(Decimal::PositiveInf));
        test_estimate_serialize_scalar_size(ScalarImpl::Decimal(Decimal::NaN));
        test_estimate_serialize_scalar_size(ScalarImpl::Decimal(123123.into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Interval(Interval::from_month_day_usec(
            7, 8, 9,
        )));
        test_estimate_serialize_scalar_size(ScalarImpl::Date(Date::from_ymd_uncheck(2333, 3, 3)));
        test_estimate_serialize_scalar_size(ScalarImpl::Bytea("\\x233".as_bytes().into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Time(Time::from_hms_uncheck(2, 3, 3)));
        test_estimate_serialize_scalar_size(ScalarImpl::Timestamp(
            Timestamp::from_timestamp_uncheck(23333333, 2333),
        ));
        // Timestamptz is a fixed 8-byte payload; previously not covered by this test.
        test_estimate_serialize_scalar_size(ScalarImpl::Timestamptz(Timestamptz::from_micros(
            1_700_000_000_000_000,
        )));
        test_estimate_serialize_scalar_size(ScalarImpl::Interval(Interval::from_month_day_usec(
            2, 3, 3333,
        )));
        test_estimate_serialize_scalar_size(ScalarImpl::Struct(StructValue::new(vec![
            ScalarImpl::Int64(233).into(),
            ScalarImpl::Float64(23.33.into()).into(),
        ])));
        test_estimate_serialize_scalar_size(ScalarImpl::List(ListValue::from_iter([233i64, 2333])));
    }

    #[test]
    fn test_try_estimate_size() {
        let chunk = rand_chunk::gen_chunk(
            &[
                DataType::Int16,
                DataType::Int32,
                DataType::Int64,
                DataType::Serial,
                DataType::Float32,
                DataType::Float64,
                DataType::Boolean,
                DataType::Decimal,
                DataType::Interval,
                DataType::Time,
                DataType::Timestamp,
                DataType::Date,
            ],
            1,
            0,
            0.0,
        );
        for column in chunk.columns() {
            test_try_get_exact_serialize_datum_size(column);
        }
    }
}
539}