1use std::collections::HashSet;
23use std::sync::Arc;
24
25use ahash::HashMap;
26use bitfield_struct::bitfield;
27use bytes::{Buf, BufMut};
28use rw_iter_util::ZipEqDebug;
29use smallvec::{SmallVec, smallvec};
30
31use super::error::ValueEncodingError;
32use super::{Result, ValueRowDeserializer, ValueRowSerializer};
33use crate::catalog::ColumnId;
34use crate::row::Row;
35use crate::types::{DataType, Datum, ScalarRefImpl, StructType, ToDatumRef};
36use crate::util::value_encoding as plain;
37
38mod new_serde {
43 use super::*;
44 use crate::array::{ListRef, ListValue, MapRef, MapValue, StructRef, StructValue};
45 use crate::types::{MapType, ScalarImpl, StructType, data_types};
46
47 fn new_serialize_datum(
51 data_type: &DataType,
52 datum_ref: impl ToDatumRef,
53 buf: &mut impl BufMut,
54 ) {
55 if let Some(d) = datum_ref.to_datum_ref() {
56 buf.put_u8(1);
57 new_serialize_scalar(data_type, d, buf)
58 } else {
59 buf.put_u8(0);
60 }
61 }
62
63 fn new_serialize_struct(struct_type: &StructType, value: StructRef<'_>, buf: &mut impl BufMut) {
67 let serializer = super::Serializer::from_struct(struct_type.clone()); let bytes = serializer.serialize(value); buf.put_u32_le(bytes.len() as _);
72 buf.put_slice(&bytes);
73 }
74
75 fn new_serialize_list(inner_type: &DataType, value: ListRef<'_>, buf: &mut impl BufMut) {
77 let elems = value.iter();
78 buf.put_u32_le(elems.len() as u32);
79
80 elems.for_each(|field_value| {
81 new_serialize_datum(inner_type, field_value, buf);
82 });
83 }
84
85 fn new_serialize_map(map_type: &MapType, value: MapRef<'_>, buf: &mut impl BufMut) {
90 let elems = value.iter();
91 buf.put_u32_le(elems.len() as u32);
92
93 elems.for_each(|(k, v)| {
94 new_serialize_scalar(map_type.key(), k, buf);
95 new_serialize_datum(map_type.value(), v, buf);
96 });
97 }
98
99 pub fn new_serialize_scalar(
101 data_type: &DataType,
102 value: ScalarRefImpl<'_>,
103 buf: &mut impl BufMut,
104 ) {
105 match value {
106 ScalarRefImpl::Struct(s) => new_serialize_struct(data_type.as_struct(), s, buf),
107 ScalarRefImpl::List(l) => new_serialize_list(data_type.as_list_element_type(), l, buf),
108 ScalarRefImpl::Map(m) => new_serialize_map(data_type.as_map(), m, buf),
109
110 _ => plain::serialize_scalar(value, buf),
111 }
112 }
113
114 fn new_inner_deserialize_datum(data: &mut &[u8], ty: &DataType) -> Result<Datum> {
118 let null_tag = data.get_u8();
119 match null_tag {
120 0 => Ok(None),
121 1 => Some(new_deserialize_scalar(ty, data)).transpose(),
122 _ => Err(ValueEncodingError::InvalidTagEncoding(null_tag)),
123 }
124 }
125
126 fn new_deserialize_struct(struct_def: &StructType, data: &mut &[u8]) -> Result<ScalarImpl> {
130 let deserializer = super::Deserializer::from_struct(struct_def.clone()); let encoded_len = data.get_u32_le() as usize;
132
133 let (struct_data, remaining) = data.split_at(encoded_len);
134 *data = remaining;
135 let fields = deserializer.deserialize(struct_data)?;
136
137 Ok(ScalarImpl::Struct(StructValue::new(fields)))
138 }
139
140 fn new_deserialize_list(item_type: &DataType, data: &mut &[u8]) -> Result<ScalarImpl> {
142 let len = data.get_u32_le();
143 let mut builder = item_type.create_array_builder(len as usize);
144 for _ in 0..len {
145 builder.append(new_inner_deserialize_datum(data, item_type)?);
146 }
147 Ok(ScalarImpl::List(ListValue::new(builder.finish())))
148 }
149
150 fn new_deserialize_map(map_type: &MapType, data: &mut &[u8]) -> Result<ScalarImpl> {
155 let len = data.get_u32_le();
156 let mut builder = map_type
157 .clone() .into_struct()
159 .create_array_builder(len as usize);
160 for _ in 0..len {
161 let key = new_deserialize_scalar(map_type.key(), data)?;
162 let value = new_inner_deserialize_datum(data, map_type.value())?;
163 let entry = StructValue::new(vec![Some(key), value]);
164 builder.append(Some(ScalarImpl::Struct(entry)));
165 }
166 Ok(ScalarImpl::Map(MapValue::from_entries(ListValue::new(
167 builder.finish(),
168 ))))
169 }
170
171 pub fn new_deserialize_scalar(ty: &DataType, data: &mut &[u8]) -> Result<ScalarImpl> {
173 Ok(match ty {
174 DataType::Struct(struct_def) => new_deserialize_struct(struct_def, data)?,
175 DataType::List(item_type) => new_deserialize_list(item_type, data)?,
176 DataType::Map(map_type) => new_deserialize_map(map_type, data)?,
177 DataType::Vector(_) => todo!("VECTOR_PLACEHOLDER"),
178
179 data_types::simple!() => plain::deserialize_value(ty, data)?,
180 })
181 }
182}
183
/// Column count up to which per-row bookkeeping stays inline in `SmallVec`s (column ids,
/// offsets, datum start positions) and up to which `ColumnMapping` uses a linear scan
/// instead of a hash map.
const COLUMN_ON_STACK: usize = 8;
186
/// Number of bytes used to store each datum's start offset in an encoded row.
///
/// The discriminant is the 2-bit code stored in the row `Header`; `0b00` is not a valid code.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
enum OffsetWidth {
    /// Each offset is one byte.
    Offset8 = 0b01,
    /// Each offset is two bytes.
    Offset16 = 0b10,
    /// Each offset is four bytes.
    Offset32 = 0b11,
}

