1use std::collections::HashSet;
23use std::sync::Arc;
24
25use ahash::HashMap;
26use bitfield_struct::bitfield;
27use bytes::{Buf, BufMut};
28use rw_iter_util::ZipEqDebug;
29use smallvec::{SmallVec, smallvec};
30
31use super::error::ValueEncodingError;
32use super::{Result, ValueRowDeserializer, ValueRowSerializer};
33use crate::catalog::ColumnId;
34use crate::row::Row;
35use crate::types::{DataType, Datum, ScalarRefImpl, StructType, ToDatumRef};
36use crate::util::value_encoding as plain;
37
38mod new_serde {
43 use super::*;
44 use crate::array::{ListRef, ListValue, MapRef, MapValue, StructRef, StructValue};
45 use crate::types::{ListType, MapType, ScalarImpl, StructType, data_types};
46
47 fn new_serialize_datum(
51 data_type: &DataType,
52 datum_ref: impl ToDatumRef,
53 buf: &mut impl BufMut,
54 ) {
55 if let Some(d) = datum_ref.to_datum_ref() {
56 buf.put_u8(1);
57 new_serialize_scalar(data_type, d, buf)
58 } else {
59 buf.put_u8(0);
60 }
61 }
62
63 fn new_serialize_struct(struct_type: &StructType, value: StructRef<'_>, buf: &mut impl BufMut) {
67 let serializer = super::Serializer::from_struct(struct_type.clone()); let bytes = serializer.serialize(value); buf.put_u32_le(bytes.len() as _);
72 buf.put_slice(&bytes);
73 }
74
75 fn new_serialize_list(list_type: &ListType, value: ListRef<'_>, buf: &mut impl BufMut) {
77 let elems = value.iter();
78 buf.put_u32_le(elems.len() as u32);
79
80 elems.for_each(|field_value| {
81 new_serialize_datum(list_type.elem(), field_value, buf);
82 });
83 }
84
85 fn new_serialize_map(map_type: &MapType, value: MapRef<'_>, buf: &mut impl BufMut) {
90 let elems = value.iter();
91 buf.put_u32_le(elems.len() as u32);
92
93 elems.for_each(|(k, v)| {
94 new_serialize_scalar(map_type.key(), k, buf);
95 new_serialize_datum(map_type.value(), v, buf);
96 });
97 }
98
99 pub fn new_serialize_scalar(
101 data_type: &DataType,
102 value: ScalarRefImpl<'_>,
103 buf: &mut impl BufMut,
104 ) {
105 match value {
106 ScalarRefImpl::Struct(s) => new_serialize_struct(data_type.as_struct(), s, buf),
107 ScalarRefImpl::List(l) => new_serialize_list(data_type.as_list(), l, buf),
108 ScalarRefImpl::Map(m) => new_serialize_map(data_type.as_map(), m, buf),
109
110 _ => plain::serialize_scalar(value, buf),
111 }
112 }
113
114 fn new_inner_deserialize_datum(data: &mut &[u8], ty: &DataType) -> Result<Datum> {
118 let null_tag = data.get_u8();
119 match null_tag {
120 0 => Ok(None),
121 1 => Some(new_deserialize_scalar(ty, data)).transpose(),
122 _ => Err(ValueEncodingError::InvalidTagEncoding(null_tag)),
123 }
124 }
125
126 fn new_deserialize_struct(struct_def: &StructType, data: &mut &[u8]) -> Result<ScalarImpl> {
130 let deserializer = super::Deserializer::from_struct(struct_def.clone()); let encoded_len = data.get_u32_le() as usize;
132
133 let (struct_data, remaining) = data.split_at(encoded_len);
134 *data = remaining;
135 let fields = deserializer.deserialize(struct_data)?;
136
137 Ok(ScalarImpl::Struct(StructValue::new(fields)))
138 }
139
140 fn new_deserialize_list(list_type: &ListType, data: &mut &[u8]) -> Result<ScalarImpl> {
142 let elem_type = list_type.elem();
143 let len = data.get_u32_le();
144 let mut builder = elem_type.create_array_builder(len as usize);
145 for _ in 0..len {
146 builder.append(new_inner_deserialize_datum(data, elem_type)?);
147 }
148 Ok(ScalarImpl::List(ListValue::new(builder.finish())))
149 }
150
151 fn new_deserialize_map(map_type: &MapType, data: &mut &[u8]) -> Result<ScalarImpl> {
156 let len = data.get_u32_le();
157 let mut builder = map_type
158 .clone() .into_struct()
160 .create_array_builder(len as usize);
161 for _ in 0..len {
162 let key = new_deserialize_scalar(map_type.key(), data)?;
163 let value = new_inner_deserialize_datum(data, map_type.value())?;
164 let entry = StructValue::new(vec![Some(key), value]);
165 builder.append(Some(ScalarImpl::Struct(entry)));
166 }
167 Ok(ScalarImpl::Map(MapValue::from_entries(ListValue::new(
168 builder.finish(),
169 ))))
170 }
171
172 pub fn new_deserialize_scalar(ty: &DataType, data: &mut &[u8]) -> Result<ScalarImpl> {
174 Ok(match ty {
175 DataType::Struct(struct_def) => new_deserialize_struct(struct_def, data)?,
176 DataType::List(list_type) => new_deserialize_list(list_type, data)?,
177 DataType::Map(map_type) => new_deserialize_map(map_type, data)?,
178 data_types::simple!() => plain::deserialize_value(ty, data)?,
179 })
180 }
181}
182
/// Column-count threshold for the small-size optimizations in this file: the `SmallVec`
/// buffers are sized to hold this many columns inline, and `ColumnMapping` switches from a
/// linear `Small` list to a hash map when there are more columns than this.
const COLUMN_ON_STACK: usize = 8;
185
/// Width of every entry in a row's offsets section.
///
/// The discriminant is the 2-bit tag stored in the row header; `0b00` is not a valid tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
enum OffsetWidth {
    /// 1-byte offsets.
    Offset8 = 0b01,
    /// 2-byte offsets.
    Offset16 = 0b10,
    /// 4-byte offsets.
    Offset32 = 0b11,
}

