1use std::collections::HashSet;
23use std::sync::Arc;
24
25use ahash::HashMap;
26use bitfield_struct::bitfield;
27use bytes::{Buf, BufMut};
28use rw_iter_util::ZipEqDebug;
29use smallvec::{SmallVec, smallvec};
30
31use super::error::ValueEncodingError;
32use super::{Result, ValueRowDeserializer, ValueRowSerializer};
33use crate::catalog::ColumnId;
34use crate::row::Row;
35use crate::types::{DataType, Datum, ScalarRefImpl, StructType, ToDatumRef};
36use crate::util::value_encoding as plain;
37
38mod new_serde {
43 use super::*;
44 use crate::array::{ListRef, ListValue, MapRef, MapValue, StructRef, StructValue};
45 use crate::types::{MapType, ScalarImpl, StructType, data_types};
46
47 fn new_serialize_datum(
51 data_type: &DataType,
52 datum_ref: impl ToDatumRef,
53 buf: &mut impl BufMut,
54 ) {
55 if let Some(d) = datum_ref.to_datum_ref() {
56 buf.put_u8(1);
57 new_serialize_scalar(data_type, d, buf)
58 } else {
59 buf.put_u8(0);
60 }
61 }
62
63 fn new_serialize_struct(struct_type: &StructType, value: StructRef<'_>, buf: &mut impl BufMut) {
67 let serializer = super::Serializer::from_struct(struct_type.clone()); let bytes = serializer.serialize(value); buf.put_u32_le(bytes.len() as _);
72 buf.put_slice(&bytes);
73 }
74
75 fn new_serialize_list(inner_type: &DataType, value: ListRef<'_>, buf: &mut impl BufMut) {
77 let elems = value.iter();
78 buf.put_u32_le(elems.len() as u32);
79
80 elems.for_each(|field_value| {
81 new_serialize_datum(inner_type, field_value, buf);
82 });
83 }
84
85 fn new_serialize_map(map_type: &MapType, value: MapRef<'_>, buf: &mut impl BufMut) {
90 let elems = value.iter();
91 buf.put_u32_le(elems.len() as u32);
92
93 elems.for_each(|(k, v)| {
94 new_serialize_scalar(map_type.key(), k, buf);
95 new_serialize_datum(map_type.value(), v, buf);
96 });
97 }
98
99 pub fn new_serialize_scalar(
101 data_type: &DataType,
102 value: ScalarRefImpl<'_>,
103 buf: &mut impl BufMut,
104 ) {
105 match value {
106 ScalarRefImpl::Struct(s) => new_serialize_struct(data_type.as_struct(), s, buf),
107 ScalarRefImpl::List(l) => new_serialize_list(data_type.as_list_element_type(), l, buf),
108 ScalarRefImpl::Map(m) => new_serialize_map(data_type.as_map(), m, buf),
109
110 _ => plain::serialize_scalar(value, buf),
111 }
112 }
113
114 fn new_inner_deserialize_datum(data: &mut &[u8], ty: &DataType) -> Result<Datum> {
118 let null_tag = data.get_u8();
119 match null_tag {
120 0 => Ok(None),
121 1 => Some(new_deserialize_scalar(ty, data)).transpose(),
122 _ => Err(ValueEncodingError::InvalidTagEncoding(null_tag)),
123 }
124 }
125
126 fn new_deserialize_struct(struct_def: &StructType, data: &mut &[u8]) -> Result<ScalarImpl> {
130 let deserializer = super::Deserializer::from_struct(struct_def.clone()); let encoded_len = data.get_u32_le() as usize;
132
133 let (struct_data, remaining) = data.split_at(encoded_len);
134 *data = remaining;
135 let fields = deserializer.deserialize(struct_data)?;
136
137 Ok(ScalarImpl::Struct(StructValue::new(fields)))
138 }
139
140 fn new_deserialize_list(item_type: &DataType, data: &mut &[u8]) -> Result<ScalarImpl> {
142 let len = data.get_u32_le();
143 let mut builder = item_type.create_array_builder(len as usize);
144 for _ in 0..len {
145 builder.append(new_inner_deserialize_datum(data, item_type)?);
146 }
147 Ok(ScalarImpl::List(ListValue::new(builder.finish())))
148 }
149
150 fn new_deserialize_map(map_type: &MapType, data: &mut &[u8]) -> Result<ScalarImpl> {
155 let len = data.get_u32_le();
156 let mut builder = map_type
157 .clone() .into_struct()
159 .create_array_builder(len as usize);
160 for _ in 0..len {
161 let key = new_deserialize_scalar(map_type.key(), data)?;
162 let value = new_inner_deserialize_datum(data, map_type.value())?;
163 let entry = StructValue::new(vec![Some(key), value]);
164 builder.append(Some(ScalarImpl::Struct(entry)));
165 }
166 Ok(ScalarImpl::Map(MapValue::from_entries(ListValue::new(
167 builder.finish(),
168 ))))
169 }
170
171 pub fn new_deserialize_scalar(ty: &DataType, data: &mut &[u8]) -> Result<ScalarImpl> {
173 Ok(match ty {
174 DataType::Struct(struct_def) => new_deserialize_struct(struct_def, data)?,
175 DataType::List(item_type) => new_deserialize_list(item_type, data)?,
176 DataType::Map(map_type) => new_deserialize_map(map_type, data)?,
177 data_types::simple!() => plain::deserialize_value(ty, data)?,
178 })
179 }
180}
181
/// Column count up to which the per-row `SmallVec` buffers in this file (offsets, encoded
/// column ids, the small `ColumnMapping`) stay inline instead of spilling to the heap.
const COLUMN_ON_STACK: usize = 8;
184
/// Width, in bytes, of every entry in an encoded row's offset array.
///
/// The discriminant is the 2-bit code stored in the header's `offset` field. The code `0b00`
/// is not assigned to any width.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
enum OffsetWidth {
    /// Offsets are stored in 1 byte each.
    Offset8 = 0b01,
    /// Offsets are stored in 2 bytes each.
    Offset16 = 0b10,
    /// Offsets are stored in 4 bytes each.
    Offset32 = 0b11,
}

