1use bytes::{Buf, BufMut};
19use chrono::{Datelike, Timelike};
20use either::{Either, for_both};
21use enum_as_inner::EnumAsInner;
22use risingwave_pb::data::PbDatum;
23
24use crate::array::ArrayImpl;
25use crate::row::{Row, RowDeserializer as BasicDeserializer};
26use crate::types::*;
27
28pub mod error;
29use error::ValueEncodingError;
30
31use self::column_aware_row_encoding::ColumnAwareSerde;
32pub mod column_aware_row_encoding;
33
/// Result type returned by the value-encoding routines in this module; the error type is
/// always [`ValueEncodingError`].
pub type Result<T> = std::result::Result<T, ValueEncodingError>;
35
/// Names the kind of value-row serde in use.
///
/// NOTE(review): the variants presumably correspond to [`BasicSerde`] and
/// [`ColumnAwareSerde`] (the two alternatives wrapped by [`EitherSerde`]) — confirm
/// against the callers that construct this enum.
#[derive(EnumAsInner)]
pub enum ValueRowSerdeKind {
    /// The basic encoding — presumably [`BasicSerde`].
    Basic,
    /// The column-aware encoding — presumably [`ColumnAwareSerde`].
    ColumnAware,
}
44
/// Encodes a whole [`Row`] into a freshly allocated byte buffer.
///
/// Implementors in this module ([`BasicSerde`], [`EitherSerde`]) also implement
/// [`ValueRowDeserializer`] for the same format.
pub trait ValueRowSerializer: Clone {
    /// Serializes `row` and returns the encoded bytes.
    fn serialize(&self, row: impl Row) -> Vec<u8>;
}
49
/// Decodes an encoded row back into its datums.
///
/// Implementors in this module ([`BasicSerde`], [`EitherSerde`]) also implement
/// [`ValueRowSerializer`] for the same format.
pub trait ValueRowDeserializer: Clone {
    /// Deserializes `encoded_bytes` into the row's datums.
    ///
    /// # Errors
    ///
    /// Returns a [`ValueEncodingError`] when the bytes are not a valid encoding.
    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>>;
}
54
/// A value-row serde that is either a [`BasicSerde`] (`Left`) or a [`ColumnAwareSerde`]
/// (`Right`); every (de)serialization call is forwarded to the wrapped alternative.
#[derive(Clone)]
pub struct EitherSerde(pub Either<BasicSerde, ColumnAwareSerde>);
58
59impl From<BasicSerde> for EitherSerde {
60 fn from(value: BasicSerde) -> Self {
61 Self(Either::Left(value))
62 }
63}
64impl From<ColumnAwareSerde> for EitherSerde {
65 fn from(value: ColumnAwareSerde) -> Self {
66 Self(Either::Right(value))
67 }
68}
69
70impl ValueRowSerializer for EitherSerde {
71 fn serialize(&self, row: impl Row) -> Vec<u8> {
72 for_both!(&self.0, s => s.serialize(row))
73 }
74}
75
76impl ValueRowDeserializer for EitherSerde {
77 fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
78 for_both!(&self.0, s => s.deserialize(encoded_bytes))
79 }
80}
81
/// Stateless serializer for the basic row format: each datum of the row is written
/// back-to-back via [`serialize_datum_into`] (null tag followed by the payload), with no
/// per-row or per-column header.
#[derive(Clone)]
pub struct BasicSerializer;
85
86impl ValueRowSerializer for BasicSerializer {
87 fn serialize(&self, row: impl Row) -> Vec<u8> {
88 let mut buf = vec![];
89 for datum in row.iter() {
90 serialize_datum_into(datum, &mut buf);
91 }
92 buf
93 }
94}
95
96impl ValueRowDeserializer for BasicDeserializer {
97 fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
98 Ok(self.deserialize(encoded_bytes)?.into_inner().into())
99 }
100}
101
/// Value-row serde for the basic encoding: a [`BasicSerializer`] paired with the row
/// deserializer imported in this module as [`BasicDeserializer`]
/// (`crate::row::RowDeserializer`).
#[derive(Clone)]
pub struct BasicSerde {
    /// Writes rows as back-to-back null-tagged datums.
    pub serializer: BasicSerializer,
    /// Reads rows back into datums.
    pub deserializer: BasicDeserializer,
}
108
109impl ValueRowSerializer for BasicSerde {
110 fn serialize(&self, row: impl Row) -> Vec<u8> {
111 self.serializer.serialize(row)
112 }
113}
114
115impl ValueRowDeserializer for BasicSerde {
116 fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
117 Ok(self
118 .deserializer
119 .deserialize(encoded_bytes)?
120 .into_inner()
121 .into())
122 }
123}
124
125pub fn try_get_exact_serialize_datum_size(arr: &ArrayImpl) -> Option<usize> {
126 match arr {
127 ArrayImpl::Int16(_) => Some(2),
128 ArrayImpl::Int32(_) => Some(4),
129 ArrayImpl::Int64(_) => Some(8),
130 ArrayImpl::Serial(_) => Some(8),
131 ArrayImpl::Float32(_) => Some(4),
132 ArrayImpl::Float64(_) => Some(8),
133 ArrayImpl::Bool(_) => Some(1),
134 ArrayImpl::Decimal(_) => Some(estimate_serialize_decimal_size()),
135 ArrayImpl::Interval(_) => Some(estimate_serialize_interval_size()),
136 ArrayImpl::Date(_) => Some(estimate_serialize_date_size()),
137 ArrayImpl::Timestamp(_) => Some(estimate_serialize_timestamp_size()),
138 ArrayImpl::Time(_) => Some(estimate_serialize_time_size()),
139 _ => None,
140 }
141 .map(|x| x + 1)
142}
143
144pub fn serialize_datum(cell: impl ToDatumRef) -> Vec<u8> {
146 let mut buf: Vec<u8> = vec![];
147 serialize_datum_into(cell, &mut buf);
148 buf
149}
150
151pub fn serialize_datum_into(datum_ref: impl ToDatumRef, buf: &mut impl BufMut) {
153 if let Some(d) = datum_ref.to_datum_ref() {
154 buf.put_u8(1);
155 serialize_scalar(d, buf)
156 } else {
157 buf.put_u8(0);
158 }
159}
160
161pub fn estimate_serialize_datum_size(datum_ref: impl ToDatumRef) -> usize {
162 if let Some(d) = datum_ref.to_datum_ref() {
163 1 + estimate_serialize_scalar_size(d)
164 } else {
165 1
166 }
167}
168
169#[easy_ext::ext(DatumFromProtoExt)]
170impl Datum {
171 pub fn from_protobuf(proto: &PbDatum, data_type: &DataType) -> Result<Datum> {
173 deserialize_datum(proto.body.as_slice(), data_type)
174 }
175}
176
177#[easy_ext::ext(DatumToProtoExt)]
178impl<D: ToDatumRef> D {
179 pub fn to_protobuf(&self) -> PbDatum {
181 PbDatum {
182 body: serialize_datum(self),
183 }
184 }
185}
186
187pub fn deserialize_datum(mut data: impl Buf, ty: &DataType) -> Result<Datum> {
189 inner_deserialize_datum(&mut data, ty)
190}
191
192#[inline(always)]
194fn inner_deserialize_datum(data: &mut impl Buf, ty: &DataType) -> Result<Datum> {
195 let null_tag = data.get_u8();
196 match null_tag {
197 0 => Ok(None),
198 1 => Some(deserialize_value(ty, data)).transpose(),
199 _ => Err(ValueEncodingError::InvalidTagEncoding(null_tag)),
200 }
201}
202
203fn serialize_scalar(value: ScalarRefImpl<'_>, buf: &mut impl BufMut) {
204 match value {
205 ScalarRefImpl::Int16(v) => buf.put_i16_le(v),
206 ScalarRefImpl::Int32(v) => buf.put_i32_le(v),
207 ScalarRefImpl::Int64(v) => buf.put_i64_le(v),
208 ScalarRefImpl::Int256(v) => buf.put_slice(&v.to_le_bytes()),
209 ScalarRefImpl::Serial(v) => buf.put_i64_le(v.into_inner()),
210 ScalarRefImpl::Float32(v) => buf.put_f32_le(v.into_inner()),
211 ScalarRefImpl::Float64(v) => buf.put_f64_le(v.into_inner()),
212 ScalarRefImpl::Utf8(v) => serialize_str(v.as_bytes(), buf),
213 ScalarRefImpl::Bytea(v) => serialize_str(v, buf),
214 ScalarRefImpl::Bool(v) => buf.put_u8(v as u8),
215 ScalarRefImpl::Decimal(v) => serialize_decimal(&v, buf),
216 ScalarRefImpl::Interval(v) => serialize_interval(&v, buf),
217 ScalarRefImpl::Date(v) => serialize_date(v.0.num_days_from_ce(), buf),
218 ScalarRefImpl::Timestamp(v) => serialize_timestamp(
219 v.0.and_utc().timestamp(),
220 v.0.and_utc().timestamp_subsec_nanos(),
221 buf,
222 ),
223 ScalarRefImpl::Timestamptz(v) => buf.put_i64_le(v.timestamp_micros()),
224 ScalarRefImpl::Time(v) => {
225 serialize_time(v.0.num_seconds_from_midnight(), v.0.nanosecond(), buf)
226 }
227 ScalarRefImpl::Jsonb(v) => serialize_str(&v.value_serialize(), buf),
228 ScalarRefImpl::Struct(s) => serialize_struct(s, buf),
229 ScalarRefImpl::List(v) => serialize_list(v, buf),
230 ScalarRefImpl::Map(m) => serialize_list(m.into_inner(), buf),
231 }
232}
233
234fn estimate_serialize_scalar_size(value: ScalarRefImpl<'_>) -> usize {
235 match value {
236 ScalarRefImpl::Int16(_) => 2,
237 ScalarRefImpl::Int32(_) => 4,
238 ScalarRefImpl::Int64(_) => 8,
239 ScalarRefImpl::Int256(_) => 32,
240 ScalarRefImpl::Serial(_) => 8,
241 ScalarRefImpl::Float32(_) => 4,
242 ScalarRefImpl::Float64(_) => 8,
243 ScalarRefImpl::Utf8(v) => estimate_serialize_str_size(v.as_bytes()),
244 ScalarRefImpl::Bytea(v) => estimate_serialize_str_size(v),
245 ScalarRefImpl::Bool(_) => 1,
246 ScalarRefImpl::Decimal(_) => estimate_serialize_decimal_size(),
247 ScalarRefImpl::Interval(_) => estimate_serialize_interval_size(),
248 ScalarRefImpl::Date(_) => estimate_serialize_date_size(),
249 ScalarRefImpl::Timestamp(_) => estimate_serialize_timestamp_size(),
250 ScalarRefImpl::Timestamptz(_) => 8,
251 ScalarRefImpl::Time(_) => estimate_serialize_time_size(),
252 ScalarRefImpl::Jsonb(v) => v.capacity(),
254 ScalarRefImpl::Struct(s) => estimate_serialize_struct_size(s),
255 ScalarRefImpl::List(v) => estimate_serialize_list_size(v),
256 ScalarRefImpl::Map(v) => estimate_serialize_list_size(v.into_inner()),
257 }
258}
259
260fn serialize_struct(value: StructRef<'_>, buf: &mut impl BufMut) {
261 value.iter_fields_ref().for_each(|field_value| {
262 serialize_datum_into(field_value, buf);
263 });
264}
265
266fn estimate_serialize_struct_size(s: StructRef<'_>) -> usize {
267 s.estimate_serialize_size_inner()
268}
269fn serialize_list(value: ListRef<'_>, buf: &mut impl BufMut) {
270 let elems = value.iter();
271 buf.put_u32_le(elems.len() as u32);
272
273 elems.for_each(|field_value| {
274 serialize_datum_into(field_value, buf);
275 });
276}
277fn estimate_serialize_list_size(list: ListRef<'_>) -> usize {
278 4 + list.estimate_serialize_size_inner()
279}
280
281fn serialize_str(bytes: &[u8], buf: &mut impl BufMut) {
282 buf.put_u32_le(bytes.len() as u32);
283 buf.put_slice(bytes);
284}
285
/// Size of a length-prefixed byte string as written by `serialize_str`: a `u32` length
/// followed by the bytes themselves.
fn estimate_serialize_str_size(bytes: &[u8]) -> usize {
    std::mem::size_of::<u32>() + bytes.len()
}
289
290fn serialize_interval(interval: &Interval, buf: &mut impl BufMut) {
291 buf.put_i32_le(interval.months());
292 buf.put_i32_le(interval.days());
293 buf.put_i64_le(interval.usecs());
294}
295
/// Encoded size of an interval: two `i32` fields (months, days) plus an `i64` (usecs).
fn estimate_serialize_interval_size() -> usize {
    std::mem::size_of::<i32>() * 2 + std::mem::size_of::<i64>()
}
299
300fn serialize_date(days: i32, buf: &mut impl BufMut) {
301 buf.put_i32_le(days);
302}
303
/// Encoded size of a date: a single `i32` day count.
fn estimate_serialize_date_size() -> usize {
    std::mem::size_of::<i32>()
}
307
308fn serialize_timestamp(secs: i64, nsecs: u32, buf: &mut impl BufMut) {
309 buf.put_i64_le(secs);
310 buf.put_u32_le(nsecs);
311}
312
/// Encoded size of a timestamp: `i64` seconds plus `u32` nanoseconds.
fn estimate_serialize_timestamp_size() -> usize {
    std::mem::size_of::<i64>() + std::mem::size_of::<u32>()
}
316
317fn serialize_time(secs: u32, nano: u32, buf: &mut impl BufMut) {
318 buf.put_u32_le(secs);
319 buf.put_u32_le(nano);
320}
321
/// Encoded size of a time of day: two `u32` fields (seconds, nanoseconds).
fn estimate_serialize_time_size() -> usize {
    std::mem::size_of::<u32>() * 2
}
325
326fn serialize_decimal(decimal: &Decimal, buf: &mut impl BufMut) {
327 buf.put_slice(&decimal.unordered_serialize());
328}
329
/// Encoded size of a decimal: the 16-byte buffer read back by `deserialize_decimal`.
fn estimate_serialize_decimal_size() -> usize {
    std::mem::size_of::<[u8; 16]>()
}
333
334fn deserialize_value(ty: &DataType, data: &mut impl Buf) -> Result<ScalarImpl> {
335 Ok(match ty {
336 DataType::Int16 => ScalarImpl::Int16(data.get_i16_le()),
337 DataType::Int32 => ScalarImpl::Int32(data.get_i32_le()),
338 DataType::Int64 => ScalarImpl::Int64(data.get_i64_le()),
339 DataType::Int256 => ScalarImpl::Int256(deserialize_int256(data)),
340 DataType::Serial => ScalarImpl::Serial(Serial::from(data.get_i64_le())),
341 DataType::Float32 => ScalarImpl::Float32(F32::from(data.get_f32_le())),
342 DataType::Float64 => ScalarImpl::Float64(F64::from(data.get_f64_le())),
343 DataType::Varchar => ScalarImpl::Utf8(deserialize_str(data)?),
344 DataType::Boolean => ScalarImpl::Bool(deserialize_bool(data)?),
345 DataType::Decimal => ScalarImpl::Decimal(deserialize_decimal(data)?),
346 DataType::Interval => ScalarImpl::Interval(deserialize_interval(data)?),
347 DataType::Time => ScalarImpl::Time(deserialize_time(data)?),
348 DataType::Timestamp => ScalarImpl::Timestamp(deserialize_timestamp(data)?),
349 DataType::Timestamptz => {
350 ScalarImpl::Timestamptz(Timestamptz::from_micros(data.get_i64_le()))
351 }
352 DataType::Date => ScalarImpl::Date(deserialize_date(data)?),
353 DataType::Jsonb => ScalarImpl::Jsonb(
354 JsonbVal::value_deserialize(&deserialize_bytea(data))
355 .ok_or(ValueEncodingError::InvalidJsonbEncoding)?,
356 ),
357 DataType::Struct(struct_def) => deserialize_struct(struct_def, data)?,
358 DataType::Bytea => ScalarImpl::Bytea(deserialize_bytea(data).into()),
359 DataType::List(item_type) => deserialize_list(item_type, data)?,
360 DataType::Map(map_type) => {
361 let list = deserialize_list(&map_type.clone().into_struct(), data)?.into_list();
363 ScalarImpl::Map(MapValue::from_entries(list))
364 }
365 })
366}
367
368fn deserialize_struct(struct_def: &StructType, data: &mut impl Buf) -> Result<ScalarImpl> {
369 let mut field_values = Vec::with_capacity(struct_def.len());
370 for field_type in struct_def.types() {
371 field_values.push(inner_deserialize_datum(data, field_type)?);
372 }
373
374 Ok(ScalarImpl::Struct(StructValue::new(field_values)))
375}
376
377fn deserialize_list(item_type: &DataType, data: &mut impl Buf) -> Result<ScalarImpl> {
378 let len = data.get_u32_le();
379 let mut builder = item_type.create_array_builder(len as usize);
380 for _ in 0..len {
381 builder.append(inner_deserialize_datum(data, item_type)?);
382 }
383 Ok(ScalarImpl::List(ListValue::new(builder.finish())))
384}
385
386fn deserialize_str(data: &mut impl Buf) -> Result<Box<str>> {
387 let len = data.get_u32_le();
388 let mut bytes = vec![0; len as usize];
389 data.copy_to_slice(&mut bytes);
390 String::from_utf8(bytes)
391 .map(String::into_boxed_str)
392 .map_err(ValueEncodingError::InvalidUtf8)
393}
394
395fn deserialize_bytea(data: &mut impl Buf) -> Vec<u8> {
396 let len = data.get_u32_le();
397 let mut bytes = vec![0; len as usize];
398 data.copy_to_slice(&mut bytes);
399 bytes
400}
401
402fn deserialize_int256(data: &mut impl Buf) -> Int256 {
403 let mut bytes = [0; Int256::size()];
404 data.copy_to_slice(&mut bytes);
405 Int256::from_le_bytes(bytes)
406}
407
408fn deserialize_bool(data: &mut impl Buf) -> Result<bool> {
409 match data.get_u8() {
410 1 => Ok(true),
411 0 => Ok(false),
412 value => Err(ValueEncodingError::InvalidBoolEncoding(value)),
413 }
414}
415
416fn deserialize_interval(data: &mut impl Buf) -> Result<Interval> {
417 let months = data.get_i32_le();
418 let days = data.get_i32_le();
419 let usecs = data.get_i64_le();
420 Ok(Interval::from_month_day_usec(months, days, usecs))
421}
422
423fn deserialize_time(data: &mut impl Buf) -> Result<Time> {
424 let secs = data.get_u32_le();
425 let nano = data.get_u32_le();
426 Time::with_secs_nano(secs, nano)
427 .map_err(|_e| ValueEncodingError::InvalidTimeEncoding(secs, nano))
428}
429
430fn deserialize_timestamp(data: &mut impl Buf) -> Result<Timestamp> {
431 let secs = data.get_i64_le();
432 let nsecs = data.get_u32_le();
433 Timestamp::with_secs_nsecs(secs, nsecs)
434 .map_err(|_e| ValueEncodingError::InvalidTimestampEncoding(secs, nsecs))
435}
436
437fn deserialize_date(data: &mut impl Buf) -> Result<Date> {
438 let days = data.get_i32_le();
439 Date::with_days_since_ce(days).map_err(|_e| ValueEncodingError::InvalidDateEncoding(days))
440}
441
442fn deserialize_decimal(data: &mut impl Buf) -> Result<Decimal> {
443 let mut bytes = [0; 16];
444 data.copy_to_slice(&mut bytes);
445 Ok(Decimal::unordered_deserialize(bytes))
446}
447
#[cfg(test)]
mod tests {
    use crate::array::{ArrayImpl, ListValue, StructValue};
    use crate::test_utils::rand_chunk;
    use crate::types::{
        DataType, Date, Datum, Decimal, Interval, ScalarImpl, Serial, Time, Timestamp,
    };
    use crate::util::value_encoding::{
        estimate_serialize_datum_size, serialize_datum, try_get_exact_serialize_datum_size,
    };

