1use bytes::{Buf, BufMut};
19use chrono::{Datelike, Timelike};
20use either::{Either, for_both};
21use enum_as_inner::EnumAsInner;
22use risingwave_pb::data::PbDatum;
23
24use crate::array::ArrayImpl;
25use crate::row::Row;
26use crate::types::*;
27
28pub mod error;
29use error::ValueEncodingError;
30
31use self::column_aware_row_encoding::ColumnAwareSerde;
32pub mod column_aware_row_encoding;
33
34pub use crate::row::RowDeserializer as BasicDeserializer;
35
/// Result type used throughout value encoding, with [`ValueEncodingError`] as the error type.
pub type Result<T> = std::result::Result<T, ValueEncodingError>;
37
/// Selects which value-encoding row serde implementation is used.
#[derive(EnumAsInner)]
pub enum ValueRowSerdeKind {
    /// Plain encoding ([`BasicSerde`]): each datum of the row is written back-to-back in column
    /// order with no type information, so decoding needs the same column types in the same order.
    Basic,
    /// Column-aware encoding ([`ColumnAwareSerde`], defined in `column_aware_row_encoding`).
    ColumnAware,
}
46
/// Encodes a whole [`Row`] into bytes using a value encoding.
pub trait ValueRowSerializer: Clone {
    /// Serializes `row` and returns the encoded bytes.
    fn serialize(&self, row: impl Row) -> Vec<u8>;
}
51
/// Decodes bytes produced by a matching [`ValueRowSerializer`] back into datums.
pub trait ValueRowDeserializer: Clone {
    /// Deserializes `encoded_bytes` into the row's datums.
    ///
    /// # Errors
    ///
    /// Returns a [`ValueEncodingError`] when `encoded_bytes` is not a valid encoding.
    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>>;
}
56
/// A row serde that holds either a [`BasicSerde`] or a [`ColumnAwareSerde`], chosen at runtime.
///
/// Its [`ValueRowSerializer`] and [`ValueRowDeserializer`] impls forward to whichever variant
/// is held.
#[derive(Clone)]
pub struct EitherSerde(pub Either<BasicSerde, ColumnAwareSerde>);
60
61impl From<BasicSerde> for EitherSerde {
62 fn from(value: BasicSerde) -> Self {
63 Self(Either::Left(value))
64 }
65}
66impl From<ColumnAwareSerde> for EitherSerde {
67 fn from(value: ColumnAwareSerde) -> Self {
68 Self(Either::Right(value))
69 }
70}
71
72impl ValueRowSerializer for EitherSerde {
73 fn serialize(&self, row: impl Row) -> Vec<u8> {
74 for_both!(&self.0, s => s.serialize(row))
75 }
76}
77
78impl ValueRowDeserializer for EitherSerde {
79 fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
80 for_both!(&self.0, s => s.deserialize(encoded_bytes))
81 }
82}
83
/// Serializer for the basic value encoding: each datum of the row is written with
/// [`serialize_datum_into`], one after another, with no row-level header.
#[derive(Clone)]
pub struct BasicSerializer;
87
88impl ValueRowSerializer for BasicSerializer {
89 fn serialize(&self, row: impl Row) -> Vec<u8> {
90 let mut buf = vec![];
91 for datum in row.iter() {
92 serialize_datum_into(datum, &mut buf);
93 }
94 buf
95 }
96}
97
98impl ValueRowDeserializer for BasicDeserializer {
99 fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
100 Ok(self.deserialize(encoded_bytes)?.into_inner().into())
101 }
102}
103
/// The basic value-encoding row serde: a [`BasicSerializer`] paired with a
/// [`BasicDeserializer`].
#[derive(Clone)]
pub struct BasicSerde {
    /// Encodes rows.
    pub serializer: BasicSerializer,
    /// Decodes rows; presumably carries the column types needed to decode (see `RowDeserializer`).
    pub deserializer: BasicDeserializer,
}
110
111impl ValueRowSerializer for BasicSerde {
112 fn serialize(&self, row: impl Row) -> Vec<u8> {
113 self.serializer.serialize(row)
114 }
115}
116
117impl ValueRowDeserializer for BasicSerde {
118 fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
119 Ok(self
120 .deserializer
121 .deserialize(encoded_bytes)?
122 .into_inner()
123 .into())
124 }
125}
126
127pub fn try_get_exact_serialize_datum_size(arr: &ArrayImpl) -> Option<usize> {
128 match arr {
129 ArrayImpl::Int16(_) => Some(2),
130 ArrayImpl::Int32(_) => Some(4),
131 ArrayImpl::Int64(_) => Some(8),
132 ArrayImpl::Serial(_) => Some(8),
133 ArrayImpl::Float32(_) => Some(4),
134 ArrayImpl::Float64(_) => Some(8),
135 ArrayImpl::Bool(_) => Some(1),
136 ArrayImpl::Decimal(_) => Some(estimate_serialize_decimal_size()),
137 ArrayImpl::Interval(_) => Some(estimate_serialize_interval_size()),
138 ArrayImpl::Date(_) => Some(estimate_serialize_date_size()),
139 ArrayImpl::Timestamp(_) => Some(estimate_serialize_timestamp_size()),
140 ArrayImpl::Time(_) => Some(estimate_serialize_time_size()),
141 _ => None,
142 }
143 .map(|x| x + 1)
144}
145
146pub fn serialize_datum(cell: impl ToDatumRef) -> Vec<u8> {
148 let mut buf: Vec<u8> = vec![];
149 serialize_datum_into(cell, &mut buf);
150 buf
151}
152
153pub fn serialize_datum_into(datum_ref: impl ToDatumRef, buf: &mut impl BufMut) {
155 if let Some(d) = datum_ref.to_datum_ref() {
156 buf.put_u8(1);
157 serialize_scalar(d, buf)
158 } else {
159 buf.put_u8(0);
160 }
161}
162
163pub fn estimate_serialize_datum_size(datum_ref: impl ToDatumRef) -> usize {
164 if let Some(d) = datum_ref.to_datum_ref() {
165 1 + estimate_serialize_scalar_size(d)
166 } else {
167 1
168 }
169}
170
171#[easy_ext::ext(DatumFromProtoExt)]
172impl Datum {
173 pub fn from_protobuf(proto: &PbDatum, data_type: &DataType) -> Result<Datum> {
175 deserialize_datum(proto.body.as_slice(), data_type)
176 }
177}
178
179#[easy_ext::ext(DatumToProtoExt)]
180impl<D: ToDatumRef> D {
181 pub fn to_protobuf(&self) -> PbDatum {
183 PbDatum {
184 body: serialize_datum(self),
185 }
186 }
187}
188
189pub fn deserialize_datum(mut data: impl Buf, ty: &DataType) -> Result<Datum> {
191 inner_deserialize_datum(&mut data, ty)
192}
193
194#[inline(always)]
196fn inner_deserialize_datum(data: &mut impl Buf, ty: &DataType) -> Result<Datum> {
197 let null_tag = data.get_u8();
198 match null_tag {
199 0 => Ok(None),
200 1 => Some(deserialize_value(ty, data)).transpose(),
201 _ => Err(ValueEncodingError::InvalidTagEncoding(null_tag)),
202 }
203}
204
/// Appends the encoding of a non-null scalar `value` to `buf`.
///
/// No null tag is written here; [`serialize_datum_into`] writes it before calling this. Numeric
/// fields are little-endian. [`deserialize_value`] decodes exactly this layout, so changing any
/// arm changes the encoding and breaks decoding of previously written bytes.
///
/// # Panics
///
/// Panics (via `todo!`) for `Vector` values, which are not supported yet.
fn serialize_scalar(value: ScalarRefImpl<'_>, buf: &mut impl BufMut) {
    match value {
        ScalarRefImpl::Int16(v) => buf.put_i16_le(v),
        ScalarRefImpl::Int32(v) => buf.put_i32_le(v),
        ScalarRefImpl::Int64(v) => buf.put_i64_le(v),
        ScalarRefImpl::Int256(v) => buf.put_slice(&v.to_le_bytes()),
        ScalarRefImpl::Serial(v) => buf.put_i64_le(v.into_inner()),
        ScalarRefImpl::Float32(v) => buf.put_f32_le(v.into_inner()),
        ScalarRefImpl::Float64(v) => buf.put_f64_le(v.into_inner()),
        // Byte strings are written with a u32 length prefix by `serialize_str`.
        ScalarRefImpl::Utf8(v) => serialize_str(v.as_bytes(), buf),
        ScalarRefImpl::Bytea(v) => serialize_str(v, buf),
        ScalarRefImpl::Bool(v) => buf.put_u8(v as u8),
        ScalarRefImpl::Decimal(v) => serialize_decimal(&v, buf),
        ScalarRefImpl::Interval(v) => serialize_interval(&v, buf),
        // chrono's `num_days_from_ce` day count.
        ScalarRefImpl::Date(v) => serialize_date(v.0.num_days_from_ce(), buf),
        // The naive timestamp is interpreted as UTC: seconds since the Unix epoch plus the
        // sub-second nanoseconds.
        ScalarRefImpl::Timestamp(v) => serialize_timestamp(
            v.0.and_utc().timestamp(),
            v.0.and_utc().timestamp_subsec_nanos(),
            buf,
        ),
        ScalarRefImpl::Timestamptz(v) => buf.put_i64_le(v.timestamp_micros()),
        // Seconds since midnight plus the nanosecond component.
        ScalarRefImpl::Time(v) => {
            serialize_time(v.0.num_seconds_from_midnight(), v.0.nanosecond(), buf)
        }
        // JSONB's own binary form, length-prefixed like a byte string.
        ScalarRefImpl::Jsonb(v) => serialize_str(&v.value_serialize(), buf),
        // Struct fields are written back-to-back with no count; lists get a u32 count.
        ScalarRefImpl::Struct(s) => serialize_struct(s, buf),
        ScalarRefImpl::List(v) => serialize_list(v, buf),
        // A map is encoded as the list of its entries.
        ScalarRefImpl::Map(m) => serialize_list(m.into_inner(), buf),
        ScalarRefImpl::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
    }
}
236
237fn estimate_serialize_scalar_size(value: ScalarRefImpl<'_>) -> usize {
238 match value {
239 ScalarRefImpl::Int16(_) => 2,
240 ScalarRefImpl::Int32(_) => 4,
241 ScalarRefImpl::Int64(_) => 8,
242 ScalarRefImpl::Int256(_) => 32,
243 ScalarRefImpl::Serial(_) => 8,
244 ScalarRefImpl::Float32(_) => 4,
245 ScalarRefImpl::Float64(_) => 8,
246 ScalarRefImpl::Utf8(v) => estimate_serialize_str_size(v.as_bytes()),
247 ScalarRefImpl::Bytea(v) => estimate_serialize_str_size(v),
248 ScalarRefImpl::Bool(_) => 1,
249 ScalarRefImpl::Decimal(_) => estimate_serialize_decimal_size(),
250 ScalarRefImpl::Interval(_) => estimate_serialize_interval_size(),
251 ScalarRefImpl::Date(_) => estimate_serialize_date_size(),
252 ScalarRefImpl::Timestamp(_) => estimate_serialize_timestamp_size(),
253 ScalarRefImpl::Timestamptz(_) => 8,
254 ScalarRefImpl::Time(_) => estimate_serialize_time_size(),
255 ScalarRefImpl::Jsonb(v) => v.capacity(),
257 ScalarRefImpl::Struct(s) => estimate_serialize_struct_size(s),
258 ScalarRefImpl::List(v) => estimate_serialize_list_size(v),
259 ScalarRefImpl::Map(v) => estimate_serialize_list_size(v.into_inner()),
260 ScalarRefImpl::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
261 }
262}
263
264fn serialize_struct(value: StructRef<'_>, buf: &mut impl BufMut) {
265 value.iter_fields_ref().for_each(|field_value| {
266 serialize_datum_into(field_value, buf);
267 });
268}
269
/// Estimated encoded size of a struct's fields (no count prefix is written for structs).
/// Delegates to `StructRef::estimate_serialize_size_inner`.
fn estimate_serialize_struct_size(s: StructRef<'_>) -> usize {
    s.estimate_serialize_size_inner()
}
273fn serialize_list(value: ListRef<'_>, buf: &mut impl BufMut) {
274 let elems = value.iter();
275 buf.put_u32_le(elems.len() as u32);
276
277 elems.for_each(|field_value| {
278 serialize_datum_into(field_value, buf);
279 });
280}
281fn estimate_serialize_list_size(list: ListRef<'_>) -> usize {
282 4 + list.estimate_serialize_size_inner()
283}
284
285fn serialize_str(bytes: &[u8], buf: &mut impl BufMut) {
286 buf.put_u32_le(bytes.len() as u32);
287 buf.put_slice(bytes);
288}
289
/// Exact encoded size of a byte string: the `u32` length prefix plus the payload.
fn estimate_serialize_str_size(bytes: &[u8]) -> usize {
    bytes.len() + std::mem::size_of::<u32>()
}
293
294fn serialize_interval(interval: &Interval, buf: &mut impl BufMut) {
295 buf.put_i32_le(interval.months());
296 buf.put_i32_le(interval.days());
297 buf.put_i64_le(interval.usecs());
298}
299
/// Encoded size of an interval: two `i32`s (months, days) and one `i64` (microseconds).
fn estimate_serialize_interval_size() -> usize {
    2 * std::mem::size_of::<i32>() + std::mem::size_of::<i64>()
}
303
304fn serialize_date(days: i32, buf: &mut impl BufMut) {
305 buf.put_i32_le(days);
306}
307
/// Encoded size of a date: a single `i32` day count.
fn estimate_serialize_date_size() -> usize {
    std::mem::size_of::<i32>()
}
311
312fn serialize_timestamp(secs: i64, nsecs: u32, buf: &mut impl BufMut) {
313 buf.put_i64_le(secs);
314 buf.put_u32_le(nsecs);
315}
316
/// Encoded size of a timestamp: an `i64` of seconds plus a `u32` of nanoseconds.
fn estimate_serialize_timestamp_size() -> usize {
    std::mem::size_of::<i64>() + std::mem::size_of::<u32>()
}
320
321fn serialize_time(secs: u32, nano: u32, buf: &mut impl BufMut) {
322 buf.put_u32_le(secs);
323 buf.put_u32_le(nano);
324}
325
/// Encoded size of a time of day: two `u32`s (seconds and nanoseconds).
fn estimate_serialize_time_size() -> usize {
    2 * std::mem::size_of::<u32>()
}
329
330fn serialize_decimal(decimal: &Decimal, buf: &mut impl BufMut) {
331 buf.put_slice(&decimal.unordered_serialize());
332}
333
/// Encoded size of a decimal: the fixed-width `unordered_serialize` output.
fn estimate_serialize_decimal_size() -> usize {
    // Must agree with the 16-byte buffer `deserialize_decimal` reads.
    const DECIMAL_ENCODED_LEN: usize = 16;
    DECIMAL_ENCODED_LEN
}
337
/// Decodes one non-null scalar of type `ty` from `data`; the inverse of [`serialize_scalar`].
///
/// The null tag has already been consumed by the caller. Per the `bytes::Buf` contract, the
/// `get_*`/`copy_to_slice` reads panic if `data` has too few bytes remaining.
///
/// # Errors
///
/// Returns a [`ValueEncodingError`] for invalid UTF-8, booleans, JSONB, dates, times or
/// timestamps, and for invalid encodings nested inside structs, lists and maps.
///
/// # Panics
///
/// Panics (via `todo!`) for `Vector` types.
fn deserialize_value(ty: &DataType, data: &mut impl Buf) -> Result<ScalarImpl> {
    Ok(match ty {
        DataType::Int16 => ScalarImpl::Int16(data.get_i16_le()),
        DataType::Int32 => ScalarImpl::Int32(data.get_i32_le()),
        DataType::Int64 => ScalarImpl::Int64(data.get_i64_le()),
        DataType::Int256 => ScalarImpl::Int256(deserialize_int256(data)),
        DataType::Serial => ScalarImpl::Serial(Serial::from(data.get_i64_le())),
        DataType::Float32 => ScalarImpl::Float32(F32::from(data.get_f32_le())),
        DataType::Float64 => ScalarImpl::Float64(F64::from(data.get_f64_le())),
        DataType::Varchar => ScalarImpl::Utf8(deserialize_str(data)?),
        DataType::Boolean => ScalarImpl::Bool(deserialize_bool(data)?),
        DataType::Decimal => ScalarImpl::Decimal(deserialize_decimal(data)?),
        DataType::Interval => ScalarImpl::Interval(deserialize_interval(data)?),
        DataType::Time => ScalarImpl::Time(deserialize_time(data)?),
        DataType::Timestamp => ScalarImpl::Timestamp(deserialize_timestamp(data)?),
        DataType::Timestamptz => {
            ScalarImpl::Timestamptz(Timestamptz::from_micros(data.get_i64_le()))
        }
        DataType::Date => ScalarImpl::Date(deserialize_date(data)?),
        // JSONB is stored as a length-prefixed byte string in its own binary form.
        DataType::Jsonb => ScalarImpl::Jsonb(
            JsonbVal::value_deserialize(&deserialize_bytea(data))
                .ok_or(ValueEncodingError::InvalidJsonbEncoding)?,
        ),
        DataType::Struct(struct_def) => deserialize_struct(struct_def, data)?,
        DataType::Bytea => ScalarImpl::Bytea(deserialize_bytea(data).into()),
        DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
        DataType::List(item_type) => deserialize_list(item_type, data)?,
        DataType::Map(map_type) => {
            // A map is encoded as a list of its entries; decode that list using the map's
            // entry struct type, then rebuild the map from it.
            let list = deserialize_list(&map_type.clone().into_struct(), data)?.into_list();
            ScalarImpl::Map(MapValue::from_entries(list))
        }
    })
}
372
373fn deserialize_struct(struct_def: &StructType, data: &mut impl Buf) -> Result<ScalarImpl> {
374 let mut field_values = Vec::with_capacity(struct_def.len());
375 for field_type in struct_def.types() {
376 field_values.push(inner_deserialize_datum(data, field_type)?);
377 }
378
379 Ok(ScalarImpl::Struct(StructValue::new(field_values)))
380}
381
382fn deserialize_list(item_type: &DataType, data: &mut impl Buf) -> Result<ScalarImpl> {
383 let len = data.get_u32_le();
384 let mut builder = item_type.create_array_builder(len as usize);
385 for _ in 0..len {
386 builder.append(inner_deserialize_datum(data, item_type)?);
387 }
388 Ok(ScalarImpl::List(ListValue::new(builder.finish())))
389}
390
391fn deserialize_str(data: &mut impl Buf) -> Result<Box<str>> {
392 let len = data.get_u32_le();
393 let mut bytes = vec![0; len as usize];
394 data.copy_to_slice(&mut bytes);
395 String::from_utf8(bytes)
396 .map(String::into_boxed_str)
397 .map_err(ValueEncodingError::InvalidUtf8)
398}
399
400fn deserialize_bytea(data: &mut impl Buf) -> Vec<u8> {
401 let len = data.get_u32_le();
402 let mut bytes = vec![0; len as usize];
403 data.copy_to_slice(&mut bytes);
404 bytes
405}
406
407fn deserialize_int256(data: &mut impl Buf) -> Int256 {
408 let mut bytes = [0; Int256::size()];
409 data.copy_to_slice(&mut bytes);
410 Int256::from_le_bytes(bytes)
411}
412
413fn deserialize_bool(data: &mut impl Buf) -> Result<bool> {
414 match data.get_u8() {
415 1 => Ok(true),
416 0 => Ok(false),
417 value => Err(ValueEncodingError::InvalidBoolEncoding(value)),
418 }
419}
420
421fn deserialize_interval(data: &mut impl Buf) -> Result<Interval> {
422 let months = data.get_i32_le();
423 let days = data.get_i32_le();
424 let usecs = data.get_i64_le();
425 Ok(Interval::from_month_day_usec(months, days, usecs))
426}
427
428fn deserialize_time(data: &mut impl Buf) -> Result<Time> {
429 let secs = data.get_u32_le();
430 let nano = data.get_u32_le();
431 Time::with_secs_nano(secs, nano)
432 .map_err(|_e| ValueEncodingError::InvalidTimeEncoding(secs, nano))
433}
434
435fn deserialize_timestamp(data: &mut impl Buf) -> Result<Timestamp> {
436 let secs = data.get_i64_le();
437 let nsecs = data.get_u32_le();
438 Timestamp::with_secs_nsecs(secs, nsecs)
439 .map_err(|_e| ValueEncodingError::InvalidTimestampEncoding(secs, nsecs))
440}
441
442fn deserialize_date(data: &mut impl Buf) -> Result<Date> {
443 let days = data.get_i32_le();
444 Date::with_days_since_ce(days).map_err(|_e| ValueEncodingError::InvalidDateEncoding(days))
445}
446
447fn deserialize_decimal(data: &mut impl Buf) -> Result<Decimal> {
448 let mut bytes = [0; 16];
449 data.copy_to_slice(&mut bytes);
450 Ok(Decimal::unordered_deserialize(bytes))
451}
452
#[cfg(test)]
mod tests {
    use crate::array::{ArrayImpl, ListValue, StructValue};
    use crate::test_utils::rand_chunk;
    use crate::types::{
        DataType, Date, Datum, Decimal, Interval, ScalarImpl, Serial, Time, Timestamp, Timestamptz,
    };
    use crate::util::value_encoding::{
        deserialize_datum, estimate_serialize_datum_size, serialize_datum,
        try_get_exact_serialize_datum_size,
    };