impl OffsetWidth {
    /// Number of bytes occupied by one offset of this width.
    const fn width(self) -> usize {
        match self {
            Self::Offset8 => 1,
            Self::Offset16 => 2,
            Self::Offset32 => 4,
        }
    }

    /// The raw 2-bit code (the enum discriminant), as required by the `bitfield` macro.
    const fn into_bits(self) -> u8 {
        self as u8
    }

    /// Decodes a raw 2-bit code back into a width; the inverse of `into_bits`.
    ///
    /// # Panics
    /// If `bits` is not one of the assigned codes (e.g. `0b00`).
    const fn from_bits(bits: u8) -> Self {
        const CODE_8: u8 = OffsetWidth::Offset8 as u8;
        const CODE_16: u8 = OffsetWidth::Offset16 as u8;
        const CODE_32: u8 = OffsetWidth::Offset32 as u8;

        match bits {
            CODE_8 => Self::Offset8,
            CODE_16 => Self::Offset16,
            CODE_32 => Self::Offset32,
            _ => panic!("invalid offset width bits"),
        }
    }
}
220
/// One-byte header at the start of every column-aware encoded row.
///
/// Fields are packed starting from the most significant bit (`order = Msb`), in declaration
/// order:
///
/// ```text
/// | magic (1 bit) | reserved (5 bits) | offset width (2 bits) |
/// ```
#[bitfield(u8, order = Msb)]
#[derive(PartialEq, Eq)]
struct Header {
    // Defaults to `true` and is read-only through the accessor; `EncodedBytes::new` rejects
    // input whose magic bit is unset with `ValueEncodingError::InvalidFlag`.
    #[bits(1, default = true, access = RO)]
    magic: bool,

    // Not read or written anywhere in this file.
    #[bits(5)]
    _reserved: u8,

