1use std::collections::HashSet;
23use std::sync::Arc;
24
25use ahash::HashMap;
26use bitfield_struct::bitfield;
27use bytes::{Buf, BufMut};
28use rw_iter_util::ZipEqDebug;
29use smallvec::{SmallVec, smallvec};
30
31use super::error::ValueEncodingError;
32use super::{Result, ValueRowDeserializer, ValueRowSerializer};
33use crate::catalog::ColumnId;
34use crate::row::Row;
35use crate::types::{DataType, Datum, ScalarRefImpl, StructType, ToDatumRef};
36use crate::util::value_encoding as plain;
37
38mod new_serde {
43 use super::*;
44 use crate::array::{ListRef, ListValue, MapRef, MapValue, StructRef, StructValue};
45 use crate::types::{MapType, ScalarImpl, StructType, data_types};
46
47 fn new_serialize_datum(
51 data_type: &DataType,
52 datum_ref: impl ToDatumRef,
53 buf: &mut impl BufMut,
54 ) {
55 if let Some(d) = datum_ref.to_datum_ref() {
56 buf.put_u8(1);
57 new_serialize_scalar(data_type, d, buf)
58 } else {
59 buf.put_u8(0);
60 }
61 }
62
63 fn new_serialize_struct(struct_type: &StructType, value: StructRef<'_>, buf: &mut impl BufMut) {
67 let serializer = super::Serializer::from_struct(struct_type.clone()); let bytes = serializer.serialize(value); buf.put_u32_le(bytes.len() as _);
72 buf.put_slice(&bytes);
73 }
74
75 fn new_serialize_list(inner_type: &DataType, value: ListRef<'_>, buf: &mut impl BufMut) {
77 let elems = value.iter();
78 buf.put_u32_le(elems.len() as u32);
79
80 elems.for_each(|field_value| {
81 new_serialize_datum(inner_type, field_value, buf);
82 });
83 }
84
85 fn new_serialize_map(map_type: &MapType, value: MapRef<'_>, buf: &mut impl BufMut) {
90 let elems = value.iter();
91 buf.put_u32_le(elems.len() as u32);
92
93 elems.for_each(|(k, v)| {
94 new_serialize_scalar(map_type.key(), k, buf);
95 new_serialize_datum(map_type.value(), v, buf);
96 });
97 }
98
99 pub fn new_serialize_scalar(
101 data_type: &DataType,
102 value: ScalarRefImpl<'_>,
103 buf: &mut impl BufMut,
104 ) {
105 match value {
106 ScalarRefImpl::Struct(s) => new_serialize_struct(data_type.as_struct(), s, buf),
107 ScalarRefImpl::List(l) => new_serialize_list(data_type.as_list(), l, buf),
108 ScalarRefImpl::Map(m) => new_serialize_map(data_type.as_map(), m, buf),
109
110 _ => plain::serialize_scalar(value, buf),
111 }
112 }
113
114 fn new_inner_deserialize_datum(data: &mut &[u8], ty: &DataType) -> Result<Datum> {
118 let null_tag = data.get_u8();
119 match null_tag {
120 0 => Ok(None),
121 1 => Some(new_deserialize_scalar(ty, data)).transpose(),
122 _ => Err(ValueEncodingError::InvalidTagEncoding(null_tag)),
123 }
124 }
125
126 fn new_deserialize_struct(struct_def: &StructType, data: &mut &[u8]) -> Result<ScalarImpl> {
130 let deserializer = super::Deserializer::from_struct(struct_def.clone()); let encoded_len = data.get_u32_le() as usize;
132
133 let (struct_data, remaining) = data.split_at(encoded_len);
134 *data = remaining;
135 let fields = deserializer.deserialize(struct_data)?;
136
137 Ok(ScalarImpl::Struct(StructValue::new(fields)))
138 }
139
140 fn new_deserialize_list(item_type: &DataType, data: &mut &[u8]) -> Result<ScalarImpl> {
142 let len = data.get_u32_le();
143 let mut builder = item_type.create_array_builder(len as usize);
144 for _ in 0..len {
145 builder.append(new_inner_deserialize_datum(data, item_type)?);
146 }
147 Ok(ScalarImpl::List(ListValue::new(builder.finish())))
148 }
149
150 fn new_deserialize_map(map_type: &MapType, data: &mut &[u8]) -> Result<ScalarImpl> {
155 let len = data.get_u32_le();
156 let mut builder = map_type
157 .clone() .into_struct()
159 .create_array_builder(len as usize);
160 for _ in 0..len {
161 let key = new_deserialize_scalar(map_type.key(), data)?;
162 let value = new_inner_deserialize_datum(data, map_type.value())?;
163 let entry = StructValue::new(vec![Some(key), value]);
164 builder.append(Some(ScalarImpl::Struct(entry)));
165 }
166 Ok(ScalarImpl::Map(MapValue::from_entries(ListValue::new(
167 builder.finish(),
168 ))))
169 }
170
171 pub fn new_deserialize_scalar(ty: &DataType, data: &mut &[u8]) -> Result<ScalarImpl> {
173 Ok(match ty {
174 DataType::Struct(struct_def) => new_deserialize_struct(struct_def, data)?,
175 DataType::List(item_type) => new_deserialize_list(item_type, data)?,
176 DataType::Map(map_type) => new_deserialize_map(map_type, data)?,
177
178 data_types::simple!() => plain::deserialize_value(ty, data)?,
179 })
180 }
181}
182
/// Column count up to which small per-row buffers stay inline in `SmallVec`s.
///
/// Up to this count, column lookup is also a linear scan rather than a hash map.
const COLUMN_ON_STACK: usize = 8;

