risingwave_common/util/value_encoding/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Value encoding is an encoding format which converts the data into a binary form. Unlike
//! key encoding, it is not memcomparable: the byte order of encoded values does not follow
//! the order of the values themselves.
17
18use bytes::{Buf, BufMut};
19use chrono::{Datelike, Timelike};
20use either::{Either, for_both};
21use enum_as_inner::EnumAsInner;
22use risingwave_pb::data::PbDatum;
23
24use crate::array::ArrayImpl;
25use crate::row::{Row, RowDeserializer as BasicDeserializer};
26use crate::types::*;
27
28pub mod error;
29use error::ValueEncodingError;
30
31use self::column_aware_row_encoding::ColumnAwareSerde;
32pub mod column_aware_row_encoding;
33
/// Result type returned by the value-encoding (de)serializers in this module.
pub type Result<T> = std::result::Result<T, ValueEncodingError>;
35
/// The kind of all possible `ValueRowSerde`.
///
/// Names the row encoding scheme without carrying a serde instance.
#[derive(EnumAsInner)]
pub enum ValueRowSerdeKind {
    /// For `BasicSerde`, the value is encoded with value-encoding: the datums of the row are
    /// written back to back, each prefixed by a null tag (see `serialize_datum_into`).
    Basic,
    /// For `ColumnAwareSerde`, the value is encoded with column-aware row encoding.
    ColumnAware,
}
44
/// Part of `ValueRowSerde` that implements `serialize`: encodes a `Row` into bytes.
pub trait ValueRowSerializer: Clone {
    /// Encodes `row` into a newly allocated byte vector.
    fn serialize(&self, row: impl Row) -> Vec<u8>;
}
49
/// Part of `ValueRowSerde` that implements `deserialize`: decodes bytes back into a `Row`.
pub trait ValueRowDeserializer: Clone {
    /// Decodes `encoded_bytes` into the datums of one row.
    ///
    /// Decoding failures that the implementation detects are reported as a
    /// [`ValueEncodingError`].
    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>>;
}
54
/// The type-erased `ValueRowSerde`, used for simplifying the code.
///
/// `Left` holds a [`BasicSerde`] and `Right` a [`ColumnAwareSerde`]; the serializer and
/// deserializer impls for this type forward each call to whichever one is present.
#[derive(Clone)]
pub struct EitherSerde(pub Either<BasicSerde, ColumnAwareSerde>);
58
59impl From<BasicSerde> for EitherSerde {
60    fn from(value: BasicSerde) -> Self {
61        Self(Either::Left(value))
62    }
63}
64impl From<ColumnAwareSerde> for EitherSerde {
65    fn from(value: ColumnAwareSerde) -> Self {
66        Self(Either::Right(value))
67    }
68}
69
70impl ValueRowSerializer for EitherSerde {
71    fn serialize(&self, row: impl Row) -> Vec<u8> {
72        for_both!(&self.0, s => s.serialize(row))
73    }
74}
75
76impl ValueRowDeserializer for EitherSerde {
77    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
78        for_both!(&self.0, s => s.deserialize(encoded_bytes))
79    }
80}
81
/// Wrap of the original `Row` serializing function.
///
/// A stateless unit struct. Its `ValueRowSerializer` impl encodes a row as the concatenation
/// of `serialize_datum_into` for each datum in order. No row header or column count is
/// written.
#[derive(Clone)]
pub struct BasicSerializer;
85
86impl ValueRowSerializer for BasicSerializer {
87    fn serialize(&self, row: impl Row) -> Vec<u8> {
88        let mut buf = vec![];
89        for datum in row.iter() {
90            serialize_datum_into(datum, &mut buf);
91        }
92        buf
93    }
94}
95
96impl ValueRowDeserializer for BasicDeserializer {
97    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
98        Ok(self.deserialize(encoded_bytes)?.into_inner().into())
99    }
100}
101
/// Wrap of the original `Row` serializing and deserializing function.
///
/// Pairs a [`BasicSerializer`] with a `RowDeserializer`, which this module imports as
/// `BasicDeserializer`.
#[derive(Clone)]
pub struct BasicSerde {
    /// Encodes rows with plain value encoding.
    pub serializer: BasicSerializer,
    /// Decodes rows produced by `serializer`. It presumably carries the column types it
    /// decodes (not visible here).
    pub deserializer: BasicDeserializer,
}
108
109impl ValueRowSerializer for BasicSerde {
110    fn serialize(&self, row: impl Row) -> Vec<u8> {
111        self.serializer.serialize(row)
112    }
113}
114
115impl ValueRowDeserializer for BasicSerde {
116    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
117        Ok(self
118            .deserializer
119            .deserialize(encoded_bytes)?
120            .into_inner()
121            .into())
122    }
123}
124
125pub fn try_get_exact_serialize_datum_size(arr: &ArrayImpl) -> Option<usize> {
126    match arr {
127        ArrayImpl::Int16(_) => Some(2),
128        ArrayImpl::Int32(_) => Some(4),
129        ArrayImpl::Int64(_) => Some(8),
130        ArrayImpl::Serial(_) => Some(8),
131        ArrayImpl::Float32(_) => Some(4),
132        ArrayImpl::Float64(_) => Some(8),
133        ArrayImpl::Bool(_) => Some(1),
134        ArrayImpl::Decimal(_) => Some(estimate_serialize_decimal_size()),
135        ArrayImpl::Interval(_) => Some(estimate_serialize_interval_size()),
136        ArrayImpl::Date(_) => Some(estimate_serialize_date_size()),
137        ArrayImpl::Timestamp(_) => Some(estimate_serialize_timestamp_size()),
138        ArrayImpl::Time(_) => Some(estimate_serialize_time_size()),
139        _ => None,
140    }
141    .map(|x| x + 1)
142}
143
144/// Serialize a datum into bytes and return (Not order guarantee, used in value encoding).
145pub fn serialize_datum(cell: impl ToDatumRef) -> Vec<u8> {
146    let mut buf: Vec<u8> = vec![];
147    serialize_datum_into(cell, &mut buf);
148    buf
149}
150
151/// Serialize a datum into bytes (Not order guarantee, used in value encoding).
152pub fn serialize_datum_into(datum_ref: impl ToDatumRef, buf: &mut impl BufMut) {
153    if let Some(d) = datum_ref.to_datum_ref() {
154        buf.put_u8(1);
155        serialize_scalar(d, buf)
156    } else {
157        buf.put_u8(0);
158    }
159}
160
161pub fn estimate_serialize_datum_size(datum_ref: impl ToDatumRef) -> usize {
162    if let Some(d) = datum_ref.to_datum_ref() {
163        1 + estimate_serialize_scalar_size(d)
164    } else {
165        1
166    }
167}
168
// Extension trait `DatumFromProtoExt` (generated by `easy_ext`) that adds protobuf decoding
// to `Datum`.
#[easy_ext::ext(DatumFromProtoExt)]
impl Datum {
    /// Create a datum from the protobuf representation with the given data type.
    ///
    /// `proto.body` is decoded with [`deserialize_datum`]. It is expected to hold the value
    /// encoding produced by `to_protobuf` / [`serialize_datum`].
    pub fn from_protobuf(proto: &PbDatum, data_type: &DataType) -> Result<Datum> {
        deserialize_datum(proto.body.as_slice(), data_type)
    }
}
176
// Extension trait `DatumToProtoExt` (generated by `easy_ext`) that adds protobuf encoding to
// any type convertible to a datum reference.
#[easy_ext::ext(DatumToProtoExt)]
impl<D: ToDatumRef> D {
    /// Convert the datum to the protobuf representation.
    ///
    /// The body is the value encoding from [`serialize_datum`]: null tag plus scalar bytes.
    pub fn to_protobuf(&self) -> PbDatum {
        PbDatum {
            body: serialize_datum(self),
        }
    }
}
186
187/// Deserialize bytes into a datum (Not order guarantee, used in value encoding).
188pub fn deserialize_datum(mut data: impl Buf, ty: &DataType) -> Result<Datum> {
189    inner_deserialize_datum(&mut data, ty)
190}
191
192// prevent recursive use of &mut
193#[inline(always)]
194fn inner_deserialize_datum(data: &mut impl Buf, ty: &DataType) -> Result<Datum> {
195    let null_tag = data.get_u8();
196    match null_tag {
197        0 => Ok(None),
198        1 => Some(deserialize_value(ty, data)).transpose(),
199        _ => Err(ValueEncodingError::InvalidTagEncoding(null_tag)),
200    }
201}
202
/// Writes the value encoding of a non-null scalar into `buf`.
///
/// No null tag is written here; see [`serialize_datum_into`]. The layout by type:
/// - Fixed-width numbers are little-endian.
/// - `Utf8`, `Bytea` and `Jsonb` are a little-endian `u32` byte length followed by the bytes.
/// - Structs are their fields' datum encodings concatenated.
/// - Lists and maps are a `u32` element count followed by each element's datum encoding.
fn serialize_scalar(value: ScalarRefImpl<'_>, buf: &mut impl BufMut) {
    match value {
        ScalarRefImpl::Int16(v) => buf.put_i16_le(v),
        ScalarRefImpl::Int32(v) => buf.put_i32_le(v),
        ScalarRefImpl::Int64(v) => buf.put_i64_le(v),
        ScalarRefImpl::Int256(v) => buf.put_slice(&v.to_le_bytes()),
        ScalarRefImpl::Serial(v) => buf.put_i64_le(v.into_inner()),
        ScalarRefImpl::Float32(v) => buf.put_f32_le(v.into_inner()),
        ScalarRefImpl::Float64(v) => buf.put_f64_le(v.into_inner()),
        ScalarRefImpl::Utf8(v) => serialize_str(v.as_bytes(), buf),
        ScalarRefImpl::Bytea(v) => serialize_str(v, buf),
        // A single byte: 1 for true, 0 for false.
        ScalarRefImpl::Bool(v) => buf.put_u8(v as u8),
        ScalarRefImpl::Decimal(v) => serialize_decimal(&v, buf),
        ScalarRefImpl::Interval(v) => serialize_interval(&v, buf),
        // Days since the Common Era (chrono `num_days_from_ce`).
        ScalarRefImpl::Date(v) => serialize_date(v.0.num_days_from_ce(), buf),
        // The naive timestamp is interpreted as UTC and stored as whole seconds since the Unix
        // epoch plus the sub-second nanoseconds.
        ScalarRefImpl::Timestamp(v) => serialize_timestamp(
            v.0.and_utc().timestamp(),
            v.0.and_utc().timestamp_subsec_nanos(),
            buf,
        ),
        ScalarRefImpl::Timestamptz(v) => buf.put_i64_le(v.timestamp_micros()),
        // Seconds since midnight plus the nanosecond component.
        ScalarRefImpl::Time(v) => {
            serialize_time(v.0.num_seconds_from_midnight(), v.0.nanosecond(), buf)
        }
        // JSONB's own binary serialization, length-prefixed like a string.
        ScalarRefImpl::Jsonb(v) => serialize_str(&v.value_serialize(), buf),
        ScalarRefImpl::Struct(s) => serialize_struct(s, buf),
        ScalarRefImpl::List(v) => serialize_list(v, buf),
        // A map is encoded exactly like its underlying list of entries.
        ScalarRefImpl::Map(m) => serialize_list(m.into_inner(), buf),
    }
}
233
234fn estimate_serialize_scalar_size(value: ScalarRefImpl<'_>) -> usize {
235    match value {
236        ScalarRefImpl::Int16(_) => 2,
237        ScalarRefImpl::Int32(_) => 4,
238        ScalarRefImpl::Int64(_) => 8,
239        ScalarRefImpl::Int256(_) => 32,
240        ScalarRefImpl::Serial(_) => 8,
241        ScalarRefImpl::Float32(_) => 4,
242        ScalarRefImpl::Float64(_) => 8,
243        ScalarRefImpl::Utf8(v) => estimate_serialize_str_size(v.as_bytes()),
244        ScalarRefImpl::Bytea(v) => estimate_serialize_str_size(v),
245        ScalarRefImpl::Bool(_) => 1,
246        ScalarRefImpl::Decimal(_) => estimate_serialize_decimal_size(),
247        ScalarRefImpl::Interval(_) => estimate_serialize_interval_size(),
248        ScalarRefImpl::Date(_) => estimate_serialize_date_size(),
249        ScalarRefImpl::Timestamp(_) => estimate_serialize_timestamp_size(),
250        ScalarRefImpl::Timestamptz(_) => 8,
251        ScalarRefImpl::Time(_) => estimate_serialize_time_size(),
252        // not exact as we use internal encoding size to estimate the json string size
253        ScalarRefImpl::Jsonb(v) => v.capacity(),
254        ScalarRefImpl::Struct(s) => estimate_serialize_struct_size(s),
255        ScalarRefImpl::List(v) => estimate_serialize_list_size(v),
256        ScalarRefImpl::Map(v) => estimate_serialize_list_size(v.into_inner()),
257    }
258}
259
260fn serialize_struct(value: StructRef<'_>, buf: &mut impl BufMut) {
261    value.iter_fields_ref().for_each(|field_value| {
262        serialize_datum_into(field_value, buf);
263    });
264}
265
/// Estimated number of bytes [`serialize_struct`] writes for `s`.
///
/// Delegates entirely to `StructRef::estimate_serialize_size_inner`.
fn estimate_serialize_struct_size(s: StructRef<'_>) -> usize {
    s.estimate_serialize_size_inner()
}
269fn serialize_list(value: ListRef<'_>, buf: &mut impl BufMut) {
270    let elems = value.iter();
271    buf.put_u32_le(elems.len() as u32);
272
273    elems.for_each(|field_value| {
274        serialize_datum_into(field_value, buf);
275    });
276}
/// Estimated number of bytes [`serialize_list`] writes for `list`.
///
/// This is the 4-byte element count plus whatever `ListRef::estimate_serialize_size_inner`
/// reports for the elements.
fn estimate_serialize_list_size(list: ListRef<'_>) -> usize {
    4 + list.estimate_serialize_size_inner()
}
280
281fn serialize_str(bytes: &[u8], buf: &mut impl BufMut) {
282    buf.put_u32_le(bytes.len() as u32);
283    buf.put_slice(bytes);
284}
285
/// Number of bytes [`serialize_str`] writes: a `u32` length prefix plus the payload.
fn estimate_serialize_str_size(bytes: &[u8]) -> usize {
    let prefix = std::mem::size_of::<u32>();
    prefix + bytes.len()
}
289
290fn serialize_interval(interval: &Interval, buf: &mut impl BufMut) {
291    buf.put_i32_le(interval.months());
292    buf.put_i32_le(interval.days());
293    buf.put_i64_le(interval.usecs());
294}
295
/// Number of bytes [`serialize_interval`] writes: two `i32`s plus one `i64`.
fn estimate_serialize_interval_size() -> usize {
    2 * std::mem::size_of::<i32>() + std::mem::size_of::<i64>()
}
299
300fn serialize_date(days: i32, buf: &mut impl BufMut) {
301    buf.put_i32_le(days);
302}
303
/// Number of bytes [`serialize_date`] writes: one `i32`.
fn estimate_serialize_date_size() -> usize {
    std::mem::size_of::<i32>()
}
307
308fn serialize_timestamp(secs: i64, nsecs: u32, buf: &mut impl BufMut) {
309    buf.put_i64_le(secs);
310    buf.put_u32_le(nsecs);
311}
312
/// Number of bytes [`serialize_timestamp`] writes: an `i64` plus a `u32`.
fn estimate_serialize_timestamp_size() -> usize {
    std::mem::size_of::<i64>() + std::mem::size_of::<u32>()
}
316
317fn serialize_time(secs: u32, nano: u32, buf: &mut impl BufMut) {
318    buf.put_u32_le(secs);
319    buf.put_u32_le(nano);
320}
321
/// Number of bytes [`serialize_time`] writes: two `u32`s.
fn estimate_serialize_time_size() -> usize {
    2 * std::mem::size_of::<u32>()
}
325
326fn serialize_decimal(decimal: &Decimal, buf: &mut impl BufMut) {
327    buf.put_slice(&decimal.unordered_serialize());
328}
329
/// Number of bytes [`serialize_decimal`] writes: the 16-byte `unordered_serialize` form,
/// which `deserialize_decimal` also reads back as 16 bytes.
fn estimate_serialize_decimal_size() -> usize {
    const UNORDERED_DECIMAL_LEN: usize = 16;
    UNORDERED_DECIMAL_LEN
}
333
/// Decodes the encoding of a non-null value of type `ty`.
///
/// This is the inverse of `serialize_scalar`. The null tag has already been consumed by the
/// caller. The fixed-width reads use `Buf::get_*`, which panic rather than return an error
/// if `data` runs out of bytes.
///
/// # Errors
/// Returns a [`ValueEncodingError`] for any of:
/// - invalid UTF-8;
/// - a boolean byte other than 0/1;
/// - out-of-range date/time/timestamp components;
/// - an undecodable JSONB payload;
/// - an invalid null tag inside a nested struct, list or map.
fn deserialize_value(ty: &DataType, data: &mut impl Buf) -> Result<ScalarImpl> {
    Ok(match ty {
        DataType::Int16 => ScalarImpl::Int16(data.get_i16_le()),
        DataType::Int32 => ScalarImpl::Int32(data.get_i32_le()),
        DataType::Int64 => ScalarImpl::Int64(data.get_i64_le()),
        DataType::Int256 => ScalarImpl::Int256(deserialize_int256(data)),
        DataType::Serial => ScalarImpl::Serial(Serial::from(data.get_i64_le())),
        DataType::Float32 => ScalarImpl::Float32(F32::from(data.get_f32_le())),
        DataType::Float64 => ScalarImpl::Float64(F64::from(data.get_f64_le())),
        DataType::Varchar => ScalarImpl::Utf8(deserialize_str(data)?),
        DataType::Boolean => ScalarImpl::Bool(deserialize_bool(data)?),
        DataType::Decimal => ScalarImpl::Decimal(deserialize_decimal(data)?),
        DataType::Interval => ScalarImpl::Interval(deserialize_interval(data)?),
        DataType::Time => ScalarImpl::Time(deserialize_time(data)?),
        DataType::Timestamp => ScalarImpl::Timestamp(deserialize_timestamp(data)?),
        DataType::Timestamptz => {
            ScalarImpl::Timestamptz(Timestamptz::from_micros(data.get_i64_le()))
        }
        DataType::Date => ScalarImpl::Date(deserialize_date(data)?),
        // Length-prefixed bytes holding JSONB's own binary serialization.
        DataType::Jsonb => ScalarImpl::Jsonb(
            JsonbVal::value_deserialize(&deserialize_bytea(data))
                .ok_or(ValueEncodingError::InvalidJsonbEncoding)?,
        ),
        DataType::Struct(struct_def) => deserialize_struct(struct_def, data)?,
        DataType::Bytea => ScalarImpl::Bytea(deserialize_bytea(data).into()),
        DataType::List(item_type) => deserialize_list(item_type, data)?,
        DataType::Map(map_type) => {
            // A map is encoded as a list of entry structs, so decode it as such and rebuild
            // the map from the entries.
            // FIXME: clone type everytime here is inefficient
            let list = deserialize_list(&map_type.clone().into_struct(), data)?.into_list();
            ScalarImpl::Map(MapValue::from_entries(list))
        }
    })
}
367
368fn deserialize_struct(struct_def: &StructType, data: &mut impl Buf) -> Result<ScalarImpl> {
369    let mut field_values = Vec::with_capacity(struct_def.len());
370    for field_type in struct_def.types() {
371        field_values.push(inner_deserialize_datum(data, field_type)?);
372    }
373
374    Ok(ScalarImpl::Struct(StructValue::new(field_values)))
375}
376
377fn deserialize_list(item_type: &DataType, data: &mut impl Buf) -> Result<ScalarImpl> {
378    let len = data.get_u32_le();
379    let mut builder = item_type.create_array_builder(len as usize);
380    for _ in 0..len {
381        builder.append(inner_deserialize_datum(data, item_type)?);
382    }
383    Ok(ScalarImpl::List(ListValue::new(builder.finish())))
384}
385
386fn deserialize_str(data: &mut impl Buf) -> Result<Box<str>> {
387    let len = data.get_u32_le();
388    let mut bytes = vec![0; len as usize];
389    data.copy_to_slice(&mut bytes);
390    String::from_utf8(bytes)
391        .map(String::into_boxed_str)
392        .map_err(ValueEncodingError::InvalidUtf8)
393}
394
395fn deserialize_bytea(data: &mut impl Buf) -> Vec<u8> {
396    let len = data.get_u32_le();
397    let mut bytes = vec![0; len as usize];
398    data.copy_to_slice(&mut bytes);
399    bytes
400}
401
402fn deserialize_int256(data: &mut impl Buf) -> Int256 {
403    let mut bytes = [0; Int256::size()];
404    data.copy_to_slice(&mut bytes);
405    Int256::from_le_bytes(bytes)
406}
407
408fn deserialize_bool(data: &mut impl Buf) -> Result<bool> {
409    match data.get_u8() {
410        1 => Ok(true),
411        0 => Ok(false),
412        value => Err(ValueEncodingError::InvalidBoolEncoding(value)),
413    }
414}
415
416fn deserialize_interval(data: &mut impl Buf) -> Result<Interval> {
417    let months = data.get_i32_le();
418    let days = data.get_i32_le();
419    let usecs = data.get_i64_le();
420    Ok(Interval::from_month_day_usec(months, days, usecs))
421}
422
423fn deserialize_time(data: &mut impl Buf) -> Result<Time> {
424    let secs = data.get_u32_le();
425    let nano = data.get_u32_le();
426    Time::with_secs_nano(secs, nano)
427        .map_err(|_e| ValueEncodingError::InvalidTimeEncoding(secs, nano))
428}
429
430fn deserialize_timestamp(data: &mut impl Buf) -> Result<Timestamp> {
431    let secs = data.get_i64_le();
432    let nsecs = data.get_u32_le();
433    Timestamp::with_secs_nsecs(secs, nsecs)
434        .map_err(|_e| ValueEncodingError::InvalidTimestampEncoding(secs, nsecs))
435}
436
437fn deserialize_date(data: &mut impl Buf) -> Result<Date> {
438    let days = data.get_i32_le();
439    Date::with_days_since_ce(days).map_err(|_e| ValueEncodingError::InvalidDateEncoding(days))
440}
441
442fn deserialize_decimal(data: &mut impl Buf) -> Result<Decimal> {
443    let mut bytes = [0; 16];
444    data.copy_to_slice(&mut bytes);
445    Ok(Decimal::unordered_deserialize(bytes))
446}
447
#[cfg(test)]
mod tests {
    use crate::array::{ArrayImpl, ListValue, StructValue};
    use crate::test_utils::rand_chunk;
    use crate::types::{
        DataType, Date, Datum, Decimal, Interval, ScalarImpl, Serial, Time, Timestamp,
    };
    use crate::util::value_encoding::{
        estimate_serialize_datum_size, serialize_datum, try_get_exact_serialize_datum_size,
    };

