1use bytes::{Buf, BufMut};
19use chrono::{Datelike, Timelike};
20use either::{Either, for_both};
21use enum_as_inner::EnumAsInner;
22use risingwave_pb::data::PbDatum;
23
24use crate::array::ArrayImpl;
25use crate::row::Row;
26use crate::types::*;
27
28pub mod error;
29use error::ValueEncodingError;
30
31use self::column_aware_row_encoding::ColumnAwareSerde;
32pub mod column_aware_row_encoding;
33
34pub use crate::row::RowDeserializer as BasicDeserializer;
35use crate::vector::{decode_vector_payload, encode_vector_payload};
36
37pub type Result<T> = std::result::Result<T, ValueEncodingError>;
38
39#[derive(EnumAsInner)]
41pub enum ValueRowSerdeKind {
42 Basic,
44 ColumnAware,
46}
47
48pub trait ValueRowSerializer: Clone {
50 fn serialize(&self, row: impl Row) -> Vec<u8>;
51}
52
53pub trait ValueRowDeserializer: Clone {
55 fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>>;
56}
57
58#[derive(Clone)]
60pub struct EitherSerde(pub Either<BasicSerde, ColumnAwareSerde>);
61
62impl From<BasicSerde> for EitherSerde {
63 fn from(value: BasicSerde) -> Self {
64 Self(Either::Left(value))
65 }
66}
67impl From<ColumnAwareSerde> for EitherSerde {
68 fn from(value: ColumnAwareSerde) -> Self {
69 Self(Either::Right(value))
70 }
71}
72
73impl ValueRowSerializer for EitherSerde {
74 fn serialize(&self, row: impl Row) -> Vec<u8> {
75 for_both!(&self.0, s => s.serialize(row))
76 }
77}
78
79impl ValueRowDeserializer for EitherSerde {
80 fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
81 for_both!(&self.0, s => s.deserialize(encoded_bytes))
82 }
83}
84
/// Stateless row serializer that writes each datum back-to-back with [`serialize_datum_into`].
#[derive(Clone)]
pub struct BasicSerializer;
88
89impl ValueRowSerializer for BasicSerializer {
90 fn serialize(&self, row: impl Row) -> Vec<u8> {
91 let mut buf = vec![];
92 for datum in row.iter() {
93 serialize_datum_into(datum, &mut buf);
94 }
95 buf
96 }
97}
98
99impl ValueRowDeserializer for BasicDeserializer {
100 fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
101 Ok(self.deserialize(encoded_bytes)?.into_inner().into())
102 }
103}
104
105#[derive(Clone)]
107pub struct BasicSerde {
108 pub serializer: BasicSerializer,
109 pub deserializer: BasicDeserializer,
110}
111
112impl ValueRowSerializer for BasicSerde {
113 fn serialize(&self, row: impl Row) -> Vec<u8> {
114 self.serializer.serialize(row)
115 }
116}
117
118impl ValueRowDeserializer for BasicSerde {
119 fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
120 Ok(self
121 .deserializer
122 .deserialize(encoded_bytes)?
123 .into_inner()
124 .into())
125 }
126}
127
128pub fn try_get_exact_serialize_datum_size(arr: &ArrayImpl) -> Option<usize> {
129 match arr {
130 ArrayImpl::Int16(_) => Some(2),
131 ArrayImpl::Int32(_) => Some(4),
132 ArrayImpl::Int64(_) => Some(8),
133 ArrayImpl::Serial(_) => Some(8),
134 ArrayImpl::Float32(_) => Some(4),
135 ArrayImpl::Float64(_) => Some(8),
136 ArrayImpl::Bool(_) => Some(1),
137 ArrayImpl::Decimal(_) => Some(estimate_serialize_decimal_size()),
138 ArrayImpl::Interval(_) => Some(estimate_serialize_interval_size()),
139 ArrayImpl::Date(_) => Some(estimate_serialize_date_size()),
140 ArrayImpl::Timestamp(_) => Some(estimate_serialize_timestamp_size()),
141 ArrayImpl::Time(_) => Some(estimate_serialize_time_size()),
142 _ => None,
143 }
144 .map(|x| x + 1)
145}
146
147pub fn serialize_datum(cell: impl ToDatumRef) -> Vec<u8> {
149 let mut buf: Vec<u8> = vec![];
150 serialize_datum_into(cell, &mut buf);
151 buf
152}
153
154pub fn serialize_datum_into(datum_ref: impl ToDatumRef, buf: &mut impl BufMut) {
156 if let Some(d) = datum_ref.to_datum_ref() {
157 buf.put_u8(1);
158 serialize_scalar(d, buf)
159 } else {
160 buf.put_u8(0);
161 }
162}
163
164pub fn estimate_serialize_datum_size(datum_ref: impl ToDatumRef) -> usize {
165 if let Some(d) = datum_ref.to_datum_ref() {
166 1 + estimate_serialize_scalar_size(d)
167 } else {
168 1
169 }
170}
171
172#[easy_ext::ext(DatumFromProtoExt)]
173impl Datum {
174 pub fn from_protobuf(proto: &PbDatum, data_type: &DataType) -> Result<Datum> {
176 deserialize_datum(proto.body.as_slice(), data_type)
177 }
178}
179
180#[easy_ext::ext(DatumToProtoExt)]
181impl<D: ToDatumRef> D {
182 pub fn to_protobuf(&self) -> PbDatum {
184 PbDatum {
185 body: serialize_datum(self),
186 }
187 }
188}
189
190pub fn deserialize_datum(mut data: impl Buf, ty: &DataType) -> Result<Datum> {
192 inner_deserialize_datum(&mut data, ty)
193}
194
195#[inline(always)]
197fn inner_deserialize_datum(data: &mut impl Buf, ty: &DataType) -> Result<Datum> {
198 let null_tag = data.get_u8();
199 match null_tag {
200 0 => Ok(None),
201 1 => Some(deserialize_value(ty, data)).transpose(),
202 _ => Err(ValueEncodingError::InvalidTagEncoding(null_tag)),
203 }
204}
205
206fn serialize_scalar(value: ScalarRefImpl<'_>, buf: &mut impl BufMut) {
207 match value {
208 ScalarRefImpl::Int16(v) => buf.put_i16_le(v),
209 ScalarRefImpl::Int32(v) => buf.put_i32_le(v),
210 ScalarRefImpl::Int64(v) => buf.put_i64_le(v),
211 ScalarRefImpl::Int256(v) => buf.put_slice(&v.to_le_bytes()),
212 ScalarRefImpl::Serial(v) => buf.put_i64_le(v.into_inner()),
213 ScalarRefImpl::Float32(v) => buf.put_f32_le(v.into_inner()),
214 ScalarRefImpl::Float64(v) => buf.put_f64_le(v.into_inner()),
215 ScalarRefImpl::Utf8(v) => serialize_str(v.as_bytes(), buf),
216 ScalarRefImpl::Bytea(v) => serialize_str(v, buf),
217 ScalarRefImpl::Bool(v) => buf.put_u8(v as u8),
218 ScalarRefImpl::Decimal(v) => serialize_decimal(&v, buf),
219 ScalarRefImpl::Interval(v) => serialize_interval(&v, buf),
220 ScalarRefImpl::Date(v) => serialize_date(v.0.num_days_from_ce(), buf),
221 ScalarRefImpl::Timestamp(v) => serialize_timestamp(
222 v.0.and_utc().timestamp(),
223 v.0.and_utc().timestamp_subsec_nanos(),
224 buf,
225 ),
226 ScalarRefImpl::Timestamptz(v) => buf.put_i64_le(v.timestamp_micros()),
227 ScalarRefImpl::Time(v) => {
228 serialize_time(v.0.num_seconds_from_midnight(), v.0.nanosecond(), buf)
229 }
230 ScalarRefImpl::Jsonb(v) => serialize_str(&v.value_serialize(), buf),
231 ScalarRefImpl::Struct(s) => serialize_struct(s, buf),
232 ScalarRefImpl::List(v) => serialize_list(v, buf),
233 ScalarRefImpl::Map(m) => serialize_list(m.into_inner(), buf),
234 ScalarRefImpl::Vector(v) => serialize_vector(v, buf),
235 }
236}
237
238fn estimate_serialize_scalar_size(value: ScalarRefImpl<'_>) -> usize {
239 match value {
240 ScalarRefImpl::Int16(_) => 2,
241 ScalarRefImpl::Int32(_) => 4,
242 ScalarRefImpl::Int64(_) => 8,
243 ScalarRefImpl::Int256(_) => 32,
244 ScalarRefImpl::Serial(_) => 8,
245 ScalarRefImpl::Float32(_) => 4,
246 ScalarRefImpl::Float64(_) => 8,
247 ScalarRefImpl::Utf8(v) => estimate_serialize_str_size(v.as_bytes()),
248 ScalarRefImpl::Bytea(v) => estimate_serialize_str_size(v),
249 ScalarRefImpl::Bool(_) => 1,
250 ScalarRefImpl::Decimal(_) => estimate_serialize_decimal_size(),
251 ScalarRefImpl::Interval(_) => estimate_serialize_interval_size(),
252 ScalarRefImpl::Date(_) => estimate_serialize_date_size(),
253 ScalarRefImpl::Timestamp(_) => estimate_serialize_timestamp_size(),
254 ScalarRefImpl::Timestamptz(_) => 8,
255 ScalarRefImpl::Time(_) => estimate_serialize_time_size(),
256 ScalarRefImpl::Jsonb(v) => v.capacity(),
258 ScalarRefImpl::Struct(s) => estimate_serialize_struct_size(s),
259 ScalarRefImpl::List(v) => estimate_serialize_list_size(v),
260 ScalarRefImpl::Map(v) => estimate_serialize_list_size(v.into_inner()),
261 ScalarRefImpl::Vector(v) => estimate_serialize_vector_size(v),
262 }
263}
264
265fn serialize_struct(value: StructRef<'_>, buf: &mut impl BufMut) {
266 value.iter_fields_ref().for_each(|field_value| {
267 serialize_datum_into(field_value, buf);
268 });
269}
270
271fn estimate_serialize_struct_size(s: StructRef<'_>) -> usize {
272 s.estimate_serialize_size_inner()
273}
274fn serialize_list(value: ListRef<'_>, buf: &mut impl BufMut) {
275 let elems = value.iter();
276 buf.put_u32_le(elems.len() as u32);
277
278 elems.for_each(|field_value| {
279 serialize_datum_into(field_value, buf);
280 });
281}
282fn estimate_serialize_list_size(list: ListRef<'_>) -> usize {
283 4 + list.estimate_serialize_size_inner()
284}
285
286fn serialize_vector(value: VectorRef<'_>, buf: &mut impl BufMut) {
287 let elems = value.as_slice();
288 encode_vector_payload(elems, buf);
289}
290fn estimate_serialize_vector_size(v: VectorRef<'_>) -> usize {
291 size_of_val(v.as_slice())
292}
293
294fn serialize_str(bytes: &[u8], buf: &mut impl BufMut) {
295 buf.put_u32_le(bytes.len() as u32);
296 buf.put_slice(bytes);
297}
298
/// Encoded size of a length-prefixed byte string: a `u32` length plus the raw bytes.
fn estimate_serialize_str_size(bytes: &[u8]) -> usize {
    std::mem::size_of::<u32>() + bytes.len()
}
302
303fn serialize_interval(interval: &Interval, buf: &mut impl BufMut) {
304 buf.put_i32_le(interval.months());
305 buf.put_i32_le(interval.days());
306 buf.put_i64_le(interval.usecs());
307}
308
/// Encoded interval size: two `i32` fields (months, days) plus one `i64` (microseconds).
fn estimate_serialize_interval_size() -> usize {
    2 * std::mem::size_of::<i32>() + std::mem::size_of::<i64>()
}
312
313fn serialize_date(days: i32, buf: &mut impl BufMut) {
314 buf.put_i32_le(days);
315}
316
/// Encoded date size: a single `i32` day count.
fn estimate_serialize_date_size() -> usize {
    std::mem::size_of::<i32>()
}
320
321fn serialize_timestamp(secs: i64, nsecs: u32, buf: &mut impl BufMut) {
322 buf.put_i64_le(secs);
323 buf.put_u32_le(nsecs);
324}
325
/// Encoded timestamp size: an `i64` of seconds plus a `u32` of nanoseconds.
fn estimate_serialize_timestamp_size() -> usize {
    std::mem::size_of::<i64>() + std::mem::size_of::<u32>()
}
329
330fn serialize_time(secs: u32, nano: u32, buf: &mut impl BufMut) {
331 buf.put_u32_le(secs);
332 buf.put_u32_le(nano);
333}
334
/// Encoded time size: two `u32` fields (seconds and nanoseconds).
fn estimate_serialize_time_size() -> usize {
    2 * std::mem::size_of::<u32>()
}
338
339fn serialize_decimal(decimal: &Decimal, buf: &mut impl BufMut) {
340 buf.put_slice(&decimal.unordered_serialize());
341}
342
/// Encoded decimal size: the fixed 16-byte payload.
fn estimate_serialize_decimal_size() -> usize {
    const DECIMAL_PAYLOAD_LEN: usize = 16;
    DECIMAL_PAYLOAD_LEN
}
346
347fn deserialize_value(ty: &DataType, data: &mut impl Buf) -> Result<ScalarImpl> {
348 Ok(match ty {
349 DataType::Int16 => ScalarImpl::Int16(data.get_i16_le()),
350 DataType::Int32 => ScalarImpl::Int32(data.get_i32_le()),
351 DataType::Int64 => ScalarImpl::Int64(data.get_i64_le()),
352 DataType::Int256 => ScalarImpl::Int256(deserialize_int256(data)),
353 DataType::Serial => ScalarImpl::Serial(Serial::from(data.get_i64_le())),
354 DataType::Float32 => ScalarImpl::Float32(F32::from(data.get_f32_le())),
355 DataType::Float64 => ScalarImpl::Float64(F64::from(data.get_f64_le())),
356 DataType::Varchar => ScalarImpl::Utf8(deserialize_str(data)?),
357 DataType::Boolean => ScalarImpl::Bool(deserialize_bool(data)?),
358 DataType::Decimal => ScalarImpl::Decimal(deserialize_decimal(data)?),
359 DataType::Interval => ScalarImpl::Interval(deserialize_interval(data)?),
360 DataType::Time => ScalarImpl::Time(deserialize_time(data)?),
361 DataType::Timestamp => ScalarImpl::Timestamp(deserialize_timestamp(data)?),
362 DataType::Timestamptz => {
363 ScalarImpl::Timestamptz(Timestamptz::from_micros(data.get_i64_le()))
364 }
365 DataType::Date => ScalarImpl::Date(deserialize_date(data)?),
366 DataType::Jsonb => ScalarImpl::Jsonb(
367 JsonbVal::value_deserialize(&deserialize_bytea(data))
368 .ok_or(ValueEncodingError::InvalidJsonbEncoding)?,
369 ),
370 DataType::Struct(struct_def) => deserialize_struct(struct_def, data)?,
371 DataType::Bytea => ScalarImpl::Bytea(deserialize_bytea(data).into()),
372 DataType::Vector(dimension) => deserialize_vector(*dimension, data),
373 DataType::List(item_type) => deserialize_list(item_type, data)?,
374 DataType::Map(map_type) => {
375 let list = deserialize_list(&map_type.clone().into_struct(), data)?.into_list();
377 ScalarImpl::Map(MapValue::from_entries(list))
378 }
379 })
380}
381
382fn deserialize_struct(struct_def: &StructType, data: &mut impl Buf) -> Result<ScalarImpl> {
383 let mut field_values = Vec::with_capacity(struct_def.len());
384 for field_type in struct_def.types() {
385 field_values.push(inner_deserialize_datum(data, field_type)?);
386 }
387
388 Ok(ScalarImpl::Struct(StructValue::new(field_values)))
389}
390
391fn deserialize_list(item_type: &DataType, data: &mut impl Buf) -> Result<ScalarImpl> {
392 let len = data.get_u32_le();
393 let mut builder = item_type.create_array_builder(len as usize);
394 for _ in 0..len {
395 builder.append(inner_deserialize_datum(data, item_type)?);
396 }
397 Ok(ScalarImpl::List(ListValue::new(builder.finish())))
398}
399
400fn deserialize_vector(dimension: usize, data: &mut impl Buf) -> ScalarImpl {
401 VectorVal {
402 inner: decode_vector_payload(dimension, data).into_boxed_slice(),
403 }
404 .to_scalar_value()
405}
406
407fn deserialize_str(data: &mut impl Buf) -> Result<Box<str>> {
408 let len = data.get_u32_le();
409 let mut bytes = vec![0; len as usize];
410 data.copy_to_slice(&mut bytes);
411 String::from_utf8(bytes)
412 .map(String::into_boxed_str)
413 .map_err(ValueEncodingError::InvalidUtf8)
414}
415
416fn deserialize_bytea(data: &mut impl Buf) -> Vec<u8> {
417 let len = data.get_u32_le();
418 let mut bytes = vec![0; len as usize];
419 data.copy_to_slice(&mut bytes);
420 bytes
421}
422
423fn deserialize_int256(data: &mut impl Buf) -> Int256 {
424 let mut bytes = [0; Int256::size()];
425 data.copy_to_slice(&mut bytes);
426 Int256::from_le_bytes(bytes)
427}
428
429fn deserialize_bool(data: &mut impl Buf) -> Result<bool> {
430 match data.get_u8() {
431 1 => Ok(true),
432 0 => Ok(false),
433 value => Err(ValueEncodingError::InvalidBoolEncoding(value)),
434 }
435}
436
437fn deserialize_interval(data: &mut impl Buf) -> Result<Interval> {
438 let months = data.get_i32_le();
439 let days = data.get_i32_le();
440 let usecs = data.get_i64_le();
441 Ok(Interval::from_month_day_usec(months, days, usecs))
442}
443
444fn deserialize_time(data: &mut impl Buf) -> Result<Time> {
445 let secs = data.get_u32_le();
446 let nano = data.get_u32_le();
447 Time::with_secs_nano(secs, nano)
448 .map_err(|_e| ValueEncodingError::InvalidTimeEncoding(secs, nano))
449}
450
451fn deserialize_timestamp(data: &mut impl Buf) -> Result<Timestamp> {
452 let secs = data.get_i64_le();
453 let nsecs = data.get_u32_le();
454 Timestamp::with_secs_nsecs(secs, nsecs)
455 .map_err(|_e| ValueEncodingError::InvalidTimestampEncoding(secs, nsecs))
456}
457
458fn deserialize_date(data: &mut impl Buf) -> Result<Date> {
459 let days = data.get_i32_le();
460 Date::with_days_since_ce(days).map_err(|_e| ValueEncodingError::InvalidDateEncoding(days))
461}
462
463fn deserialize_decimal(data: &mut impl Buf) -> Result<Decimal> {
464 let mut bytes = [0; 16];
465 data.copy_to_slice(&mut bytes);
466 Ok(Decimal::unordered_deserialize(bytes))
467}
468
#[cfg(test)]
mod tests {
    use crate::array::{ArrayImpl, ListValue, StructValue};
    use crate::test_utils::rand_chunk;
    use crate::types::{
        DataType, Date, Datum, Decimal, Interval, ScalarImpl, Serial, Time, Timestamp,
    };
    use crate::util::value_encoding::{
        estimate_serialize_datum_size, serialize_datum, try_get_exact_serialize_datum_size,
    };