impl OffsetWidth {
    /// Byte width of one stored offset.
    const fn width(self) -> usize {
        match self {
            Self::Offset32 => 4,
            Self::Offset16 => 2,
            Self::Offset8 => 1,
        }
    }

    /// Returns the 2-bit header code for this width (needed to store it in a `#[bitfield]`).
    const fn into_bits(self) -> u8 {
        self as u8
    }

    /// Decodes a 2-bit header code.
    ///
    /// # Panics
    /// Panics for any value other than `0b01`, `0b10` or `0b11`.
    const fn from_bits(bits: u8) -> Self {
        match bits {
            0b11 => Self::Offset32,
            0b10 => Self::Offset16,
            0b01 => Self::Offset8,
            _ => panic!("invalid offset width bits"),
        }
    }
}
222
/// One-byte header at the start of every column-aware encoded row.
///
/// With `order = Msb` the fields are laid out from the most significant bit down:
/// `magic` (1 bit) | `_reserved` (5 bits) | `offset` (2 bits).
#[bitfield(u8, order = Msb)]
#[derive(PartialEq, Eq)]
struct Header {
    // Defaults to `true` and is read-only; `EncodedBytes::new` rejects headers where it is
    // unset.
    #[bits(1, default = true, access = RO)]
    magic: bool,

    // Reserved bits; not read or written anywhere in this file.
    #[bits(5)]
    _reserved: u8,