/// Byte width of each entry in an encoded row's offsets section.
///
/// The width is stored in the two-bit `offset` field of the row header. The pattern `0b00`
/// does not correspond to any width.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
enum OffsetWidth {
    /// One byte per offset.
    Offset8 = 0b01,
    /// Two bytes per offset.
    Offset16 = 0b10,
    /// Four bytes per offset.
    Offset32 = 0b11,
}

impl OffsetWidth {
    /// Number of bytes occupied by one stored offset.
    const fn width(self) -> usize {
        match self {
            Self::Offset32 => 4,
            Self::Offset16 => 2,
            Self::Offset8 => 1,
        }
    }

    /// Raw two-bit pattern, as stored in the header bitfield.
    const fn into_bits(self) -> u8 {
        self as u8
    }

    /// Inverse of [`into_bits`](Self::into_bits).
    ///
    /// # Panics
    ///
    /// On any pattern other than `0b01`, `0b10` or `0b11`.
    const fn from_bits(bits: u8) -> Self {
        match bits {
            0b11 => Self::Offset32,
            0b10 => Self::Offset16,
            0b01 => Self::Offset8,
            _ => panic!("invalid offset width bits"),
        }
    }
}
221
#[bitfield(u8, order = Msb)]
/// One-byte header at the start of every column-aware encoded row.
///
/// Fields are laid out from the most significant bit down (`order = Msb`):
///
/// ```text
/// | magic (1 bit) | reserved (5 bits) | offset width (2 bits) |
/// ```
#[derive(PartialEq, Eq)]
struct Header {
    #[bits(1, default = true, access = RO)]
    /// Always `true` for rows produced by this encoder: it defaults to `true` and is read-only.
    /// `EncodedBytes::new` rejects a header whose magic bit is unset.
    magic: bool,

    // Reserved bits; nothing in this file reads or writes them.
    #[bits(5)]
    _reserved: u8,