    /// Asserts that the size estimate for the datum holding `s` equals its actual serialized
    /// length.
    fn test_estimate_serialize_scalar_size(s: ScalarImpl) {
        let d = Datum::from(s);
        assert_eq!(estimate_serialize_datum_size(&d), serialize_datum(&d).len());
    }

    /// If `s`'s type has a fixed encoded width, asserts that width equals the serialized
    /// length of the datum obtained from `s.to_datum()`.
    fn test_try_get_exact_serialize_datum_size(s: &ArrayImpl) {
        let d = s.to_datum();
        if let Some(ret) = try_get_exact_serialize_datum_size(s) {
            assert_eq!(ret, serialize_datum(&d).len());
        }
    }

    #[test]
    fn test_estimate_size() {
        // NULL: only the 1-byte null tag is written.
        let d: Datum = None;
        assert_eq!(estimate_serialize_datum_size(&d), serialize_datum(&d).len());

        // Fixed-width scalars.
        test_estimate_serialize_scalar_size(ScalarImpl::Bool(true));
        test_estimate_serialize_scalar_size(ScalarImpl::Int16(1));
        test_estimate_serialize_scalar_size(ScalarImpl::Int32(1));
        test_estimate_serialize_scalar_size(ScalarImpl::Int64(1));
        test_estimate_serialize_scalar_size(ScalarImpl::Float32(1.0.into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Float64(1.0.into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Serial(Serial::from(i64::MIN)));

        // Variable-length, special-value and temporal scalars, including empty strings and
        // the non-finite decimals.
        test_estimate_serialize_scalar_size(ScalarImpl::Utf8("abc".into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Utf8("".into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Decimal(Decimal::NegativeInf));
        test_estimate_serialize_scalar_size(ScalarImpl::Decimal(Decimal::PositiveInf));
        test_estimate_serialize_scalar_size(ScalarImpl::Decimal(Decimal::NaN));
        test_estimate_serialize_scalar_size(ScalarImpl::Decimal(123123.into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Interval(Interval::from_month_day_usec(
            7, 8, 9,
        )));
        test_estimate_serialize_scalar_size(ScalarImpl::Date(Date::from_ymd_uncheck(2333, 3, 3)));
        test_estimate_serialize_scalar_size(ScalarImpl::Bytea("\\x233".as_bytes().into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Time(Time::from_hms_uncheck(2, 3, 3)));
        test_estimate_serialize_scalar_size(ScalarImpl::Timestamp(
            Timestamp::from_timestamp_uncheck(23333333, 2333),
        ));
        test_estimate_serialize_scalar_size(ScalarImpl::Interval(Interval::from_month_day_usec(
            2, 3, 3333,
        )));

        // Nested types.
        test_estimate_serialize_scalar_size(ScalarImpl::Struct(StructValue::new(vec![
            ScalarImpl::Int64(233).into(),
            ScalarImpl::Float64(23.33.into()).into(),
        ])));
        test_estimate_serialize_scalar_size(ScalarImpl::List(ListValue::from_iter([233i64, 2333])));
    }

    #[test]
    fn test_try_estimate_size() {
        // One column per fixed-width type handled by `try_get_exact_serialize_datum_size`.
        // NOTE(review): the trailing arguments presumably mean one row, seed 0 and a null
        // density of 0.0 (so every datum is non-null); confirm against
        // `rand_chunk::gen_chunk`.
        let chunk = rand_chunk::gen_chunk(
            &[
                DataType::Int16,
                DataType::Int32,
                DataType::Int64,
                DataType::Serial,
                DataType::Float32,
                DataType::Float64,
                DataType::Boolean,
                DataType::Decimal,
                DataType::Interval,
                DataType::Time,
                DataType::Timestamp,
                DataType::Date,
            ],
            1,
            0,
            0.0,
        );
        for column in chunk.columns() {
            test_try_get_exact_serialize_datum_size(column);
        }
    }
}