    /// Asserts that the size estimate for `Some(s)` matches its actual encoded length.
    fn test_estimate_serialize_scalar_size(s: ScalarImpl) {
        let datum: Datum = Some(s);
        let encoded = serialize_datum(&datum);
        assert_eq!(estimate_serialize_datum_size(&datum), encoded.len());
    }

    /// When an exact size is reported for `s`, asserts that it equals the encoded length of
    /// `s`'s datum.
    fn test_try_get_exact_serialize_datum_size(s: &ArrayImpl) {
        let datum = s.to_datum();
        if let Some(exact) = try_get_exact_serialize_datum_size(s) {
            assert_eq!(exact, serialize_datum(&datum).len());
        }
    }

    #[test]
    fn test_estimate_size() {
        let null: Datum = None;
        assert_eq!(estimate_serialize_datum_size(&null), serialize_datum(&null).len());

        let scalars = [
            ScalarImpl::Bool(true),
            ScalarImpl::Int16(1),
            ScalarImpl::Int32(1),
            ScalarImpl::Int64(1),
            ScalarImpl::Float32(1.0.into()),
            ScalarImpl::Float64(1.0.into()),
            ScalarImpl::Serial(Serial::from(i64::MIN)),
            ScalarImpl::Utf8("abc".into()),
            ScalarImpl::Utf8("".into()),
            ScalarImpl::Decimal(Decimal::NegativeInf),
            ScalarImpl::Decimal(Decimal::PositiveInf),
            ScalarImpl::Decimal(Decimal::NaN),
            ScalarImpl::Decimal(123123.into()),
            ScalarImpl::Interval(Interval::from_month_day_usec(7, 8, 9)),
            ScalarImpl::Date(Date::from_ymd_uncheck(2333, 3, 3)),
            ScalarImpl::Bytea("\\x233".as_bytes().into()),
            ScalarImpl::Time(Time::from_hms_uncheck(2, 3, 3)),
            ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(23333333, 2333)),
            ScalarImpl::Interval(Interval::from_month_day_usec(2, 3, 3333)),
            ScalarImpl::Struct(StructValue::new(vec![
                ScalarImpl::Int64(233).into(),
                ScalarImpl::Float64(23.33.into()).into(),
            ])),
            ScalarImpl::List(ListValue::from_iter([233i64, 2333])),
        ];
        for scalar in scalars {
            test_estimate_serialize_scalar_size(scalar);
        }
    }

    #[test]
    fn test_try_estimate_size() {
        let data_types = [
            DataType::Int16,
            DataType::Int32,
            DataType::Int64,
            DataType::Serial,
            DataType::Float32,
            DataType::Float64,
            DataType::Boolean,
            DataType::Decimal,
            DataType::Interval,
            DataType::Time,
            DataType::Timestamp,
            DataType::Date,
        ];
        // One row, seed 0, no NULLs.
        let chunk = rand_chunk::gen_chunk(&data_types, 1, 0, 0.0);
        for column in chunk.columns() {
            test_try_get_exact_serialize_datum_size(column);
        }
    }
}