    // Width of each entry in the offset array that follows the column ids. Defaults to 8-bit
    // and is set by `RowEncoding::set_offsets`.
    #[bits(2, default = OffsetWidth::Offset8)]
    offset: OffsetWidth,
}
245
246struct RowEncoding {
248 header: Header,
249 offsets: SmallVec<[u8; COLUMN_ON_STACK * 2]>,
250 data: Vec<u8>,
251}
252
253trait Encode {
255 fn encode_to(self, data_type: &DataType, data: &mut Vec<u8>);
256}
257
258impl<T> Encode for T
259where
260 T: ToDatumRef,
261{
262 fn encode_to(self, data_type: &DataType, data: &mut Vec<u8>) {
263 if let Some(v) = self.to_datum_ref() {
264 let curr_len = data.len();
265
266 if data_type.can_alter() == Some(true) {
268 new_serde::new_serialize_scalar(data_type, v, data);
269 } else {
270 plain::serialize_scalar(v, data);
271 }
272
273 debug_assert_ne!(
276 data.len(),
277 curr_len,
278 "scalar ({v:?}) should not be encoded to empty bytes, as it will be indistinguishable from NULL",
279 );
280 }
281 }
282}
283
284impl Encode for Option<&[u8]> {
285 fn encode_to(self, _data_type: &DataType, data: &mut Vec<u8>) {
286 if let Some(v) = self {
288 data.extend(v);
289 }
290 }
291}
292
293impl RowEncoding {
294 fn new() -> Self {
295 RowEncoding {
296 header: Header::new(),
297 offsets: Default::default(),
298 data: Default::default(),
299 }
300 }
301
302 fn set_offsets(&mut self, usize_offsets: &[usize]) {
303 debug_assert!(
304 self.offsets.is_empty(),
305 "should not set offsets multiple times"
306 );
307
308 let max_offset = usize_offsets.last().copied().unwrap_or(0);
310
311 const U8_MAX: usize = u8::MAX as usize;
312 const U16_MAX: usize = u16::MAX as usize;
313 const U32_MAX: usize = u32::MAX as usize;
314
315 let offset_width = match max_offset {
316 _n @ ..=U8_MAX => OffsetWidth::Offset8,
317 _n @ ..=U16_MAX => OffsetWidth::Offset16,
318 _n @ ..=U32_MAX => OffsetWidth::Offset32,
319 _ => panic!("encoding length {} exceeds u32", max_offset),
320 };
321 self.header.set_offset(offset_width);
322
323 self.offsets
324 .resize(usize_offsets.len() * offset_width.width(), 0);
325
326 let mut offsets_buf = &mut self.offsets[..];
327 for &offset in usize_offsets {
328 offsets_buf.put_uint_le(offset as u64, offset_width.width());
329 }
330 }
331
332 fn encode<T: Encode>(
333 &mut self,
334 datums: impl IntoIterator<Item = T>,
335 data_types: impl IntoIterator<Item = &DataType>,
336 ) {
337 debug_assert!(
338 self.data.is_empty(),
339 "should not encode one RowEncoding object multiple times."
340 );
341 let datums = datums.into_iter();
342 let mut offset_usize =
343 SmallVec::<[usize; COLUMN_ON_STACK]>::with_capacity(datums.size_hint().0);
344 for (datum, data_type) in datums.zip_eq_debug(data_types) {
345 offset_usize.push(self.data.len());
346 datum.encode_to(data_type, &mut self.data);
347 }
348 self.set_offsets(&offset_usize);
349 }
350}
351
352mod data_types {
353 use crate::types::{DataType, StructType};
354
355 pub trait DataTypes: Clone {
357 fn iter(&self) -> impl ExactSizeIterator<Item = &DataType>;
358 fn at(&self, index: usize) -> &DataType;
359 }
360
361 impl<T> DataTypes for T
362 where
363 T: AsRef<[DataType]> + Clone,
364 {
365 fn iter(&self) -> impl ExactSizeIterator<Item = &DataType> {
366 self.as_ref().iter()
367 }
368
369 fn at(&self, index: usize) -> &DataType {
370 &self.as_ref()[index]
371 }
372 }
373
374 impl DataTypes for StructType {
375 fn iter(&self) -> impl ExactSizeIterator<Item = &DataType> {
376 self.types()
377 }
378
379 fn at(&self, index: usize) -> &DataType {
380 self.type_at(index)
381 }
382 }
383}
384use data_types::DataTypes;
385
386#[derive(Clone)]
389pub struct Serializer<D: DataTypes = Vec<DataType>> {
390 encoded_column_ids: EncodedColumnIds,
391 data_types: D,
392}
393
394type EncodedColumnIds = SmallVec<[u8; COLUMN_ON_STACK * 4]>;
395
396fn encode_column_ids(column_ids: impl ExactSizeIterator<Item = ColumnId>) -> EncodedColumnIds {
397 let mut encoded_column_ids = smallvec![0; column_ids.len() * 4];
399 let mut buf = &mut encoded_column_ids[..];
400 for id in column_ids {
401 buf.put_i32_le(id.get_id());
402 }
403 encoded_column_ids
404}
405
406impl Serializer {
407 pub fn new(column_ids: &[ColumnId], data_types: impl IntoIterator<Item = DataType>) -> Self {
409 Self {
410 encoded_column_ids: encode_column_ids(column_ids.iter().copied()),
411 data_types: data_types.into_iter().collect(),
412 }
413 }
414}
415
416impl Serializer<StructType> {
417 pub fn from_struct(struct_type: StructType) -> Self {
421 Self {
422 encoded_column_ids: encode_column_ids(struct_type.ids().unwrap()),
423 data_types: struct_type,
424 }
425 }
426}
427
428impl<D: DataTypes> Serializer<D> {
429 fn datum_num(&self) -> usize {
430 self.encoded_column_ids.len() / 4
431 }
432
433 fn serialize_raw<T: Encode>(&self, datums: impl IntoIterator<Item = T>) -> Vec<u8> {
434 let mut encoding = RowEncoding::new();
435 encoding.encode(datums, self.data_types.iter());
436 self.finish(encoding)
437 }
438
439 fn finish(&self, encoding: RowEncoding) -> Vec<u8> {
440 let mut row_bytes = Vec::with_capacity(
441 5 + self.encoded_column_ids.len() + encoding.offsets.len() + encoding.data.len(), );
443 row_bytes.put_u8(encoding.header.into_bits());
444 row_bytes.put_u32_le(self.datum_num() as u32);
445 row_bytes.extend(&self.encoded_column_ids);
446 row_bytes.extend(&encoding.offsets);
447 row_bytes.extend(&encoding.data);
448
449 row_bytes
450 }
451}
452
453impl<D: DataTypes> ValueRowSerializer for Serializer<D> {
454 fn serialize(&self, row: impl Row) -> Vec<u8> {
456 assert_eq!(row.len(), self.datum_num());
457 self.serialize_raw(row.iter())
458 }
459}
460
461#[derive(Clone)]
465struct EncodedBytes<'a> {
466 header: Header,
467
468 column_ids: &'a [u8],
471 offsets: &'a [u8],
472 data: &'a [u8],
473}
474
475impl<'a> EncodedBytes<'a> {
476 fn new(mut encoded_bytes: &'a [u8]) -> Result<Self> {
477 let header = Header::from_bits(encoded_bytes.get_u8());
478 if !header.magic() {
479 return Err(ValueEncodingError::InvalidFlag(header.into_bits()));
480 }
481 let offset_bytes = header.offset().width();
482
483 let datum_num = encoded_bytes.get_u32_le() as usize;
484 let offsets_start_idx = 4 * datum_num;
485 let data_start_idx = offsets_start_idx + datum_num * offset_bytes;
486
487 Ok(EncodedBytes {
488 header,
489 column_ids: &encoded_bytes[..offsets_start_idx],
490 offsets: &encoded_bytes[offsets_start_idx..data_start_idx],
491 data: &encoded_bytes[data_start_idx..],
492 })
493 }
494}
495
496impl<'a> Iterator for EncodedBytes<'a> {
497 type Item = (i32, &'a [u8]);
498
499 fn next(&mut self) -> Option<Self::Item> {
500 if self.column_ids.is_empty() {
501 assert!(self.offsets.is_empty());
502 return None;
503 }
504
505 let id = self.column_ids.get_i32_le();
506
507 let offset_width = self.header.offset().width();
508 let get_offset = |offsets: &mut &[u8]| offsets.get_uint_le(offset_width) as usize;
509
510 let this_offset = get_offset(&mut self.offsets);
511 let next_offset = if self.offsets.is_empty() {
512 self.data.len()
513 } else {
514 let mut peek_offsets = self.offsets; get_offset(&mut peek_offsets)
516 };
517
518 let data = &self.data[this_offset..next_offset];
519
520 Some((id, data))
521 }
522}
523
524#[derive(Clone)]
527pub struct Deserializer<D: DataTypes = Arc<[DataType]>> {
528 mapping: ColumnMapping,
529 data_types: D,
530
531 default_row: Option<Vec<Datum>>,
535}
536
537#[derive(Clone)]
539enum ColumnMapping {
540 Small(SmallVec<[i32; COLUMN_ON_STACK]>),
542 Large(HashMap<i32, usize>),
544}
545
546impl ColumnMapping {
547 fn new(column_ids: impl ExactSizeIterator<Item = ColumnId>) -> Self {
548 if column_ids.len() <= COLUMN_ON_STACK {
549 Self::Small(column_ids.map(|c| c.get_id()).collect())
550 } else {
551 Self::Large(
552 column_ids
553 .enumerate()
554 .map(|(i, c)| (c.get_id(), i))
555 .collect(),
556 )
557 }
558 }
559
560 fn get(&self, id: i32) -> Option<usize> {
561 match self {
562 ColumnMapping::Small(vec) => vec.iter().position(|&x| x == id),
563 ColumnMapping::Large(map) => map.get(&id).copied(),
564 }
565 }
566}
567
568impl Deserializer {
569 pub fn new(
570 column_ids: &[ColumnId],
571 schema: Arc<[DataType]>,
572 column_with_default: impl Iterator<Item = (usize, Datum)>,
573 ) -> Self {
574 assert_eq!(column_ids.len(), schema.len());
575 let mut default_row: Vec<Datum> = vec![None; schema.len()];
576 for (i, datum) in column_with_default {
577 default_row[i] = datum;
578 }
579 Self {
580 mapping: ColumnMapping::new(column_ids.iter().copied()),
581 data_types: schema,
582 default_row: Some(default_row),
583 }
584 }
585}
586
587impl Deserializer<StructType> {
588 pub fn from_struct(struct_type: StructType) -> Self {
592 Self {
593 mapping: ColumnMapping::new(struct_type.ids().unwrap()),
594 data_types: struct_type,
595 default_row: None,
596 }
597 }
598}
599
600impl<D: DataTypes> ValueRowDeserializer for Deserializer<D> {
601 fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
602 let encoded_bytes = EncodedBytes::new(encoded_bytes)?;
603
604 let mut row =
605 (self.default_row.clone()).unwrap_or_else(|| vec![None; self.data_types.iter().len()]);
606
607 for (id, mut data) in encoded_bytes {
608 let Some(decoded_idx) = self.mapping.get(id) else {
609 continue;
610 };
611 let data_type = self.data_types.at(decoded_idx);
612
613 let datum = if data.is_empty() {
614 None
615 } else if data_type.can_alter() == Some(true) {
616 Some(new_serde::new_deserialize_scalar(data_type, &mut data)?)
617 } else {
618 Some(plain::deserialize_value(data_type, &mut data)?)
619 };
620
621 row[decoded_idx] = datum;
622 }
623
624 Ok(row)
625 }
626}
627
628#[derive(Clone)]
631pub struct ColumnAwareSerde {
632 pub serializer: Serializer,
633 pub deserializer: Deserializer,
634}
635
636impl ValueRowSerializer for ColumnAwareSerde {
637 fn serialize(&self, row: impl Row) -> Vec<u8> {
638 self.serializer.serialize(row)
639 }
640}
641
642impl ValueRowDeserializer for ColumnAwareSerde {
643 fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
644 self.deserializer.deserialize(encoded_bytes)
645 }
646}
647
648pub fn try_drop_invalid_columns(
652 encoded_bytes: &[u8],
653 valid_column_ids: &HashSet<i32>,
654) -> Option<Vec<u8>> {
655 let encoded_bytes = EncodedBytes::new(encoded_bytes).ok()?;
656
657 let has_invalid_column = (encoded_bytes.clone()).any(|(id, _)| !valid_column_ids.contains(&id));
658 if !has_invalid_column {
659 return None;
660 }
661
662 let mut datums = Vec::with_capacity(valid_column_ids.len());
665 let mut column_ids = Vec::with_capacity(valid_column_ids.len());
666
667 for (id, data) in encoded_bytes {
668 if valid_column_ids.contains(&id) {
669 column_ids.push(ColumnId::new(id));
670 datums.push(if data.is_empty() { None } else { Some(data) });
671 }
672 }
673
674 let dummy_data_types = vec![DataType::Boolean; column_ids.len()];
677
678 let row_bytes = Serializer::new(&column_ids, dummy_data_types).serialize_raw(datums);
679 Some(row_bytes)
680}