impl OffsetWidth {
    /// Size in bytes of each offset entry stored with this width.
    const fn width(self) -> usize {
        match self {
            Self::Offset8 => 1,
            Self::Offset16 => 2,
            Self::Offset32 => 4,
        }
    }

    /// The 2-bit header tag for this width (its discriminant).
    const fn into_bits(self) -> u8 {
        self as u8
    }

    /// Decodes a 2-bit header tag.
    ///
    /// # Panics
    /// Panics if `bits` is not one of `0b01`, `0b10`, `0b11`.
    const fn from_bits(bits: u8) -> Self {
        match bits {
            0b11 => Self::Offset32,
            0b10 => Self::Offset16,
            0b01 => Self::Offset8,
            _ => panic!("invalid offset width bits"),
        }
    }
}
221
/// One-byte row header, laid out most-significant-bit first:
///
/// ```text
/// | magic (1 bit) | reserved (5 bits) | offset width (2 bits) |
/// ```
#[bitfield(u8, order = Msb)]
#[derive(PartialEq, Eq)]
struct Header {
    /// Defaults to `true` and is read-only, so every header built with `Header::new()` has it
    /// set. `EncodedBytes::new` rejects headers where it is unset.
    #[bits(1, default = true, access = RO)]
    magic: bool,

    /// Unused bits.
    #[bits(5)]
    _reserved: u8,

