// risingwave_common/util/value_encoding/column_aware_row_encoding.rs
use std::collections::HashSet;
use std::sync::Arc;
use ahash::HashMap;
use bitflags::bitflags;
use super::*;
use crate::catalog::ColumnId;
bitflags! {
    /// Flag byte stored at the head of every encoded row.
    ///
    /// The high bit (`EMPTY`) is set unconditionally by the encoder; the low
    /// two bits record the byte width of each entry in the offset array.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    struct Flag: u8 {
        // Base marker bit; the encoder ORs exactly one OFFSET* width into it.
        const EMPTY = 0b_1000_0000;
        // Every offset fits in a u8 (1 byte per entry).
        const OFFSET8 = 0b01;
        // Every offset fits in a u16 (2 bytes per entry, little-endian).
        const OFFSET16 = 0b10;
        // Every offset fits in a u32 (4 bytes per entry, little-endian).
        const OFFSET32 = 0b11;
    }
}
/// Intermediate buffers produced while encoding a single row: the flag byte,
/// the serialized offset array, and the concatenated datum payload.
struct RowEncoding {
    // EMPTY plus the offset-width bit chosen in `set_offsets`.
    flag: Flag,
    // Serialized offset array; entry width is selected by `flag`.
    offsets: Vec<u8>,
    // Concatenated serialized datums; a NULL occupies zero bytes.
    buf: Vec<u8>,
}
impl RowEncoding {
    /// Creates an empty encoding carrying only the base `EMPTY` flag.
    fn new() -> Self {
        Self {
            flag: Flag::EMPTY,
            offsets: Vec::new(),
            buf: Vec::new(),
        }
    }

    /// Serializes `usize_offsets` into `self.offsets`, choosing the narrowest
    /// integer width that can hold `max_offset` and recording that choice in
    /// `self.flag`. `max_offset` is the largest value in `usize_offsets`
    /// (offsets are monotonically non-decreasing, so it is the last one).
    fn set_offsets(&mut self, usize_offsets: &[usize], max_offset: usize) {
        debug_assert!(self.offsets.is_empty());
        if max_offset <= u8::MAX as usize {
            self.flag |= Flag::OFFSET8;
            for &offset in usize_offsets {
                self.offsets.put_u8(offset as u8);
            }
        } else if max_offset <= u16::MAX as usize {
            self.flag |= Flag::OFFSET16;
            for &offset in usize_offsets {
                self.offsets.put_u16_le(offset as u16);
            }
        } else if max_offset <= u32::MAX as usize {
            self.flag |= Flag::OFFSET32;
            for &offset in usize_offsets {
                self.offsets.put_u32_le(offset as u32);
            }
        } else {
            unreachable!("encoding length exceeds u32");
        }
    }

    /// Serializes every datum into `buf`, recording each datum's start
    /// position. A NULL datum contributes no payload bytes, so its span
    /// (distance to the next start offset) is zero.
    ///
    /// Panics (via `expect`) when the iterator yields no datum at all.
    fn encode(&mut self, datum_refs: impl Iterator<Item = impl ToDatumRef>) {
        debug_assert!(
            self.buf.is_empty(),
            "should not encode one RowEncoding object multiple times."
        );
        let mut starts = Vec::new();
        for datum in datum_refs {
            starts.push(self.buf.len());
            if let Some(value) = datum.to_datum_ref() {
                serialize_scalar(value, &mut self.buf);
            }
        }
        let max_offset = *starts.last().expect("should encode at least one column");
        self.set_offsets(&starts, max_offset);
    }