    // Byte width of each stored datum offset; chosen by `RowEncoding::set_offsets` from the
    // largest offset in the row.
    #[bits(2, default = OffsetWidth::Offset8)]
    offset: OffsetWidth,
}
247
/// In-progress encoding of one row: the header plus the offset and data sections that
/// `Serializer::finish` appends after the datum count and column ids.
struct RowEncoding {
    /// Row header; its offset width is set by `set_offsets`.
    header: Header,
    /// Little-endian start offset of each datum within `data`, each one
    /// `header.offset().width()` bytes wide.
    offsets: SmallVec<[u8; COLUMN_ON_STACK * 2]>,
    /// Concatenated datum payloads; a null datum contributes zero bytes.
    data: Vec<u8>,
}
254
/// A single datum that can append its value encoding to a row's data buffer.
trait Encode {
    /// Appends the encoded payload of `self` (of type `data_type`) to `data`.
    ///
    /// The implementations in this file append nothing for a null datum; the deserializer
    /// reads an empty payload back as `None`.
    fn encode_to(self, data_type: &DataType, data: &mut Vec<u8>);
}
259
260impl<T> Encode for T
261where
262 T: ToDatumRef,
263{
264 fn encode_to(self, data_type: &DataType, data: &mut Vec<u8>) {
265 if let Some(v) = self.to_datum_ref() {
266 if data_type.can_alter() == Some(true) {
268 new_serde::new_serialize_scalar(data_type, v, data);
269 } else {
270 plain::serialize_scalar(v, data);
271 }
272 }
273 }
274}
275
276impl Encode for Option<&[u8]> {
277 fn encode_to(self, _data_type: &DataType, data: &mut Vec<u8>) {
278 if let Some(v) = self {
280 data.extend(v);
281 }
282 }
283}
284
285impl RowEncoding {
286 fn new() -> Self {
287 RowEncoding {
288 header: Header::new(),
289 offsets: Default::default(),
290 data: Default::default(),
291 }
292 }
293
294 fn set_offsets(&mut self, usize_offsets: &[usize]) {
295 debug_assert!(
296 self.offsets.is_empty(),
297 "should not set offsets multiple times"
298 );
299
300 let max_offset = usize_offsets.last().copied().unwrap_or(0);
302
303 let offset_width = match max_offset {
304 _n @ ..=const { u8::MAX as usize } => OffsetWidth::Offset8,
305 _n @ ..=const { u16::MAX as usize } => OffsetWidth::Offset16,
306 _n @ ..=const { u32::MAX as usize } => OffsetWidth::Offset32,
307 _ => panic!("encoding length {} exceeds u32", max_offset),
308 };
309 self.header.set_offset(offset_width);
310
311 self.offsets
312 .resize(usize_offsets.len() * offset_width.width(), 0);
313
314 let mut offsets_buf = &mut self.offsets[..];
315 for &offset in usize_offsets {
316 offsets_buf.put_uint_le(offset as u64, offset_width.width());
317 }
318 }
319
320 fn encode<T: Encode>(
321 &mut self,
322 datums: impl IntoIterator<Item = T>,
323 data_types: impl IntoIterator<Item = &DataType>,
324 ) {
325 debug_assert!(
326 self.data.is_empty(),
327 "should not encode one RowEncoding object multiple times."
328 );
329 let datums = datums.into_iter();
330 let mut offset_usize =
331 SmallVec::<[usize; COLUMN_ON_STACK]>::with_capacity(datums.size_hint().0);
332 for (datum, data_type) in datums.zip_eq_debug(data_types) {
333 offset_usize.push(self.data.len());
334 datum.encode_to(data_type, &mut self.data);
335 }
336 self.set_offsets(&offset_usize);
337 }
338}
339
340mod data_types {
341 use crate::types::{DataType, StructType};
342
343 pub trait DataTypes: Clone {
345 fn iter(&self) -> impl ExactSizeIterator<Item = &DataType>;
346 fn at(&self, index: usize) -> &DataType;
347 }
348
349 impl<T> DataTypes for T
350 where
351 T: AsRef<[DataType]> + Clone,
352 {
353 fn iter(&self) -> impl ExactSizeIterator<Item = &DataType> {
354 self.as_ref().iter()
355 }
356
357 fn at(&self, index: usize) -> &DataType {
358 &self.as_ref()[index]
359 }
360 }
361
362 impl DataTypes for StructType {
363 fn iter(&self) -> impl ExactSizeIterator<Item = &DataType> {
364 self.types()
365 }
366
367 fn at(&self, index: usize) -> &DataType {
368 self.type_at(index)
369 }
370 }
371}
372use data_types::DataTypes;
373
/// Serializes rows into the column-aware value encoding.
///
/// Every encoded row carries the id of each of its columns, so a `Deserializer` matches
/// columns by id rather than by position.
#[derive(Clone)]
pub struct Serializer<D: DataTypes = Vec<DataType>> {
    /// Column ids, pre-encoded as little-endian `i32`s and copied verbatim into every row.
    encoded_column_ids: EncodedColumnIds,
    /// Data type of each column, in the same order as the column ids.
    data_types: D,
}
381
/// Column ids laid out back to back as 4-byte little-endian `i32`s; stored inline for up to
/// `COLUMN_ON_STACK` ids.
type EncodedColumnIds = SmallVec<[u8; COLUMN_ON_STACK * 4]>;
383
384fn encode_column_ids(column_ids: impl ExactSizeIterator<Item = ColumnId>) -> EncodedColumnIds {
385 let mut encoded_column_ids = smallvec![0; column_ids.len() * 4];
387 let mut buf = &mut encoded_column_ids[..];
388 for id in column_ids {
389 buf.put_i32_le(id.get_id());
390 }
391 encoded_column_ids
392}
393
394impl Serializer {
395 pub fn new(column_ids: &[ColumnId], data_types: impl IntoIterator<Item = DataType>) -> Self {
397 Self {
398 encoded_column_ids: encode_column_ids(column_ids.iter().copied()),
399 data_types: data_types.into_iter().collect(),
400 }
401 }
402}
403
404impl Serializer<StructType> {
405 pub fn from_struct(struct_type: StructType) -> Self {
409 Self {
410 encoded_column_ids: encode_column_ids(struct_type.ids().unwrap()),
411 data_types: struct_type,
412 }
413 }
414}
415
416impl<D: DataTypes> Serializer<D> {
417 fn datum_num(&self) -> usize {
418 self.encoded_column_ids.len() / 4
419 }
420
421 fn serialize_raw<T: Encode>(&self, datums: impl IntoIterator<Item = T>) -> Vec<u8> {
422 let mut encoding = RowEncoding::new();
423 encoding.encode(datums, self.data_types.iter());
424 self.finish(encoding)
425 }
426
427 fn finish(&self, encoding: RowEncoding) -> Vec<u8> {
428 let mut row_bytes = Vec::with_capacity(
429 5 + self.encoded_column_ids.len() + encoding.offsets.len() + encoding.data.len(), );
431 row_bytes.put_u8(encoding.header.into_bits());
432 row_bytes.put_u32_le(self.datum_num() as u32);
433 row_bytes.extend(&self.encoded_column_ids);
434 row_bytes.extend(&encoding.offsets);
435 row_bytes.extend(&encoding.data);
436
437 row_bytes
438 }
439}
440
441impl<D: DataTypes> ValueRowSerializer for Serializer<D> {
442 fn serialize(&self, row: impl Row) -> Vec<u8> {
444 assert_eq!(row.len(), self.datum_num());
445 self.serialize_raw(row.iter())
446 }
447}
448
/// Borrowed view of an encoded row, split into its sections.
///
/// Iterating yields `(column_id, payload)` for every encoded column, in encoding order.
#[derive(Clone)]
struct EncodedBytes<'a> {
    /// Row header; determines the width of each entry in `offsets`.
    header: Header,

    /// Remaining column ids, 4 bytes (little-endian `i32`) each.
    column_ids: &'a [u8],
    /// Remaining little-endian start offsets of each datum within `data`.
    offsets: &'a [u8],
    /// Concatenated datum payloads.
    data: &'a [u8],
}
462
463impl<'a> EncodedBytes<'a> {
464 fn new(mut encoded_bytes: &'a [u8]) -> Result<Self> {
465 let header = Header::from_bits(encoded_bytes.get_u8());
466 if !header.magic() {
467 return Err(ValueEncodingError::InvalidFlag(header.into_bits()));
468 }
469 let offset_bytes = header.offset().width();
470
471 let datum_num = encoded_bytes.get_u32_le() as usize;
472 let offsets_start_idx = 4 * datum_num;
473 let data_start_idx = offsets_start_idx + datum_num * offset_bytes;
474
475 Ok(EncodedBytes {
476 header,
477 column_ids: &encoded_bytes[..offsets_start_idx],
478 offsets: &encoded_bytes[offsets_start_idx..data_start_idx],
479 data: &encoded_bytes[data_start_idx..],
480 })
481 }
482}
483
484impl<'a> Iterator for EncodedBytes<'a> {
485 type Item = (i32, &'a [u8]);
486
487 fn next(&mut self) -> Option<Self::Item> {
488 if self.column_ids.is_empty() {
489 assert!(self.offsets.is_empty());
490 return None;
491 }
492
493 let id = self.column_ids.get_i32_le();
494
495 let offset_width = self.header.offset().width();
496 let get_offset = |offsets: &mut &[u8]| offsets.get_uint_le(offset_width) as usize;
497
498 let this_offset = get_offset(&mut self.offsets);
499 let next_offset = if self.offsets.is_empty() {
500 self.data.len()
501 } else {
502 let mut peek_offsets = self.offsets; get_offset(&mut peek_offsets)
504 };
505
506 let data = &self.data[this_offset..next_offset];
507
508 Some((id, data))
509 }
510}
511
/// Deserializes column-aware encoded rows into rows ordered like `data_types`.
///
/// Columns are matched by id. Encoded columns whose id is not in `mapping` are skipped. Output
/// columns absent from the encoding keep their value from `default_row` (or `None`).
#[derive(Clone)]
pub struct Deserializer<D: DataTypes = Arc<[DataType]>> {
    /// Maps an encoded column id to its index in the output row.
    mapping: ColumnMapping,
    /// Data type of each output column.
    data_types: D,

    /// Initial contents of every output row. `Deserializer::new` always sets it.
    /// `Deserializer::from_struct` leaves it `None`, so an all-`None` row is used instead.
    default_row: Option<Vec<Datum>>,
}
524
/// Lookup from column id to output position, specialized by the number of columns.
#[derive(Clone)]
enum ColumnMapping {
    /// At most `COLUMN_ON_STACK` ids stored inline; a lookup scans linearly and the position
    /// in the vector is the output index.
    Small(SmallVec<[i32; COLUMN_ON_STACK]>),
    /// More than `COLUMN_ON_STACK` ids: hash map from id to output index.
    Large(HashMap<i32, usize>),
}
533
534impl ColumnMapping {
535 fn new(column_ids: impl ExactSizeIterator<Item = ColumnId>) -> Self {
536 if column_ids.len() <= COLUMN_ON_STACK {
537 Self::Small(column_ids.map(|c| c.get_id()).collect())
538 } else {
539 Self::Large(
540 column_ids
541 .enumerate()
542 .map(|(i, c)| (c.get_id(), i))
543 .collect(),
544 )
545 }
546 }
547
548 fn get(&self, id: i32) -> Option<usize> {
549 match self {
550 ColumnMapping::Small(vec) => vec.iter().position(|&x| x == id),
551 ColumnMapping::Large(map) => map.get(&id).copied(),
552 }
553 }
554}
555
556impl Deserializer {
557 pub fn new(
558 column_ids: &[ColumnId],
559 schema: Arc<[DataType]>,
560 column_with_default: impl Iterator<Item = (usize, Datum)>,
561 ) -> Self {
562 assert_eq!(column_ids.len(), schema.len());
563 let mut default_row: Vec<Datum> = vec![None; schema.len()];
564 for (i, datum) in column_with_default {
565 default_row[i] = datum;
566 }
567 Self {
568 mapping: ColumnMapping::new(column_ids.iter().copied()),
569 data_types: schema,
570 default_row: Some(default_row),
571 }
572 }
573}
574
575impl Deserializer<StructType> {
576 pub fn from_struct(struct_type: StructType) -> Self {
580 Self {
581 mapping: ColumnMapping::new(struct_type.ids().unwrap()),
582 data_types: struct_type,
583 default_row: None,
584 }
585 }
586}
587
588impl<D: DataTypes> ValueRowDeserializer for Deserializer<D> {
589 fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
590 let encoded_bytes = EncodedBytes::new(encoded_bytes)?;
591
592 let mut row =
593 (self.default_row.clone()).unwrap_or_else(|| vec![None; self.data_types.iter().len()]);
594
595 for (id, mut data) in encoded_bytes {
596 let Some(decoded_idx) = self.mapping.get(id) else {
597 continue;
598 };
599 let data_type = self.data_types.at(decoded_idx);
600
601 let datum = if data.is_empty() {
602 None
603 } else if data_type.can_alter() == Some(true) {
604 Some(new_serde::new_deserialize_scalar(data_type, &mut data)?)
605 } else {
606 Some(plain::deserialize_value(data_type, &mut data)?)
607 };
608
609 row[decoded_idx] = datum;
610 }
611
612 Ok(row)
613 }
614}
615
/// A paired `Serializer` and `Deserializer` that implements both value-row traits by
/// delegating to them.
#[derive(Clone)]
pub struct ColumnAwareSerde {
    pub serializer: Serializer,
    pub deserializer: Deserializer,
}
623
624impl ValueRowSerializer for ColumnAwareSerde {
625 fn serialize(&self, row: impl Row) -> Vec<u8> {
626 self.serializer.serialize(row)
627 }
628}
629
630impl ValueRowDeserializer for ColumnAwareSerde {
631 fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
632 self.deserializer.deserialize(encoded_bytes)
633 }
634}
635
636pub fn try_drop_invalid_columns(
640 encoded_bytes: &[u8],
641 valid_column_ids: &HashSet<i32>,
642) -> Option<Vec<u8>> {
643 let encoded_bytes = EncodedBytes::new(encoded_bytes).ok()?;
644
645 let has_invalid_column = (encoded_bytes.clone()).any(|(id, _)| !valid_column_ids.contains(&id));
646 if !has_invalid_column {
647 return None;
648 }
649
650 let mut datums = Vec::with_capacity(valid_column_ids.len());
653 let mut column_ids = Vec::with_capacity(valid_column_ids.len());
654
655 for (id, data) in encoded_bytes {
656 if valid_column_ids.contains(&id) {
657 column_ids.push(ColumnId::new(id));
658 datums.push(if data.is_empty() { None } else { Some(data) });
659 }
660 }
661
662 let dummy_data_types = vec![DataType::Boolean; column_ids.len()];
665
666 let row_bytes = Serializer::new(&column_ids, dummy_data_types).serialize_raw(datums);
667 Some(row_bytes)
668}