//! Column-aware value encoding for rows: every datum is stored together with its column id,
//! so an encoded row remains decodable after schema changes such as adding or dropping
//! columns.

use std::collections::HashSet;
use std::sync::Arc;

use ahash::HashMap;
use bitfield_struct::bitfield;
use bytes::{Buf, BufMut};
use rw_iter_util::ZipEqDebug;
use smallvec::{SmallVec, smallvec};

use super::error::ValueEncodingError;
use super::{Result, ValueRowDeserializer, ValueRowSerializer};
use crate::catalog::ColumnId;
use crate::row::Row;
use crate::types::{DataType, Datum, ScalarRefImpl, StructType, ToDatumRef};
use crate::util::value_encoding as plain;

mod new_serde {
    //! Encoding for composite types (struct, list, map): structs are nested as
    //! length-prefixed, column-aware rows so that their fields can later be added or
    //! dropped; lists and maps are count-prefixed sequences of their elements.

    use super::*;
    use crate::array::{ListRef, ListValue, MapRef, MapValue, StructRef, StructValue};
    use crate::types::{MapType, ScalarImpl, StructType, data_types};

    /// Serialize a possibly-null datum: a 1-byte null tag, then the scalar if present.
    fn new_serialize_datum(
        data_type: &DataType,
        datum_ref: impl ToDatumRef,
        buf: &mut impl BufMut,
    ) {
        if let Some(d) = datum_ref.to_datum_ref() {
            buf.put_u8(1);
            new_serialize_scalar(data_type, d, buf)
        } else {
            buf.put_u8(0);
        }
    }

    /// Serialize a struct as a length-prefixed, column-aware encoded row of its fields.
    fn new_serialize_struct(struct_type: &StructType, value: StructRef<'_>, buf: &mut impl BufMut) {
        let serializer = super::Serializer::from_struct(struct_type.clone());
        let bytes = serializer.serialize(value);

        // Prefix with the encoded length so the deserializer can slice off the struct as a whole.
        buf.put_u32_le(bytes.len() as _);
        buf.put_slice(&bytes);
    }

    /// Serialize a list as an element count followed by each element as a datum.
    fn new_serialize_list(inner_type: &DataType, value: ListRef<'_>, buf: &mut impl BufMut) {
        let elems = value.iter();
        buf.put_u32_le(elems.len() as u32);

        elems.for_each(|field_value| {
            new_serialize_datum(inner_type, field_value, buf);
        });
    }

    /// Serialize a map as an entry count followed by (key, value) pairs; keys are never null.
    fn new_serialize_map(map_type: &MapType, value: MapRef<'_>, buf: &mut impl BufMut) {
        let elems = value.iter();
        buf.put_u32_le(elems.len() as u32);

        elems.for_each(|(k, v)| {
            new_serialize_scalar(map_type.key(), k, buf);
            new_serialize_datum(map_type.value(), v, buf);
        });
    }

    /// Serialize a non-null scalar: composite types use the encoding above, all other types
    /// fall back to the plain value encoding.
    pub fn new_serialize_scalar(
        data_type: &DataType,
        value: ScalarRefImpl<'_>,
        buf: &mut impl BufMut,
    ) {
        match value {
            ScalarRefImpl::Struct(s) => new_serialize_struct(data_type.as_struct(), s, buf),
            ScalarRefImpl::List(l) => new_serialize_list(data_type.as_list_element_type(), l, buf),
            ScalarRefImpl::Map(m) => new_serialize_map(data_type.as_map(), m, buf),

            _ => plain::serialize_scalar(value, buf),
        }
    }

    /// Deserialize a datum: read the 1-byte null tag, then the scalar if present.
    fn new_inner_deserialize_datum(data: &mut &[u8], ty: &DataType) -> Result<Datum> {
        let null_tag = data.get_u8();
        match null_tag {
            0 => Ok(None),
            1 => Some(new_deserialize_scalar(ty, data)).transpose(),
            _ => Err(ValueEncodingError::InvalidTagEncoding(null_tag)),
        }
    }

    /// Deserialize a length-prefixed, column-aware encoded struct.
    fn new_deserialize_struct(struct_def: &StructType, data: &mut &[u8]) -> Result<ScalarImpl> {
        let deserializer = super::Deserializer::from_struct(struct_def.clone());
        let encoded_len = data.get_u32_le() as usize;

        let (struct_data, remaining) = data.split_at(encoded_len);
        *data = remaining;
        let fields = deserializer.deserialize(struct_data)?;

        Ok(ScalarImpl::Struct(StructValue::new(fields)))
    }

    /// Deserialize a count-prefixed list of datums.
    fn new_deserialize_list(item_type: &DataType, data: &mut &[u8]) -> Result<ScalarImpl> {
        let len = data.get_u32_le();
        let mut builder = item_type.create_array_builder(len as usize);
        for _ in 0..len {
            builder.append(new_inner_deserialize_datum(data, item_type)?);
        }
        Ok(ScalarImpl::List(ListValue::new(builder.finish())))
    }

    /// Deserialize a count-prefixed map of (key, value) entries.
    fn new_deserialize_map(map_type: &MapType, data: &mut &[u8]) -> Result<ScalarImpl> {
        let len = data.get_u32_le();
        let mut builder = map_type
            .clone()
            .into_struct()
            .create_array_builder(len as usize);
        for _ in 0..len {
            let key = new_deserialize_scalar(map_type.key(), data)?;
            let value = new_inner_deserialize_datum(data, map_type.value())?;
            let entry = StructValue::new(vec![Some(key), value]);
            builder.append(Some(ScalarImpl::Struct(entry)));
        }
        Ok(ScalarImpl::Map(MapValue::from_entries(ListValue::new(
            builder.finish(),
        ))))
    }

    /// Deserialize a non-null scalar, dispatching on the data type the same way as
    /// [`new_serialize_scalar`].
    pub fn new_deserialize_scalar(ty: &DataType, data: &mut &[u8]) -> Result<ScalarImpl> {
        Ok(match ty {
            DataType::Struct(struct_def) => new_deserialize_struct(struct_def, data)?,
            DataType::List(item_type) => new_deserialize_list(item_type, data)?,
            DataType::Map(map_type) => new_deserialize_map(map_type, data)?,
            DataType::Vector(_) => plain::deserialize_value(ty, data)?,

            data_types::simple!() => plain::deserialize_value(ty, data)?,
        })
    }
}
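
// Worked sketch (illustrative only): a two-element list [1, NULL] of `Int32` passed to
// `new_serialize_list` is laid out as
//   u32 LE element count = 2
//   0x01, <plain encoding of Int32 1>   (null tag + scalar)
//   0x00                                (null tag alone for the NULL element)
// A struct is instead wrapped as a u32 LE byte length followed by a complete column-aware
// row of its fields, which is what lets nested struct fields survive schema changes.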

/// A row at most this wide keeps several of its encoding buffers inline on the stack
/// (`SmallVec`) and uses a linear-scan column-id mapping instead of a hash map.
const COLUMN_ON_STACK: usize = 8;

/// How many bytes are used to store each datum offset in the encoded row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
enum OffsetWidth {
    /// Offsets fit in a `u8`.
    Offset8 = 0b01,
    /// Offsets fit in a `u16`.
    Offset16 = 0b10,
    /// Offsets fit in a `u32`.
    Offset32 = 0b11,
}

impl OffsetWidth {
    /// Width of one offset in bytes.
    const fn width(self) -> usize {
        match self {
            OffsetWidth::Offset8 => 1,
            OffsetWidth::Offset16 => 2,
            OffsetWidth::Offset32 => 4,
        }
    }

    const fn into_bits(self) -> u8 {
        self as u8
    }

    const fn from_bits(bits: u8) -> Self {
        match bits {
            0b01 => OffsetWidth::Offset8,
            0b10 => OffsetWidth::Offset16,
            0b11 => OffsetWidth::Offset32,
            _ => panic!("invalid offset width bits"),
        }
    }
}

/// Header (flag) byte at the very beginning of the encoded row.
#[bitfield(u8, order = Msb)]
#[derive(PartialEq, Eq)]
struct Header {
    /// Magic bit, always set; deserialization rejects bytes whose header does not have it.
    #[bits(1, default = true, access = RO)]
    magic: bool,

    #[bits(5)]
    _reserved: u8,

    /// Width of the per-datum offsets that follow the column ids.
    #[bits(2, default = OffsetWidth::Offset8)]
    offset: OffsetWidth,
}
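
// Worked example (derived from the bitfield layout above, MSB first): the default header has
// the magic bit set and `OffsetWidth::Offset8` (0b01) in the two least-significant bits, so
// `Header::new().into_bits()` is `0b1000_0001` (0x81); with u32 offsets it would be
// `0b1000_0011` (0x83).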

/// In-progress encoding of a row: the header, the per-datum start offsets into `data`, and
/// the concatenated encoded datums.
struct RowEncoding {
    header: Header,
    offsets: SmallVec<[u8; COLUMN_ON_STACK * 2]>,
    data: Vec<u8>,
}

/// How a single datum gets appended to the `data` buffer of a [`RowEncoding`].
trait Encode {
    fn encode_to(self, data_type: &DataType, data: &mut Vec<u8>);
}

impl<T> Encode for T
where
    T: ToDatumRef,
{
    fn encode_to(self, data_type: &DataType, data: &mut Vec<u8>) {
        if let Some(v) = self.to_datum_ref() {
            // Types reported as alterable go through the recursive `new_serde` encoding;
            // everything else uses the plain value encoding. A NULL writes nothing: an
            // empty data slice is how NULL is represented in this format.
            if data_type.can_alter() == Some(true) {
                new_serde::new_serialize_scalar(data_type, v, data);
            } else {
                plain::serialize_scalar(v, data);
            }
        }
    }
}

impl Encode for Option<&[u8]> {
    fn encode_to(self, _data_type: &DataType, data: &mut Vec<u8>) {
        // The bytes are already encoded; copy them through without looking at the data type.
        if let Some(v) = self {
            data.extend(v);
        }
    }
}

impl RowEncoding {
    fn new() -> Self {
        RowEncoding {
            header: Header::new(),
            offsets: Default::default(),
            data: Default::default(),
        }
    }

    fn set_offsets(&mut self, usize_offsets: &[usize]) {
        debug_assert!(
            self.offsets.is_empty(),
            "should not set offsets multiple times"
        );

        // The start offset of the last datum is the largest value that has to fit, so it
        // alone decides the offset width.
        let max_offset = usize_offsets.last().copied().unwrap_or(0);

        const U8_MAX: usize = u8::MAX as usize;
        const U16_MAX: usize = u16::MAX as usize;
        const U32_MAX: usize = u32::MAX as usize;

        let offset_width = match max_offset {
            _n @ ..=U8_MAX => OffsetWidth::Offset8,
            _n @ ..=U16_MAX => OffsetWidth::Offset16,
            _n @ ..=U32_MAX => OffsetWidth::Offset32,
            _ => panic!("encoding length {} exceeds u32", max_offset),
        };
        self.header.set_offset(offset_width);

        self.offsets
            .resize(usize_offsets.len() * offset_width.width(), 0);

        let mut offsets_buf = &mut self.offsets[..];
        for &offset in usize_offsets {
            offsets_buf.put_uint_le(offset as u64, offset_width.width());
        }
    }

    fn encode<T: Encode>(
        &mut self,
        datums: impl IntoIterator<Item = T>,
        data_types: impl IntoIterator<Item = &DataType>,
    ) {
        debug_assert!(
            self.data.is_empty(),
            "should not encode one RowEncoding object multiple times."
        );
        let datums = datums.into_iter();
        let mut offset_usize =
            SmallVec::<[usize; COLUMN_ON_STACK]>::with_capacity(datums.size_hint().0);
        for (datum, data_type) in datums.zip_eq_debug(data_types) {
            offset_usize.push(self.data.len());
            datum.encode_to(data_type, &mut self.data);
        }
        self.set_offsets(&offset_usize);
    }
}
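
// Worked example: for three encoded datums taking 4, 0, and 8 bytes, `encode` records the
// start offsets [0, 4, 4] (the NULL in the middle occupies zero bytes). The largest offset
// (4) fits in a `u8`, so `set_offsets` picks `OffsetWidth::Offset8` and writes the offsets
// as the three bytes `[0, 4, 4]`. The end of each datum is the next offset, or the end of
// `data` for the last one (see the `EncodedBytes` iterator below).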

mod data_types {
    use crate::types::{DataType, StructType};

    pub trait DataTypes: Clone {
        fn iter(&self) -> impl ExactSizeIterator<Item = &DataType>;
        fn at(&self, index: usize) -> &DataType;
    }

    impl<T> DataTypes for T
    where
        T: AsRef<[DataType]> + Clone,
    {
        fn iter(&self) -> impl ExactSizeIterator<Item = &DataType> {
            self.as_ref().iter()
        }

        fn at(&self, index: usize) -> &DataType {
            &self.as_ref()[index]
        }
    }

    impl DataTypes for StructType {
        fn iter(&self) -> impl ExactSizeIterator<Item = &DataType> {
            self.types()
        }

        fn at(&self, index: usize) -> &DataType {
            self.type_at(index)
        }
    }
}
use data_types::DataTypes;

/// Serializer for the column-aware row encoding, which stores the column ids alongside the
/// values so that the encoded row can still be interpreted after schema changes.
#[derive(Clone)]
pub struct Serializer<D: DataTypes = Vec<DataType>> {
    encoded_column_ids: EncodedColumnIds,
    data_types: D,
}

type EncodedColumnIds = SmallVec<[u8; COLUMN_ON_STACK * 4]>;

fn encode_column_ids(column_ids: impl ExactSizeIterator<Item = ColumnId>) -> EncodedColumnIds {
    // Pre-size the buffer (4 bytes per column id), then fill it with little-endian `i32`s.
    let mut encoded_column_ids = smallvec![0; column_ids.len() * 4];
    let mut buf = &mut encoded_column_ids[..];
    for id in column_ids {
        buf.put_i32_le(id.get_id());
    }
    encoded_column_ids
}
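
// Worked example: column ids [1, 2] encode to the 8 bytes `01 00 00 00 02 00 00 00`
// (each id written as a little-endian `i32`).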

impl Serializer {
    /// Create a new [`Serializer`] with the given column ids and data types.
    pub fn new(column_ids: &[ColumnId], data_types: impl IntoIterator<Item = DataType>) -> Self {
        Self {
            encoded_column_ids: encode_column_ids(column_ids.iter().copied()),
            data_types: data_types.into_iter().collect(),
        }
    }
}

impl Serializer<StructType> {
    /// Create a new [`Serializer`] for the fields of the given struct.
    ///
    /// Panics if the struct type does not have field ids.
    pub fn from_struct(struct_type: StructType) -> Self {
        Self {
            encoded_column_ids: encode_column_ids(struct_type.ids().unwrap()),
            data_types: struct_type,
        }
    }
}

impl<D: DataTypes> Serializer<D> {
    fn datum_num(&self) -> usize {
        self.encoded_column_ids.len() / 4
    }

    fn serialize_raw<T: Encode>(&self, datums: impl IntoIterator<Item = T>) -> Vec<u8> {
        let mut encoding = RowEncoding::new();
        encoding.encode(datums, self.data_types.iter());
        self.finish(encoding)
    }

    fn finish(&self, encoding: RowEncoding) -> Vec<u8> {
        let mut row_bytes = Vec::with_capacity(
            // 5 bytes for the header (1) and the datum count (4).
            5 + self.encoded_column_ids.len() + encoding.offsets.len() + encoding.data.len(),
        );
        row_bytes.put_u8(encoding.header.into_bits());
        row_bytes.put_u32_le(self.datum_num() as u32);
        row_bytes.extend(&self.encoded_column_ids);
        row_bytes.extend(&encoding.offsets);
        row_bytes.extend(&encoding.data);

        row_bytes
    }
}
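
// Overall layout of an encoded row, as assembled by `finish` above and parsed back by
// `EncodedBytes::new` below:
//
//   | header (1B) | datum count (u32 LE) | column ids (4B each) | offsets (1/2/4B each) | data |
//
// Every column, including NULL ones, gets a column id and an offset; a NULL simply
// contributes zero bytes of data.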

impl<D: DataTypes> ValueRowSerializer for Serializer<D> {
    /// Serialize a row of datums into bytes; the row must match the serializer's schema width.
    fn serialize(&self, row: impl Row) -> Vec<u8> {
        assert_eq!(row.len(), self.datum_num());
        self.serialize_raw(row.iter())
    }
}

/// A view of the encoded bytes that also acts as an iterator over `(column_id, datum_bytes)`
/// pairs in serialization order.
#[derive(Clone)]
struct EncodedBytes<'a> {
    header: Header,

    /// Remaining column ids, 4 bytes (little-endian `i32`) each.
    column_ids: &'a [u8],
    /// Remaining offsets, `header.offset().width()` bytes each.
    offsets: &'a [u8],
    data: &'a [u8],
}

impl<'a> EncodedBytes<'a> {
    fn new(mut encoded_bytes: &'a [u8]) -> Result<Self> {
        let header = Header::from_bits(encoded_bytes.get_u8());
        if !header.magic() {
            return Err(ValueEncodingError::InvalidFlag(header.into_bits()));
        }
        let offset_bytes = header.offset().width();

        let datum_num = encoded_bytes.get_u32_le() as usize;
        let offsets_start_idx = 4 * datum_num;
        let data_start_idx = offsets_start_idx + datum_num * offset_bytes;

        Ok(EncodedBytes {
            header,
            column_ids: &encoded_bytes[..offsets_start_idx],
            offsets: &encoded_bytes[offsets_start_idx..data_start_idx],
            data: &encoded_bytes[data_start_idx..],
        })
    }
}

impl<'a> Iterator for EncodedBytes<'a> {
    type Item = (i32, &'a [u8]);

    fn next(&mut self) -> Option<Self::Item> {
        if self.column_ids.is_empty() {
            assert!(self.offsets.is_empty());
            return None;
        }

        let id = self.column_ids.get_i32_le();

        let offset_width = self.header.offset().width();
        let get_offset = |offsets: &mut &[u8]| offsets.get_uint_le(offset_width) as usize;

        let this_offset = get_offset(&mut self.offsets);
        let next_offset = if self.offsets.is_empty() {
            self.data.len()
        } else {
            // Peek the next datum's start offset without consuming it; the next call to
            // `next` still needs to read it.
            let mut peek_offsets = self.offsets;
            get_offset(&mut peek_offsets)
        };

        let data = &self.data[this_offset..next_offset];

        Some((id, data))
    }
}
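
// Worked example: for an encoded row with column ids [1, 2] and u8 offsets [0, 4], iterating
// an `EncodedBytes` yields `(1, &data[0..4])` and then `(2, &data[4..])`. An empty slice for
// a column means its datum was NULL.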

/// Deserializer for the column-aware row encoding: looks up each encoded column id in the
/// expected schema, skipping columns that no longer exist and filling defaults for columns
/// that are missing from the encoded row.
#[derive(Clone)]
pub struct Deserializer<D: DataTypes = Arc<[DataType]>> {
    mapping: ColumnMapping,
    data_types: D,

    /// Per-column default values used for columns absent from the encoded row; `None` means
    /// all defaults are NULL (as when deserializing struct fields).
    default_row: Option<Vec<Datum>>,
}

/// Mapping from column id to the column's index in the schema.
#[derive(Clone)]
enum ColumnMapping {
    /// For a small number of columns, a linear scan over a small vector beats hashing.
    Small(SmallVec<[i32; COLUMN_ON_STACK]>),
    /// For larger schemas, fall back to a hash map.
    Large(HashMap<i32, usize>),
}

impl ColumnMapping {
    fn new(column_ids: impl ExactSizeIterator<Item = ColumnId>) -> Self {
        if column_ids.len() <= COLUMN_ON_STACK {
            Self::Small(column_ids.map(|c| c.get_id()).collect())
        } else {
            Self::Large(
                column_ids
                    .enumerate()
                    .map(|(i, c)| (c.get_id(), i))
                    .collect(),
            )
        }
    }

    fn get(&self, id: i32) -> Option<usize> {
        match self {
            ColumnMapping::Small(vec) => vec.iter().position(|&x| x == id),
            ColumnMapping::Large(map) => map.get(&id).copied(),
        }
    }
}
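
// Worked example: with schema column ids [5, 7, 9] (at most COLUMN_ON_STACK entries), the
// mapping is `Small([5, 7, 9])` and `get(7)` returns `Some(1)`; the id of a dropped column,
// say `get(6)`, returns `None`, so its datum is skipped during deserialization.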

impl Deserializer {
    pub fn new(
        column_ids: &[ColumnId],
        schema: Arc<[DataType]>,
        column_with_default: impl Iterator<Item = (usize, Datum)>,
    ) -> Self {
        assert_eq!(column_ids.len(), schema.len());
        let mut default_row: Vec<Datum> = vec![None; schema.len()];
        for (i, datum) in column_with_default {
            default_row[i] = datum;
        }
        Self {
            mapping: ColumnMapping::new(column_ids.iter().copied()),
            data_types: schema,
            default_row: Some(default_row),
        }
    }
}

impl Deserializer<StructType> {
    /// Create a new [`Deserializer`] for the fields of the given struct.
    ///
    /// Panics if the struct type does not have field ids.
    pub fn from_struct(struct_type: StructType) -> Self {
        Self {
            mapping: ColumnMapping::new(struct_type.ids().unwrap()),
            data_types: struct_type,
            default_row: None,
        }
    }
}

impl<D: DataTypes> ValueRowDeserializer for Deserializer<D> {
    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
        let encoded_bytes = EncodedBytes::new(encoded_bytes)?;

        // Start from the default row; columns present in the encoded bytes overwrite it below.
        let mut row =
            (self.default_row.clone()).unwrap_or_else(|| vec![None; self.data_types.iter().len()]);

        for (id, mut data) in encoded_bytes {
            // Unknown column id: the column was dropped from the schema, skip its data.
            let Some(decoded_idx) = self.mapping.get(id) else {
                continue;
            };
            let data_type = self.data_types.at(decoded_idx);

            let datum = if data.is_empty() {
                None
            } else if data_type.can_alter() == Some(true) {
                Some(new_serde::new_deserialize_scalar(data_type, &mut data)?)
            } else {
                Some(plain::deserialize_value(data_type, &mut data)?)
            };

            row[decoded_idx] = datum;
        }

        Ok(row)
    }
}
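
// Taken together, `deserialize` handles the schema-evolution cases this encoding is designed
// for: a column missing from the encoded row keeps its default value (or NULL), a column
// whose id is no longer in the schema is skipped, and columns whose types are marked
// alterable are decoded recursively via `new_serde`.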

#[derive(Clone)]
pub struct ColumnAwareSerde {
    pub serializer: Serializer,
    pub deserializer: Deserializer,
}

impl ValueRowSerializer for ColumnAwareSerde {
    fn serialize(&self, row: impl Row) -> Vec<u8> {
        self.serializer.serialize(row)
    }
}

impl ValueRowDeserializer for ColumnAwareSerde {
    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
        self.deserializer.deserialize(encoded_bytes)
    }
}

/// Drop the datums of columns whose ids are not in `valid_column_ids` from the encoded row,
/// re-encoding the remaining columns.
///
/// Returns `None` if all columns are valid, i.e. the original bytes can be used as-is.
pub fn try_drop_invalid_columns(
    encoded_bytes: &[u8],
    valid_column_ids: &HashSet<i32>,
) -> Option<Vec<u8>> {
    let encoded_bytes = EncodedBytes::new(encoded_bytes).ok()?;

    let has_invalid_column = (encoded_bytes.clone()).any(|(id, _)| !valid_column_ids.contains(&id));
    if !has_invalid_column {
        return None;
    }

    // Slow path: collect the (id, raw bytes) of the valid columns and re-assemble the row.
    let mut datums = Vec::with_capacity(valid_column_ids.len());
    let mut column_ids = Vec::with_capacity(valid_column_ids.len());

    for (id, data) in encoded_bytes {
        if valid_column_ids.contains(&id) {
            column_ids.push(ColumnId::new(id));
            datums.push(if data.is_empty() { None } else { Some(data) });
        }
    }

    // The raw bytes are copied through verbatim (`Encode for Option<&[u8]>`), so the data
    // types are never consulted and dummy ones suffice.
    let dummy_data_types = vec![DataType::Boolean; column_ids.len()];

    let row_bytes = Serializer::new(&column_ids, dummy_data_types).serialize_raw(datums);
    Some(row_bytes)
}
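
// A minimal round-trip sketch in test form. It is illustrative rather than exhaustive and
// assumes only items already used in this file plus `OwnedRow` and `ScalarImpl` from this
// crate; the module name, test names, column ids, and values are arbitrary.
#[cfg(test)]
mod usage_sketch {
    use super::*;
    use crate::row::OwnedRow;
    use crate::types::ScalarImpl;

    #[test]
    fn round_trip_with_added_and_dropped_columns() {
        // Writer-side schema: column #1 (int) and column #2 (varchar).
        let serializer = Serializer::new(
            &[ColumnId::new(1), ColumnId::new(2)],
            vec![DataType::Int32, DataType::Varchar],
        );
        let bytes = serializer.serialize(OwnedRow::new(vec![
            Some(ScalarImpl::Int32(42)),
            Some(ScalarImpl::Utf8("hello".into())),
        ]));

        // Reader-side schema: column #2 was dropped and column #3 (int) was added.
        let deserializer = Deserializer::new(
            &[ColumnId::new(1), ColumnId::new(3)],
            Arc::from(vec![DataType::Int32, DataType::Int32]),
            std::iter::empty(),
        );
        let decoded = deserializer.deserialize(&bytes).unwrap();

        // Column #1 round-trips, #2 is skipped, #3 falls back to its default (NULL).
        assert_eq!(decoded, vec![Some(ScalarImpl::Int32(42)), None]);
    }

    #[test]
    fn drop_invalid_columns() {
        let serializer = Serializer::new(
            &[ColumnId::new(1), ColumnId::new(2)],
            vec![DataType::Int32, DataType::Int32],
        );
        let bytes = serializer.serialize(OwnedRow::new(vec![
            Some(ScalarImpl::Int32(1)),
            Some(ScalarImpl::Int32(2)),
        ]));

        // Only column #1 is still valid, so column #2 should be pruned from the encoding.
        let valid: HashSet<i32> = [1].into_iter().collect();
        let pruned = try_drop_invalid_columns(&bytes, &valid).expect("should prune column #2");

        let deserializer = Deserializer::new(
            &[ColumnId::new(1)],
            Arc::from(vec![DataType::Int32]),
            std::iter::empty(),
        );
        assert_eq!(
            deserializer.deserialize(&pruned).unwrap(),
            vec![Some(ScalarImpl::Int32(1))]
        );
    }
}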