    /// Like `encode`, but consumes pre-serialized datum bytes; `None` stands
    /// for NULL and contributes nothing to the payload.
    fn encode_slice<'a>(&mut self, datum_refs: impl Iterator<Item = Option<&'a [u8]>>) {
        debug_assert!(
            self.buf.is_empty(),
            "should not encode one RowEncoding object multiple times."
        );
        let mut starts = Vec::new();
        for datum in datum_refs {
            starts.push(self.buf.len());
            if let Some(bytes) = datum {
                self.buf.put_slice(bytes);
            }
        }
        let max_offset = *starts.last().expect("should encode at least one column");
        self.set_offsets(&starts, max_offset);
    }
}
/// Column-aware row serializer for a fixed column set.
#[derive(Clone)]
pub struct Serializer {
    // Column ids pre-encoded as i32 little-endian; identical for every row,
    // so they are serialized once at construction time.
    encoded_column_ids: Vec<u8>,
    // Number of columns; every serialized row must have exactly this arity.
    datum_num: u32,
}
impl Serializer {
    /// Builds a serializer for the given column set. The column ids are
    /// encoded up front (4 bytes each, i32 little-endian) since they do not
    /// vary between rows.
    pub fn new(column_ids: &[ColumnId]) -> Self {
        let mut encoded_column_ids = Vec::with_capacity(column_ids.len() * 4);
        for id in column_ids {
            encoded_column_ids.put_i32_le(id.get_id());
        }
        Self {
            datum_num: column_ids.len() as u32,
            encoded_column_ids,
        }
    }