    #[bits(2, default = OffsetWidth::Offset8)]
    /// Byte width of every entry in the offsets section. `RowEncoding::set_offsets` picks the
    /// narrowest width that fits the largest offset.
    offset: OffsetWidth,
}
246
247struct RowEncoding {
249 header: Header,
250 offsets: SmallVec<[u8; COLUMN_ON_STACK * 2]>,
251 data: Vec<u8>,
252}
253
254trait Encode {
256 fn encode_to(self, data_type: &DataType, data: &mut Vec<u8>);
257}
258
259impl<T> Encode for T
260where
261 T: ToDatumRef,
262{
263 fn encode_to(self, data_type: &DataType, data: &mut Vec<u8>) {
264 if let Some(v) = self.to_datum_ref() {
265 if data_type.can_alter() == Some(true) {
267 new_serde::new_serialize_scalar(data_type, v, data);
268 } else {
269 plain::serialize_scalar(v, data);
270 }
271 }
272 }
273}
274
275impl Encode for Option<&[u8]> {
276 fn encode_to(self, _data_type: &DataType, data: &mut Vec<u8>) {
277 if let Some(v) = self {
279 data.extend(v);
280 }
281 }
282}
283
284impl RowEncoding {
285 fn new() -> Self {
286 RowEncoding {
287 header: Header::new(),
288 offsets: Default::default(),
289 data: Default::default(),
290 }
291 }
292
293 fn set_offsets(&mut self, usize_offsets: &[usize]) {
294 debug_assert!(
295 self.offsets.is_empty(),
296 "should not set offsets multiple times"
297 );
298
299 let max_offset = usize_offsets.last().copied().unwrap_or(0);
301
302 let offset_width = match max_offset {
303 _n @ ..=const { u8::MAX as usize } => OffsetWidth::Offset8,
304 _n @ ..=const { u16::MAX as usize } => OffsetWidth::Offset16,
305 _n @ ..=const { u32::MAX as usize } => OffsetWidth::Offset32,
306 _ => panic!("encoding length {} exceeds u32", max_offset),
307 };
308 self.header.set_offset(offset_width);
309
310 self.offsets
311 .resize(usize_offsets.len() * offset_width.width(), 0);
312
313 let mut offsets_buf = &mut self.offsets[..];
314 for &offset in usize_offsets {
315 offsets_buf.put_uint_le(offset as u64, offset_width.width());
316 }
317 }
318
319 fn encode<T: Encode>(
320 &mut self,
321 datums: impl IntoIterator<Item = T>,
322 data_types: impl IntoIterator<Item = &DataType>,
323 ) {
324 debug_assert!(
325 self.data.is_empty(),
326 "should not encode one RowEncoding object multiple times."
327 );
328 let datums = datums.into_iter();
329 let mut offset_usize =
330 SmallVec::<[usize; COLUMN_ON_STACK]>::with_capacity(datums.size_hint().0);
331 for (datum, data_type) in datums.zip_eq_debug(data_types) {
332 offset_usize.push(self.data.len());
333 datum.encode_to(data_type, &mut self.data);
334 }
335 self.set_offsets(&offset_usize);
336 }
337}
338
339mod data_types {
340 use crate::types::{DataType, StructType};
341
342 pub trait DataTypes: Clone {
344 fn iter(&self) -> impl ExactSizeIterator<Item = &DataType>;
345 fn at(&self, index: usize) -> &DataType;
346 }
347
348 impl<T> DataTypes for T
349 where
350 T: AsRef<[DataType]> + Clone,
351 {
352 fn iter(&self) -> impl ExactSizeIterator<Item = &DataType> {
353 self.as_ref().iter()
354 }
355
356 fn at(&self, index: usize) -> &DataType {
357 &self.as_ref()[index]
358 }
359 }
360
361 impl DataTypes for StructType {
362 fn iter(&self) -> impl ExactSizeIterator<Item = &DataType> {
363 self.types()
364 }
365
366 fn at(&self, index: usize) -> &DataType {
367 self.type_at(index)
368 }
369 }
370}
371use data_types::DataTypes;
372
373#[derive(Clone)]
376pub struct Serializer<D: DataTypes = Vec<DataType>> {
377 encoded_column_ids: EncodedColumnIds,
378 data_types: D,
379}
380
381type EncodedColumnIds = SmallVec<[u8; COLUMN_ON_STACK * 4]>;
382
383fn encode_column_ids(column_ids: impl ExactSizeIterator<Item = ColumnId>) -> EncodedColumnIds {
384 let mut encoded_column_ids = smallvec![0; column_ids.len() * 4];
386 let mut buf = &mut encoded_column_ids[..];
387 for id in column_ids {
388 buf.put_i32_le(id.get_id());
389 }
390 encoded_column_ids
391}
392
393impl Serializer {
394 pub fn new(column_ids: &[ColumnId], data_types: impl IntoIterator<Item = DataType>) -> Self {
396 Self {
397 encoded_column_ids: encode_column_ids(column_ids.iter().copied()),
398 data_types: data_types.into_iter().collect(),
399 }
400 }
401}
402
403impl Serializer<StructType> {
404 pub fn from_struct(struct_type: StructType) -> Self {
408 Self {
409 encoded_column_ids: encode_column_ids(struct_type.ids().unwrap()),
410 data_types: struct_type,
411 }
412 }
413}
414
415impl<D: DataTypes> Serializer<D> {
416 fn datum_num(&self) -> usize {
417 self.encoded_column_ids.len() / 4
418 }
419
420 fn serialize_raw<T: Encode>(&self, datums: impl IntoIterator<Item = T>) -> Vec<u8> {
421 let mut encoding = RowEncoding::new();
422 encoding.encode(datums, self.data_types.iter());
423 self.finish(encoding)
424 }
425
426 fn finish(&self, encoding: RowEncoding) -> Vec<u8> {
427 let mut row_bytes = Vec::with_capacity(
428 5 + self.encoded_column_ids.len() + encoding.offsets.len() + encoding.data.len(), );
430 row_bytes.put_u8(encoding.header.into_bits());
431 row_bytes.put_u32_le(self.datum_num() as u32);
432 row_bytes.extend(&self.encoded_column_ids);
433 row_bytes.extend(&encoding.offsets);
434 row_bytes.extend(&encoding.data);
435
436 row_bytes
437 }
438}
439
440impl<D: DataTypes> ValueRowSerializer for Serializer<D> {
441 fn serialize(&self, row: impl Row) -> Vec<u8> {
443 assert_eq!(row.len(), self.datum_num());
444 self.serialize_raw(row.iter())
445 }
446}
447
448#[derive(Clone)]
452struct EncodedBytes<'a> {
453 header: Header,
454
455 column_ids: &'a [u8],
458 offsets: &'a [u8],
459 data: &'a [u8],
460}
461
462impl<'a> EncodedBytes<'a> {
463 fn new(mut encoded_bytes: &'a [u8]) -> Result<Self> {
464 let header = Header::from_bits(encoded_bytes.get_u8());
465 if !header.magic() {
466 return Err(ValueEncodingError::InvalidFlag(header.into_bits()));
467 }
468 let offset_bytes = header.offset().width();
469
470 let datum_num = encoded_bytes.get_u32_le() as usize;
471 let offsets_start_idx = 4 * datum_num;
472 let data_start_idx = offsets_start_idx + datum_num * offset_bytes;
473
474 Ok(EncodedBytes {
475 header,
476 column_ids: &encoded_bytes[..offsets_start_idx],
477 offsets: &encoded_bytes[offsets_start_idx..data_start_idx],
478 data: &encoded_bytes[data_start_idx..],
479 })
480 }
481}
482
483impl<'a> Iterator for EncodedBytes<'a> {
484 type Item = (i32, &'a [u8]);
485
486 fn next(&mut self) -> Option<Self::Item> {
487 if self.column_ids.is_empty() {
488 assert!(self.offsets.is_empty());
489 return None;
490 }
491
492 let id = self.column_ids.get_i32_le();
493
494 let offset_width = self.header.offset().width();
495 let get_offset = |offsets: &mut &[u8]| offsets.get_uint_le(offset_width) as usize;
496
497 let this_offset = get_offset(&mut self.offsets);
498 let next_offset = if self.offsets.is_empty() {
499 self.data.len()
500 } else {
501 let mut peek_offsets = self.offsets; get_offset(&mut peek_offsets)
503 };
504
505 let data = &self.data[this_offset..next_offset];
506
507 Some((id, data))
508 }
509}
510
511#[derive(Clone)]
514pub struct Deserializer<D: DataTypes = Arc<[DataType]>> {
515 mapping: ColumnMapping,
516 data_types: D,
517
518 default_row: Option<Vec<Datum>>,
522}
523
524#[derive(Clone)]
526enum ColumnMapping {
527 Small(SmallVec<[i32; COLUMN_ON_STACK]>),
529 Large(HashMap<i32, usize>),
531}
532
533impl ColumnMapping {
534 fn new(column_ids: impl ExactSizeIterator<Item = ColumnId>) -> Self {
535 if column_ids.len() <= COLUMN_ON_STACK {
536 Self::Small(column_ids.map(|c| c.get_id()).collect())
537 } else {
538 Self::Large(
539 column_ids
540 .enumerate()
541 .map(|(i, c)| (c.get_id(), i))
542 .collect(),
543 )
544 }
545 }
546
547 fn get(&self, id: i32) -> Option<usize> {
548 match self {
549 ColumnMapping::Small(vec) => vec.iter().position(|&x| x == id),
550 ColumnMapping::Large(map) => map.get(&id).copied(),
551 }
552 }
553}
554
555impl Deserializer {
556 pub fn new(
557 column_ids: &[ColumnId],
558 schema: Arc<[DataType]>,
559 column_with_default: impl Iterator<Item = (usize, Datum)>,
560 ) -> Self {
561 assert_eq!(column_ids.len(), schema.len());
562 let mut default_row: Vec<Datum> = vec![None; schema.len()];
563 for (i, datum) in column_with_default {
564 default_row[i] = datum;
565 }
566 Self {
567 mapping: ColumnMapping::new(column_ids.iter().copied()),
568 data_types: schema,
569 default_row: Some(default_row),
570 }
571 }
572}
573
574impl Deserializer<StructType> {
575 pub fn from_struct(struct_type: StructType) -> Self {
579 Self {
580 mapping: ColumnMapping::new(struct_type.ids().unwrap()),
581 data_types: struct_type,
582 default_row: None,
583 }
584 }
585}
586
587impl<D: DataTypes> ValueRowDeserializer for Deserializer<D> {
588 fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
589 let encoded_bytes = EncodedBytes::new(encoded_bytes)?;
590
591 let mut row =
592 (self.default_row.clone()).unwrap_or_else(|| vec![None; self.data_types.iter().len()]);
593
594 for (id, mut data) in encoded_bytes {
595 let Some(decoded_idx) = self.mapping.get(id) else {
596 continue;
597 };
598 let data_type = self.data_types.at(decoded_idx);
599
600 let datum = if data.is_empty() {
601 None
602 } else if data_type.can_alter() == Some(true) {
603 Some(new_serde::new_deserialize_scalar(data_type, &mut data)?)
604 } else {
605 Some(plain::deserialize_value(data_type, &mut data)?)
606 };
607
608 row[decoded_idx] = datum;
609 }
610
611 Ok(row)
612 }
613}
614
615#[derive(Clone)]
618pub struct ColumnAwareSerde {
619 pub serializer: Serializer,
620 pub deserializer: Deserializer,
621}
622
623impl ValueRowSerializer for ColumnAwareSerde {
624 fn serialize(&self, row: impl Row) -> Vec<u8> {
625 self.serializer.serialize(row)
626 }
627}
628
629impl ValueRowDeserializer for ColumnAwareSerde {
630 fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
631 self.deserializer.deserialize(encoded_bytes)
632 }
633}
634
635pub fn try_drop_invalid_columns(
639 encoded_bytes: &[u8],
640 valid_column_ids: &HashSet<i32>,
641) -> Option<Vec<u8>> {
642 let encoded_bytes = EncodedBytes::new(encoded_bytes).ok()?;
643
644 let has_invalid_column = (encoded_bytes.clone()).any(|(id, _)| !valid_column_ids.contains(&id));
645 if !has_invalid_column {
646 return None;
647 }
648
649 let mut datums = Vec::with_capacity(valid_column_ids.len());
652 let mut column_ids = Vec::with_capacity(valid_column_ids.len());
653
654 for (id, data) in encoded_bytes {
655 if valid_column_ids.contains(&id) {
656 column_ids.push(ColumnId::new(id));
657 datums.push(if data.is_empty() { None } else { Some(data) });
658 }
659 }
660
661 let dummy_data_types = vec![DataType::Boolean; column_ids.len()];
664
665 let row_bytes = Serializer::new(&column_ids, dummy_data_types).serialize_raw(datums);
666 Some(row_bytes)
667}