    /// Asserts that the size estimate for `datum` equals its actual encoded length.
    fn assert_estimate_is_exact(datum: &Datum) {
        let estimated = estimate_serialize_datum_size(datum);
        let actual = serialize_datum(datum).len();
        assert_eq!(estimated, actual);
    }

    /// Checks that, whenever an exact size is reported for `column`, it equals the
    /// encoded length of `column.to_datum()`.
    fn assert_exact_size_if_known(column: &ArrayImpl) {
        let datum = column.to_datum();
        if let Some(expected) = try_get_exact_serialize_datum_size(column) {
            assert_eq!(expected, serialize_datum(&datum).len());
        }
    }

    #[test]
    fn test_estimate_size() {
        // NULL encodes to just the tag byte.
        assert_estimate_is_exact(&None);

        let scalars = vec![
            ScalarImpl::Bool(true),
            ScalarImpl::Int16(1),
            ScalarImpl::Int32(1),
            ScalarImpl::Int64(1),
            ScalarImpl::Float32(1.0.into()),
            ScalarImpl::Float64(1.0.into()),
            ScalarImpl::Serial(Serial::from(i64::MIN)),
            ScalarImpl::Utf8("abc".into()),
            ScalarImpl::Utf8("".into()),
            ScalarImpl::Decimal(Decimal::NegativeInf),
            ScalarImpl::Decimal(Decimal::PositiveInf),
            ScalarImpl::Decimal(Decimal::NaN),
            ScalarImpl::Decimal(123123.into()),
            ScalarImpl::Interval(Interval::from_month_day_usec(7, 8, 9)),
            ScalarImpl::Date(Date::from_ymd_uncheck(2333, 3, 3)),
            ScalarImpl::Bytea("\\x233".as_bytes().into()),
            ScalarImpl::Time(Time::from_hms_uncheck(2, 3, 3)),
            ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(23333333, 2333)),
            ScalarImpl::Interval(Interval::from_month_day_usec(2, 3, 3333)),
            ScalarImpl::Struct(StructValue::new(vec![
                ScalarImpl::Int64(233).into(),
                ScalarImpl::Float64(23.33.into()).into(),
            ])),
            ScalarImpl::List(ListValue::from_iter([233i64, 2333])),
        ];
        for scalar in scalars {
            assert_estimate_is_exact(&Some(scalar));
        }
    }

    #[test]
    fn test_try_estimate_size() {
        let types = [
            DataType::Int16,
            DataType::Int32,
            DataType::Int64,
            DataType::Serial,
            DataType::Float32,
            DataType::Float64,
            DataType::Boolean,
            DataType::Decimal,
            DataType::Interval,
            DataType::Time,
            DataType::Timestamp,
            DataType::Date,
        ];
        let chunk = rand_chunk::gen_chunk(&types, 1, 0, 0.0);
        chunk
            .columns()
            .iter()
            .for_each(|column| assert_exact_size_if_known(column));
    }
}