    /// Asserts that the size estimate for the non-null datum `s` equals its encoded length.
    fn test_estimate_serialize_scalar_size(s: ScalarImpl) {
        let d = Datum::from(s);
        assert_eq!(estimate_serialize_datum_size(&d), serialize_datum(&d).len());
    }

    /// When `try_get_exact_serialize_datum_size` reports a size for `s`'s type, asserts it equals
    /// the encoded length of `s.to_datum()`.
    fn test_try_get_exact_serialize_datum_size(s: &ArrayImpl) {
        let d = s.to_datum();
        if let Some(ret) = try_get_exact_serialize_datum_size(s) {
            assert_eq!(ret, serialize_datum(&d).len());
        }
    }

    /// Asserts that `datum` decodes back to itself as type `ty` after being encoded.
    fn test_roundtrip(datum: Datum, ty: DataType) {
        let encoded = serialize_datum(&datum);
        let decoded = deserialize_datum(encoded.as_slice(), &ty).ok();
        assert_eq!(decoded, Some(datum));
    }

    #[test]
    fn test_estimate_size() {
        let d: Datum = None;
        assert_eq!(estimate_serialize_datum_size(&d), serialize_datum(&d).len());

        test_estimate_serialize_scalar_size(ScalarImpl::Bool(true));
        test_estimate_serialize_scalar_size(ScalarImpl::Int16(1));
        test_estimate_serialize_scalar_size(ScalarImpl::Int32(1));
        test_estimate_serialize_scalar_size(ScalarImpl::Int64(1));
        test_estimate_serialize_scalar_size(ScalarImpl::Float32(1.0.into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Float64(1.0.into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Serial(Serial::from(i64::MIN)));

        test_estimate_serialize_scalar_size(ScalarImpl::Utf8("abc".into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Utf8("".into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Decimal(Decimal::NegativeInf));
        test_estimate_serialize_scalar_size(ScalarImpl::Decimal(Decimal::PositiveInf));
        test_estimate_serialize_scalar_size(ScalarImpl::Decimal(Decimal::NaN));
        test_estimate_serialize_scalar_size(ScalarImpl::Decimal(123123.into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Interval(Interval::from_month_day_usec(
            7, 8, 9,
        )));
        test_estimate_serialize_scalar_size(ScalarImpl::Date(Date::from_ymd_uncheck(2333, 3, 3)));
        test_estimate_serialize_scalar_size(ScalarImpl::Bytea("\\x233".as_bytes().into()));
        test_estimate_serialize_scalar_size(ScalarImpl::Time(Time::from_hms_uncheck(2, 3, 3)));
        test_estimate_serialize_scalar_size(ScalarImpl::Timestamp(
            Timestamp::from_timestamp_uncheck(23333333, 2333),
        ));
        test_estimate_serialize_scalar_size(ScalarImpl::Timestamptz(Timestamptz::from_micros(
            1_700_000_000_000_000,
        )));
        test_estimate_serialize_scalar_size(ScalarImpl::Interval(Interval::from_month_day_usec(
            2, 3, 3333,
        )));
        test_estimate_serialize_scalar_size(ScalarImpl::Struct(StructValue::new(vec![
            ScalarImpl::Int64(233).into(),
            ScalarImpl::Float64(23.33.into()).into(),
        ])));
        test_estimate_serialize_scalar_size(ScalarImpl::List(ListValue::from_iter([233i64, 2333])));
    }

    #[test]
    fn test_try_estimate_size() {
        let chunk = rand_chunk::gen_chunk(
            &[
                DataType::Int16,
                DataType::Int32,
                DataType::Int64,
                DataType::Serial,
                DataType::Float32,
                DataType::Float64,
                DataType::Boolean,
                DataType::Decimal,
                DataType::Interval,
                DataType::Time,
                DataType::Timestamp,
                DataType::Date,
            ],
            1,
            0,
            0.0,
        );
        for column in chunk.columns() {
            test_try_get_exact_serialize_datum_size(column);
        }
    }

    #[test]
    fn test_roundtrip_scalars() {
        test_roundtrip(None, DataType::Int32);
        test_roundtrip(Some(ScalarImpl::Bool(true)), DataType::Boolean);
        test_roundtrip(Some(ScalarImpl::Bool(false)), DataType::Boolean);
        test_roundtrip(Some(ScalarImpl::Int16(-7)), DataType::Int16);
        test_roundtrip(Some(ScalarImpl::Int32(i32::MIN)), DataType::Int32);
        test_roundtrip(Some(ScalarImpl::Int64(i64::MAX)), DataType::Int64);
        test_roundtrip(Some(ScalarImpl::Serial(Serial::from(i64::MIN))), DataType::Serial);
        test_roundtrip(Some(ScalarImpl::Float32(1.5.into())), DataType::Float32);
        test_roundtrip(Some(ScalarImpl::Float64(23.33.into())), DataType::Float64);
        test_roundtrip(Some(ScalarImpl::Utf8("abc".into())), DataType::Varchar);
        test_roundtrip(Some(ScalarImpl::Utf8("".into())), DataType::Varchar);
        test_roundtrip(
            Some(ScalarImpl::Bytea("\\x233".as_bytes().into())),
            DataType::Bytea,
        );
        test_roundtrip(Some(ScalarImpl::Decimal(123123.into())), DataType::Decimal);
        test_roundtrip(
            Some(ScalarImpl::Interval(Interval::from_month_day_usec(7, 8, 9))),
            DataType::Interval,
        );
        test_roundtrip(
            Some(ScalarImpl::Date(Date::from_ymd_uncheck(2333, 3, 3))),
            DataType::Date,
        );
        test_roundtrip(
            Some(ScalarImpl::Time(Time::from_hms_uncheck(2, 3, 3))),
            DataType::Time,
        );
        test_roundtrip(
            Some(ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(
                23333333, 2333,
            ))),
            DataType::Timestamp,
        );
        test_roundtrip(
            Some(ScalarImpl::Timestamptz(Timestamptz::from_micros(
                1_700_000_000_000_000,
            ))),
            DataType::Timestamptz,
        );
    }

    #[test]
    fn test_invalid_encoding() {
        // The null tag must be 0 or 1.
        assert!(deserialize_datum(&[2u8][..], &DataType::Int32).is_err());
        // Booleans must be encoded as 0 or 1.
        assert!(deserialize_datum(&[1u8, 2][..], &DataType::Boolean).is_err());
    }
}