    /// Assembles the final row bytes in the wire layout:
    /// flag (1B) | datum count (4B LE) | column ids | offset array | payload.
    fn serialize_inner(&self, encoding: RowEncoding) -> Vec<u8> {
        let capacity =
            5 + self.encoded_column_ids.len() + encoding.offsets.len() + encoding.buf.len();
        let mut row_bytes = Vec::with_capacity(capacity);
        row_bytes.put_u8(encoding.flag.bits());
        row_bytes.put_u32_le(self.datum_num);
        row_bytes.extend(&self.encoded_column_ids);
        row_bytes.extend(&encoding.offsets);
        row_bytes.extend(&encoding.buf);
        row_bytes
    }
}
impl ValueRowSerializer for Serializer {
    /// Encodes `row` in the column-aware format.
    ///
    /// Panics when the row's arity differs from the column set this
    /// serializer was constructed with.
    fn serialize(&self, row: impl Row) -> Vec<u8> {
        assert_eq!(row.len(), self.datum_num as usize);
        let mut enc = RowEncoding::new();
        enc.encode(row.iter());
        self.serialize_inner(enc)
    }
}
/// Column-aware row deserializer for a fixed output schema.
#[derive(Clone)]
pub struct Deserializer {
    // Maps an encoded column id to its position in `schema` / the output row.
    required_column_ids: HashMap<i32, usize>,
    // Output data types, one per required column.
    schema: Arc<[DataType]>,
    // Template row; columns absent from the encoded bytes keep these values.
    default_row: Vec<Datum>,
}
impl Deserializer {
    /// Creates a deserializer mapping encoded column ids back to positions in
    /// `schema`, with defaults from `column_with_default` pre-filled into the
    /// template row.
    ///
    /// Panics when `column_ids` and `schema` disagree in length.
    pub fn new(
        column_ids: &[ColumnId],
        schema: Arc<[DataType]>,
        column_with_default: impl Iterator<Item = (usize, Datum)>,
    ) -> Self {
        assert_eq!(column_ids.len(), schema.len());
        // Lookup table: column id -> index in the output row.
        let required_column_ids: HashMap<i32, usize> = column_ids
            .iter()
            .enumerate()
            .map(|(idx, id)| (id.get_id(), idx))
            .collect();
        let mut default_row: Vec<Datum> = vec![None; schema.len()];
        for (idx, datum) in column_with_default {
            default_row[idx] = datum;
        }
        Self {
            required_column_ids,
            schema,
            default_row,
        }
    }
}
impl ValueRowDeserializer for Deserializer {
    /// Decodes one column-aware encoded row.
    ///
    /// Consumed layout: flag (1B) | datum count (u32 LE) | `count` column ids
    /// (i32 LE each) | `count` offsets (width chosen by the flag) | payload.
    /// Columns whose ids are not required are skipped; required columns that
    /// are missing from the bytes keep their value from `default_row`.
    fn deserialize(&self, mut encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
        let flag = Flag::from_bits(encoded_bytes.get_u8()).expect("should be a valid flag");
        // Strip the EMPTY marker; the remaining bits select the offset width.
        let offset_bytes = match flag - Flag::EMPTY {
            Flag::OFFSET8 => 1,
            Flag::OFFSET16 => 2,
            Flag::OFFSET32 => 4,
            _ => return Err(ValueEncodingError::InvalidFlag(flag.bits())),
        };
        let datum_num = encoded_bytes.get_u32_le() as usize;
        // `encoded_bytes` now points at the column-id section: 4 bytes per
        // datum, then the offset array, then the payload.
        let offsets_start_idx = 4 * datum_num;
        let data_start_idx = offsets_start_idx + datum_num * offset_bytes;
        let offsets = &encoded_bytes[offsets_start_idx..data_start_idx];
        let data = &encoded_bytes[data_start_idx..];
        // Start from the defaults so absent columns keep their default value.
        let mut row = self.default_row.clone();
        for i in 0..datum_num {
            // Advances the cursor through the column-id section.
            let this_id = encoded_bytes.get_i32_le();
            if let Some(&decoded_idx) = self.required_column_ids.get(&this_id) {
                let this_offset_start_idx = i * offset_bytes;
                let mut this_offset_slice =
                    &offsets[this_offset_start_idx..(this_offset_start_idx + offset_bytes)];
                let this_offset = deserialize_width(offset_bytes, &mut this_offset_slice);
                let data = if i + 1 < datum_num {
                    // A datum's extent runs up to the next datum's start.
                    let mut next_offset_slice = &offsets[(this_offset_start_idx + offset_bytes)
                        ..(this_offset_start_idx + 2 * offset_bytes)];
                    let next_offset = deserialize_width(offset_bytes, &mut next_offset_slice);
                    if this_offset == next_offset {
                        // Zero-length span encodes NULL.
                        None
                    } else {
                        let mut data_slice = &data[this_offset..next_offset];
                        Some(deserialize_value(
                            &self.schema[decoded_idx],
                            &mut data_slice,
                        )?)
                    }
                } else if this_offset == data.len() {
                    // Last datum with an empty tail: NULL.
                    None
                } else {
                    // Last datum runs to the end of the payload.
                    let mut data_slice = &data[this_offset..];
                    Some(deserialize_value(
                        &self.schema[decoded_idx],
                        &mut data_slice,
                    )?)
                };
                row[decoded_idx] = data;
            }
        }
        Ok(row)
    }
}
/// Reads a single offset entry of `len` bytes (1, 2, or 4, little-endian)
/// from `data` and widens it to `usize`.
fn deserialize_width(len: usize, data: &mut impl Buf) -> usize {
    if len == 1 {
        data.get_u8() as usize
    } else if len == 2 {
        data.get_u16_le() as usize
    } else if len == 4 {
        data.get_u32_le() as usize
    } else {
        unreachable!("Width's len should be either 1, 2, or 4")
    }
}
/// Combined serializer/deserializer pair for the column-aware row format.
#[derive(Clone)]
pub struct ColumnAwareSerde {
    pub serializer: Serializer,
    pub deserializer: Deserializer,
}
impl ValueRowSerializer for ColumnAwareSerde {
    // Pure delegation to the inner `Serializer`.
    fn serialize(&self, row: impl Row) -> Vec<u8> {
        self.serializer.serialize(row)
    }
}
impl ValueRowDeserializer for ColumnAwareSerde {
    // Pure delegation to the inner `Deserializer`.
    fn deserialize(&self, encoded_bytes: &[u8]) -> Result<Vec<Datum>> {
        self.deserializer.deserialize(encoded_bytes)
    }
}
/// Re-encodes `encoded_bytes` keeping only columns whose ids appear in
/// `valid_column_ids`.
///
/// Returns `None` when nothing needs to change (no column is dropped) and
/// also when no valid column remains at all — the caller must handle the
/// all-dropped case itself. Otherwise returns the freshly encoded row.
pub fn try_drop_invalid_columns(
    mut encoded_bytes: &[u8],
    valid_column_ids: &HashSet<i32>,
) -> Option<Vec<u8>> {
    let flag = Flag::from_bits(encoded_bytes.get_u8()).expect("should be a valid flag");
    let datum_num = encoded_bytes.get_u32_le() as usize;
    let mut is_column_dropped = false;
    // Scan the column-id section on a copy of the cursor so `encoded_bytes`
    // still points at the start of the id section afterwards.
    let mut encoded_bytes_copy = encoded_bytes;
    for _ in 0..datum_num {
        let this_id = encoded_bytes_copy.get_i32_le();
        if !valid_column_ids.contains(&this_id) {
            is_column_dropped = true;
            break;
        }
    }
    // Fast path: every column is still valid, keep the row as-is.
    if !is_column_dropped {
        return None;
    }
    let offset_bytes = match flag - Flag::EMPTY {
        Flag::OFFSET8 => 1,
        Flag::OFFSET16 => 2,
        Flag::OFFSET32 => 4,
        _ => panic!("invalid flag {}", flag.bits()),
    };
    // Section boundaries relative to the current cursor: ids (4B each),
    // then the offset array, then the payload.
    let offsets_start_idx = 4 * datum_num;
    let data_start_idx = offsets_start_idx + datum_num * offset_bytes;
    let offsets = &encoded_bytes[offsets_start_idx..data_start_idx];
    let data = &encoded_bytes[data_start_idx..];
    // Raw byte slices of the surviving datums (None = NULL), plus their ids.
    let mut datums: Vec<Option<&[u8]>> = Vec::with_capacity(valid_column_ids.len());
    let mut column_ids = Vec::with_capacity(valid_column_ids.len());
    for i in 0..datum_num {
        let this_id = encoded_bytes.get_i32_le();
        if valid_column_ids.contains(&this_id) {
            column_ids.push(this_id);
            let this_offset_start_idx = i * offset_bytes;
            let mut this_offset_slice =
                &offsets[this_offset_start_idx..(this_offset_start_idx + offset_bytes)];
            let this_offset = deserialize_width(offset_bytes, &mut this_offset_slice);
            let data = if i + 1 < datum_num {
                // Extent runs to the next datum's start offset.
                let mut next_offset_slice = &offsets[(this_offset_start_idx + offset_bytes)
                    ..(this_offset_start_idx + 2 * offset_bytes)];
                let next_offset = deserialize_width(offset_bytes, &mut next_offset_slice);
                if this_offset == next_offset {
                    // Zero-length span encodes NULL.
                    None
                } else {
                    let data_slice = &data[this_offset..next_offset];
                    Some(data_slice)
                }
            } else if this_offset == data.len() {
                // Last datum with empty tail: NULL.
                None
            } else {
                // Last datum runs to the end of the payload.
                let data_slice = &data[this_offset..];
                Some(data_slice)
            };
            datums.push(data);
        }
    }
    // Every column dropped: cannot encode an empty row, let the caller decide.
    if column_ids.is_empty() {
        return None;
    }
    // Re-encode the surviving columns in the same wire layout.
    let mut encoding = RowEncoding::new();
    encoding.encode_slice(datums.into_iter());
    let mut encoded_column_ids = Vec::with_capacity(column_ids.len() * 4);
    let datum_num = column_ids.len() as u32;
    for id in column_ids {
        encoded_column_ids.put_i32_le(id);
    }
    let mut row_bytes = Vec::with_capacity(
        5 + encoded_column_ids.len() + encoding.offsets.len() + encoding.buf.len(), );
    row_bytes.put_u8(encoding.flag.bits());
    row_bytes.put_u32_le(datum_num);
    row_bytes.extend(&encoded_column_ids);
    row_bytes.extend(&encoding.offsets);
    row_bytes.extend(&encoding.buf);
    Some(row_bytes)
}