    /// Width of each entry in the row's offsets section. Defaults to 1-byte offsets.
    #[bits(2, default = OffsetWidth::Offset8)]
    offset: OffsetWidth,
}
246
/// In-progress encoding of a single row: header, per-datum offsets and concatenated datum bytes.
struct RowEncoding {
    /// Row header; `set_offsets` records the chosen offset width in it.
    header: Header,
    /// Little-endian start offset of each datum within `data`, each entry
    /// `header.offset().width()` bytes wide.
    offsets: SmallVec<[u8; COLUMN_ON_STACK * 2]>,
    /// Encoded datums, concatenated in column order. NULL datums contribute no bytes.
    data: Vec<u8>,
}
253
/// Appends the value encoding of one datum to a row's data buffer.
trait Encode {
    /// Appends the encoding of `self` (whose type is `data_type`) to the end of `data`.
    ///
    /// Both implementations in this file append nothing for NULL; the decoder treats an empty
    /// slice as NULL.
    fn encode_to(self, data_type: &DataType, data: &mut Vec<u8>);
}
258
259impl<T> Encode for T
260where
261 T: ToDatumRef,
262{
263 fn encode_to(self, data_type: &DataType, data: &mut Vec<u8>) {
264 if let Some(v) = self.to_datum_ref() {
265 let curr_len = data.len();
266
267 if data_type.can_alter() == Some(true) {
269 new_serde::new_serialize_scalar(data_type, v, data);
270 } else {
271 plain::serialize_scalar(v, data);
272 }
273
274 debug_assert_ne!(
277 data.len(),
278 curr_len,
279 "scalar ({v:?}) should not be encoded to empty bytes, as it will be indistinguishable from NULL",
280 );
281 }
282 }
283}
284
285impl Encode for Option<&[u8]> {
286 fn encode_to(self, _data_type: &DataType, data: &mut Vec<u8>) {
287 if let Some(v) = self {
289 data.extend(v);
290 }
291 }
292}
293
294impl RowEncoding {
295 fn new() -> Self {
296 RowEncoding {
297 header: Header::new(),
298 offsets: Default::default(),
299 data: Default::default(),
300 }
301 }
302
303 fn set_offsets(&mut self, usize_offsets: &[usize]) {
304 debug_assert!(
305 self.offsets.is_empty(),
306 "should not set offsets multiple times"
307 );
308
309 let max_offset = usize_offsets.last().copied().unwrap_or(0);
311
312 const U8_MAX: usize = u8::MAX as usize;
313 const U16_MAX: usize = u16::MAX as usize;
314 const U32_MAX: usize = u32::MAX as usize;
315
316 let offset_width = match max_offset {
317 _n @ ..=U8_MAX => OffsetWidth::Offset8,
318 _n @ ..=U16_MAX => OffsetWidth::Offset16,
319 _n @ ..=U32_MAX => OffsetWidth::Offset32,
320 _ => panic!("encoding length {} exceeds u32", max_offset),
321 };
322 self.header.set_offset(offset_width);
323
324 self.offsets
325 .resize(usize_offsets.len() * offset_width.width(), 0);
326
327 let mut offsets_buf = &mut self.offsets[..];
328 for &offset in usize_offsets {
329 offsets_buf.put_uint_le(offset as u64, offset_width.width());
330 }
331 }
332
333 fn encode<T: Encode>(
334 &mut self,
335 datums: impl IntoIterator<Item = T>,
336 data_types: impl IntoIterator<Item = &DataType>,
337 ) {
338 debug_assert!(
339 self.data.is_empty(),
340 "should not encode one RowEncoding object multiple times."
341 );
342 let datums = datums.into_iter();
343 let mut offset_usize =
344 SmallVec::<[usize; COLUMN_ON_STACK]>::with_capacity(datums.size_hint().0);
345 for (datum, data_type) in datums.zip_eq_debug(data_types) {
346 offset_usize.push(self.data.len());
347 datum.encode_to(data_type, &mut self.data);
348 }
349 self.set_offsets(&offset_usize);
350 }
351}
352
353mod data_types {
354 use crate::types::{DataType, StructType};
355
356 pub trait DataTypes: Clone {
358 fn iter(&self) -> impl ExactSizeIterator<Item = &DataType>;
359 fn at(&self, index: usize) -> &DataType;
360 }
361
362 impl<T> DataTypes for T
363 where
364 T: AsRef<[DataType]> + Clone,
365 {
366 fn iter(&self) -> impl ExactSizeIterator<Item = &DataType> {
367 self.as_ref().iter()
368 }
369
370 fn at(&self, index: usize) -> &DataType {
371 &self.as_ref()[index]
372 }
373 }
374
375 impl DataTypes for StructType {
376 fn iter(&self) -> impl ExactSizeIterator<Item = &DataType> {
377 self.types()
378 }
379
380 fn at(&self, index: usize) -> &DataType {
381 self.type_at(index)
382 }
383 }
384}
385use data_types::DataTypes;
386
/// Column-aware row serializer.
///
/// Each encoded row stores the column ids alongside the values. This lets a `Deserializer`
/// built for a different set of columns still decode it, mapping known ids and skipping
/// unknown ones.
#[derive(Clone)]
pub struct Serializer<D: DataTypes = Vec<DataType>> {
    /// Column ids, pre-encoded as consecutive little-endian `i32`s.
    encoded_column_ids: EncodedColumnIds,
    /// Column types, in the same order as `encoded_column_ids`.
    data_types: D,
}
394
395type EncodedColumnIds = SmallVec<[u8; COLUMN_ON_STACK * 4]>;
396
397fn encode_column_ids(column_ids: impl ExactSizeIterator<Item = ColumnId>) -> EncodedColumnIds {
398 let mut encoded_column_ids = smallvec![0; column_ids.len() * 4];
400 let mut buf = &mut encoded_column_ids[..];
401 for id in column_ids {
402 buf.put_i32_le(id.get_id());
403 }
404 encoded_column_ids
405}
406
407impl Serializer {
408 pub fn new(column_ids: &[ColumnId], data_types: impl IntoIterator<Item = DataType>) -> Self {
410 Self {
411 encoded_column_ids: encode_column_ids(column_ids.iter().copied()),
412 data_types: data_types.into_iter().collect(),
413 }
414 }
415}
416
417impl Serializer<StructType> {
418 pub fn from_struct(struct_type: StructType) -> Self {
422 Self {
423 encoded_column_ids: encode_column_ids(struct_type.ids().unwrap()),
424 data_types: struct_type,
425 }
426 }
427}
428
429impl<D: DataTypes> Serializer<D> {
430 fn datum_num(&self) -> usize {
431 self.encoded_column_ids.len() / 4
432 }
433
434 fn serialize_raw<T: Encode>(&self, datums: impl IntoIterator<Item = T>) -> Vec<u8> {
435 let mut encoding = RowEncoding::new();
436 encoding.encode(datums, self.data_types.iter());
437 self.finish(encoding)
438 }
439
440 fn finish(&self, encoding: RowEncoding) -> Vec<u8> {
441 let mut row_bytes = Vec::with_capacity(
442 5 + self.encoded_column_ids.len() + encoding.offsets.len() + encoding.data.len(), );
444 row_bytes.put_u8(encoding.header.into_bits());
445 row_bytes.put_u32_le(self.datum_num() as u32);
446 row_bytes.extend(&self.encoded_column_ids);
447 row_bytes.extend(&encoding.offsets);
448 row_bytes.extend(&encoding.data);
449
450 row_bytes
451 }
452}
453
454impl<D: DataTypes> ValueRowSerializer for Serializer<D> {
455 fn serialize(&self, row: impl Row) -> Vec<u8> {
457 assert_eq!(row.len(), self.datum_num());
458 self.serialize_raw(row.iter())
459 }
460}
461
/// Borrowed view of a column-aware encoded row, split into its sections.
///
/// Iterating yields `(column_id, datum_bytes)` pairs in encoded order; an empty `datum_bytes`
/// represents NULL.
#[derive(Clone)]
struct EncodedBytes<'a> {
    /// The parsed one-byte header.
    header: Header,

    /// Remaining column ids, 4 bytes (little-endian `i32`) each.
    column_ids: &'a [u8],
    /// Remaining datum start offsets, each `header.offset().width()` bytes wide.
    offsets: &'a [u8],
    /// Concatenated datum bytes that the offsets index into.
    data: &'a [u8],
}
475
476impl<'a> EncodedBytes<'a> {
477 fn new(mut encoded_bytes: &'a [u8]) -> Result<Self> {
478 let header = Header::from_bits(encoded_bytes.get_u8());
479 if !header.magic() {
480 return Err(ValueEncodingError::InvalidFlag(header.into_bits()));
481 }
482 let offset_bytes = header.offset().width();
483
484 let datum_num = encoded_bytes.get_u32_le() as usize;
485 let offsets_start_idx = 4 * datum_num;
486 let data_start_idx = offsets_start_idx + datum_num * offset_bytes;
487
488 Ok(EncodedBytes {
489 header,
490 column_ids: &encoded_bytes[..offsets_start_idx],
491 offsets: &encoded_bytes[offsets_start_idx..data_start_idx],
492 data: &encoded_bytes[data_start_idx..],
493 })
494 }
495}
496
497impl<'a> Iterator for EncodedBytes<'a> {
498 type Item = (i32, &'a [u8]);
499
500 fn next(&mut self) -> Option<Self::Item> {
501 if self.column_ids.is_empty() {
502 assert!(self.offsets.is_empty());
503 return None;
504 }
505
506 let id = self.column_ids.get_i32_le();
507
508 let offset_width = self.header.offset().width();
509 let get_offset = |offsets: &mut &[u8]| offsets.get_uint_le(offset_width) as usize;
510
511 let this_offset = get_offset(&mut self.offsets);
512 let next_offset = if self.offsets.is_empty() {
513 self.data.len()
514 } else {
515 let mut peek_offsets = self.offsets; get_offset(&mut peek_offsets)
517 };
518
519 let data = &self.data[this_offset..next_offset];
520
521 Some((id, data))
522 }
523}
524
/// Column-aware row deserializer.
///
/// Decodes rows produced by `Serializer`. Each encoded column is placed at the position its
/// column id maps to, and encoded columns with unknown ids are ignored.
#[derive(Clone)]
pub struct Deserializer<D: DataTypes = Arc<[DataType]>> {
    /// Maps an encoded column id to its index in the output row.
    mapping: ColumnMapping,
    /// Output column types, indexed by mapped position.
    data_types: D,

    /// Initial contents of the output row, kept for columns absent from the encoding.
    /// `None` means absent columns decode as NULL.
    default_row: Option<Vec<Datum>>,
}
537
538#[derive(Clone)]
540enum ColumnMapping {
541 Small(SmallVec<[i32; COLUMN_ON_STACK]>),
543 Large(HashMap<i32, usize>),
545}
546
547impl ColumnMapping {
548 fn new(column_ids: impl ExactSizeIterator<Item = ColumnId>) -> Self {
549 if column_ids.len() <= COLUMN_ON_STACK {
550 Self::Small(column_ids.map(|c| c.get_id()).collect())
551 } else {
552 Self::Large(
553 column_ids
554 .enumerate()
555 .map(|(i, c)| (c.get_id(), i))
556 .collect(),
557 )
558 }
559 }
560
561 fn get(&self, id: i32) -> Option<usize> {
562 match self {
563 ColumnMapping::Small(vec) => vec.iter().position(|&x| x == id),
564 ColumnMapping::Large(map) => map.get(&id).copied(),
565 }
566 }
567}
568
569impl Deserializer {
570 pub fn new(
571 column_ids: &[ColumnId],
572 schema: Arc<[DataType]>,
573 column_with_default: impl Iterator<Item = (usize, Datum)>,
574 ) -> Self {
575 assert_eq!(column_ids.len(), schema.len());
576 let mut default_row: Vec<Datum> = vec![None; schema.len()];
577 for (i, datum) in column_with_default {
578 default_row[i] = datum;
579 }
580 Self {
581 mapping: ColumnMapping::new(column_ids.iter().copied()),
582 data_types: schema,
583 default_row: Some(default_row),
584 }
585 }
586}
587
588impl Deserializer<StructType> {
589 pub fn from_struct(struct_type: StructType) -> Self {
593 Self {
594 mapping: ColumnMapping::new(struct_type.ids().unwrap()),
595 data_types: struct_type,
596 default_row: None,
597 }
598 }
599}
600
601impl<D: DataTypes> ValueRowDeserializer for Deserializer<D> {
602 fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
603 let encoded_bytes = EncodedBytes::new(encoded_bytes)?;
604
605 let mut row =
606 (self.default_row.clone()).unwrap_or_else(|| vec![None; self.data_types.iter().len()]);
607
608 for (id, mut data) in encoded_bytes {
609 let Some(decoded_idx) = self.mapping.get(id) else {
610 continue;
611 };
612 let data_type = self.data_types.at(decoded_idx);
613
614 let datum = if data.is_empty() {
615 None
616 } else if data_type.can_alter() == Some(true) {
617 Some(new_serde::new_deserialize_scalar(data_type, &mut data)?)
618 } else {
619 Some(plain::deserialize_value(data_type, &mut data)?)
620 };
621
622 row[decoded_idx] = datum;
623 }
624
625 Ok(row)
626 }
627}
628
/// Pairs a column-aware `Serializer` with a `Deserializer` so that one value implements both
/// `ValueRowSerializer` and `ValueRowDeserializer`.
#[derive(Clone)]
pub struct ColumnAwareSerde {
    pub serializer: Serializer,
    pub deserializer: Deserializer,
}
636
637impl ValueRowSerializer for ColumnAwareSerde {
638 fn serialize(&self, row: impl Row) -> Vec<u8> {
639 self.serializer.serialize(row)
640 }
641}
642
643impl ValueRowDeserializer for ColumnAwareSerde {
644 fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
645 self.deserializer.deserialize(encoded_bytes)
646 }
647}
648
649pub fn try_drop_invalid_columns(
653 encoded_bytes: &[u8],
654 valid_column_ids: &HashSet<i32>,
655) -> Option<Vec<u8>> {
656 let encoded_bytes = EncodedBytes::new(encoded_bytes).ok()?;
657
658 let has_invalid_column = (encoded_bytes.clone()).any(|(id, _)| !valid_column_ids.contains(&id));
659 if !has_invalid_column {
660 return None;
661 }
662
663 let mut datums = Vec::with_capacity(valid_column_ids.len());
666 let mut column_ids = Vec::with_capacity(valid_column_ids.len());
667
668 for (id, data) in encoded_bytes {
669 if valid_column_ids.contains(&id) {
670 column_ids.push(ColumnId::new(id));
671 datums.push(if data.is_empty() { None } else { Some(data) });
672 }
673 }
674
675 let dummy_data_types = vec![DataType::Boolean; column_ids.len()];
678
679 let row_bytes = Serializer::new(&column_ids, dummy_data_types).serialize_raw(datums);
680 Some(row_bytes)
681}