risingwave_hummock_trace/
read.rsuse std::io::Read;
use bincode::{config, decode_from_std_read};
use byteorder::{BigEndian, ReadBytesExt};
#[cfg(test)]
use mockall::automock;
use crate::error::{Result, TraceError};
use crate::{Record, MAGIC_BYTES};
/// A source of trace [`Record`]s that are consumed one at a time.
#[cfg_attr(test, automock)]
pub trait TraceReader {
    /// Reads the next record.
    ///
    /// # Errors
    ///
    /// Returns the implementation's error when a record cannot be produced.
    fn read(&mut self) -> Result<Record>;

    /// Reads `n` records by calling [`TraceReader::read`] repeatedly and returns them in the
    /// order they were read.
    ///
    /// # Errors
    ///
    /// Stops at the first failing `read` and returns its error; records read before the
    /// failure are dropped.
    fn read_n(&mut self, n: usize) -> Result<Vec<Record>> {
        let mut records = Vec::with_capacity(n);
        // Keep pulling until exactly `n` records have been gathered.
        while records.len() < n {
            records.push(self.read()?);
        }
        Ok(records)
    }
}
/// Decodes a single [`Record`] from a byte source of type `R`.
#[cfg_attr(test, automock)]
pub trait Deserializer<R: Read> {
/// Produces one record using `reader` as the input.
///
/// # Errors
///
/// Returns the implementation's error when no record can be decoded.
fn deserialize(&self, reader: &mut R) -> Result<Record>;
}
/// [`Deserializer`] that decodes records with `bincode` using its standard configuration.
pub struct BincodeDeserializer;

impl<R: Read> Deserializer<R> for BincodeDeserializer {
    /// Decodes one [`Record`] from `reader`.
    ///
    /// # Errors
    ///
    /// A decoding failure is converted into the crate's error type and returned.
    fn deserialize(&self, reader: &mut R) -> Result<Record> {
        decode_from_std_read(reader, config::standard()).map_err(Into::into)
    }
}
/// [`TraceReader`] implementation that pairs a byte source with a [`Deserializer`].
pub struct TraceReaderImpl<R: Read, D: Deserializer<R>> {
/// Byte source handed to the deserializer on every read.
reader: R,
/// Decoder used to turn bytes from `reader` into records.
deserializer: D,
}
impl<R: Read, D: Deserializer<R>> TraceReaderImpl<R, D> {
    /// Creates a reader after validating the header of `reader`.
    ///
    /// The first four bytes are consumed and interpreted as a big-endian `u32`, which must
    /// equal [`MAGIC_BYTES`].
    ///
    /// # Errors
    ///
    /// Fails if the four bytes cannot be read, or with [`TraceError::MagicBytes`] when the
    /// value read differs from [`MAGIC_BYTES`].
    pub fn try_new(mut reader: R, deserializer: D) -> Result<Self> {
        let found = reader.read_u32::<BigEndian>()?;

        // Reject the input early when the header does not match.
        if found != MAGIC_BYTES {
            return Err(TraceError::MagicBytes {
                expected: MAGIC_BYTES,
                found,
            });
        }

        Ok(Self {
            reader,
            deserializer,
        })
    }
}
impl<R: Read> TraceReaderImpl<R, BincodeDeserializer> {
pub fn new_bincode(reader: R) -> Result<Self> {
let deserializer = BincodeDeserializer {};
Self::try_new(reader, deserializer)
}
}
impl<R: Read, D: Deserializer<R>> TraceReader for TraceReaderImpl<R, D> {
    /// Decodes the next record by handing the stored reader to the stored deserializer.
    fn read(&mut self) -> Result<Record> {
        let Self {
            reader,
            deserializer,
        } = self;
        deserializer.deserialize(reader)
    }
}
#[cfg(test)]
mod test {
    use std::io::{Cursor, Read, Result, Write};
    use std::mem::size_of;

    use bincode::config::{self};
    use bincode::encode_to_vec;
    use bytes::Bytes;
    use mockall::mock;
    use risingwave_pb::common::Status;
    use risingwave_pb::meta::SubscribeResponse;

    use super::{TraceReader, TraceReaderImpl};
    use crate::{
        BincodeDeserializer, Deserializer, MockDeserializer, Operation, Record, TracedReadOptions,
        TracedSubResp, MAGIC_BYTES,
    };

    // Mock byte source so the header check in `try_new` can be driven without real I/O.
    mock! {
        Reader{}
        impl Read for Reader{
            fn read(&mut self, buf: &mut [u8]) -> Result<usize>;
        }
    }

    /// A `get` operation encoded with bincode decodes back to an equal record.
    #[test]
    fn test_bincode_deserialize() {
        let deserializer = BincodeDeserializer {};
        let op = Operation::get(
            Bytes::from(vec![5, 5, 15, 6]),
            Some(7564),
            TracedReadOptions::for_test(0),
        );
        let expected = Record::new_local_none(54433, op);

        let mut buf = Cursor::new(Vec::new());
        let record_bytes = encode_to_vec(expected.clone(), config::standard()).unwrap();
        // `write_all` guarantees the whole encoding lands in the buffer.
        buf.write_all(&record_bytes).unwrap();
        buf.set_position(0);

        let actual = deserializer.deserialize(&mut buf).unwrap();
        assert_eq!(expected, actual);
    }

    /// A `MetaMessage` operation wrapping a subscribe response decodes back to an equal record.
    #[test]
    fn test_bincode_serialize_resp() {
        let deserializer = BincodeDeserializer {};
        let resp = TracedSubResp(SubscribeResponse {
            status: Some(Status {
                code: 0,
                message: "abc".to_string(),
            }),
            info: None,
            operation: 1,
            version: 100,
        });
        let op = Operation::MetaMessage(Box::new(resp));
        let expected = Record::new_local_none(123, op);

        let mut buf = Cursor::new(Vec::new());
        let record_bytes = encode_to_vec(expected.clone(), config::standard()).unwrap();
        buf.write_all(&record_bytes).unwrap();
        buf.set_position(0);

        let actual = deserializer.deserialize(&mut buf).unwrap();
        assert_eq!(expected, actual);
    }

    /// Many records written back to back decode in order, and decoding past the end fails.
    #[test]
    fn test_bincode_deserialize_many() {
        let count = 5000;
        let mut buf = Cursor::new(Vec::new());
        let mut records = Vec::new();

        for i in 0..count {
            let key = Bytes::from(format!("key{}", i));
            let value = Bytes::from(format!("value{}", i));
            let op = Operation::insert(key, value, None);
            let record = Record::new_local_none(i, op);
            let record_bytes = encode_to_vec(record.clone(), config::standard()).unwrap();
            buf.write_all(&record_bytes).unwrap();
            // The record itself is no longer needed here, so move it instead of cloning again.
            records.push(record);
        }

        buf.set_position(0);
        let deserializer = BincodeDeserializer {};

        for expected in records {
            let actual = deserializer.deserialize(&mut buf).unwrap();
            assert_eq!(expected, actual);
        }

        // Every record has been consumed: one more decode must fail and the part of the
        // buffer after the cursor must be empty.
        assert!(deserializer.deserialize(&mut buf).is_err());
        assert!(buf.split().1.is_empty());
    }

    /// `TraceReaderImpl` accepts a correct header and forwards every `read` to the deserializer.
    #[test]
    fn test_read_records() {
        let count = 5000;
        let mut mock_reader = MockReader::new();
        let mut mock_deserializer = MockDeserializer::new();

        // Expectation for a single read: fill the buffer with the big-endian magic value.
        mock_reader.expect_read().times(1).returning(|b| {
            b.copy_from_slice(&MAGIC_BYTES.to_be_bytes());
            Ok(size_of::<u32>())
        });
        // Second expectation: report the whole buffer as read.
        mock_reader.expect_read().returning(|b| Ok(b.len()));

        let expected = Record::new_local_none(0, Operation::Finish);
        let return_expected = expected.clone();

        mock_deserializer
            .expect_deserialize()
            .times(count)
            .returning(move |_| Ok(return_expected.clone()));

        let mut trace_reader = TraceReaderImpl::try_new(mock_reader, mock_deserializer).unwrap();

        for _ in 0..count {
            let actual = trace_reader.read().unwrap();
            assert_eq!(expected, actual);
        }
    }
}