risingwave_hummock_trace/
read.rs1use std::io::Read;
16
17use bincode::{config, decode_from_std_read};
18use byteorder::{BigEndian, ReadBytesExt};
19#[cfg(test)]
20use mockall::automock;
21
22use crate::error::{Result, TraceError};
23use crate::{MAGIC_BYTES, Record};
24#[cfg_attr(test, automock)]
25pub trait TraceReader {
26 fn read(&mut self) -> Result<Record>;
27 fn read_n(&mut self, n: usize) -> Result<Vec<Record>> {
28 let mut ops = Vec::with_capacity(n);
29 for _ in 0..n {
30 let op = self.read()?;
31 ops.push(op);
32 }
33 Ok(ops)
34 }
35}
36#[cfg_attr(test, automock)]
38pub trait Deserializer<R: Read> {
39 fn deserialize(&self, reader: &mut R) -> Result<Record>;
41}
42
43pub struct BincodeDeserializer;
45
46impl<R: Read> Deserializer<R> for BincodeDeserializer {
47 fn deserialize(&self, reader: &mut R) -> Result<Record> {
48 let record = decode_from_std_read(reader, config::standard())?;
49 Ok(record)
50 }
51}
52
53pub struct TraceReaderImpl<R: Read, D: Deserializer<R>> {
54 reader: R,
55 deserializer: D,
56}
57
58impl<R: Read, D: Deserializer<R>> TraceReaderImpl<R, D> {
59 pub fn try_new(mut reader: R, deserializer: D) -> Result<Self> {
60 let magic_bytes = reader.read_u32::<BigEndian>()?;
62
63 if magic_bytes != MAGIC_BYTES {
65 Err(TraceError::MagicBytes {
66 expected: MAGIC_BYTES,
67 found: magic_bytes,
68 })
69 } else {
70 Ok(Self {
72 reader,
73 deserializer,
74 })
75 }
76 }
77}
78
79impl<R: Read> TraceReaderImpl<R, BincodeDeserializer> {
80 pub fn new_bincode(reader: R) -> Result<Self> {
81 let deserializer = BincodeDeserializer {};
82 Self::try_new(reader, deserializer)
83 }
84}
85
86impl<R: Read, D: Deserializer<R>> TraceReader for TraceReaderImpl<R, D> {
87 fn read(&mut self) -> Result<Record> {
88 self.deserializer.deserialize(&mut self.reader)
89 }
90}
91
92#[cfg(test)]
93mod test {
94 use std::io::{Cursor, Read, Result, Write};
95 use std::mem::size_of;
96
97 use bincode::config::{self};
98 use bincode::encode_to_vec;
99 use bytes::Bytes;
100 use mockall::mock;
101 use risingwave_pb::common::Status;
102 use risingwave_pb::meta::SubscribeResponse;
103
104 use super::{TraceReader, TraceReaderImpl};
105 use crate::{
106 BincodeDeserializer, Deserializer, MAGIC_BYTES, MockDeserializer, Operation, Record,
107 TracedReadOptions, TracedSubResp,
108 };
109
110 mock! {
111 Reader{}
112 impl Read for Reader{
113 fn read(&mut self, buf: &mut [u8]) -> Result<usize>;
114 }
115 }
116
117 #[test]
118 fn test_bincode_deserialize() {
119 let deserializer = BincodeDeserializer {};
120 let op = Operation::get(
121 Bytes::from(vec![5, 5, 15, 6]),
122 Some(7564),
123 TracedReadOptions::for_test(0),
124 );
125 let expected = Record::new_local_none(54433, op);
126
127 let mut buf = Cursor::new(Vec::new());
128
129 let record_bytes = encode_to_vec(expected.clone(), config::standard()).unwrap();
130 let _ = buf.write(&record_bytes).unwrap();
131
132 buf.set_position(0);
133
134 let actual = deserializer.deserialize(&mut buf).unwrap();
135
136 assert_eq!(expected, actual);
137 }
138 #[test]
139 fn test_bincode_serialize_resp() {
140 let deserializer = BincodeDeserializer {};
141 let resp = TracedSubResp(SubscribeResponse {
142 status: Some(Status {
143 code: 0,
144 message: "abc".to_owned(),
145 }),
146 info: None,
147 operation: 1,
148 version: 100,
149 });
150 let op = Operation::MetaMessage(Box::new(resp));
151 let expected = Record::new_local_none(123, op);
152
153 let mut buf = Cursor::new(Vec::new());
154
155 let record_bytes = encode_to_vec(expected.clone(), config::standard()).unwrap();
156 let _ = buf.write(&record_bytes).unwrap();
157 buf.set_position(0);
158
159 let actual = deserializer.deserialize(&mut buf).unwrap();
160
161 assert_eq!(expected, actual);
162 }
163 #[test]
164 fn test_bincode_deserialize_many() {
165 let count = 5000;
166 let mut buf = Cursor::new(Vec::new());
167 let mut records = Vec::new();
168
169 for i in 0..count {
170 let key = Bytes::from(format!("key{}", i).as_bytes().to_vec());
171 let value = Bytes::from(format!("value{}", i).as_bytes().to_vec());
172 let op = Operation::insert(key, value, None);
173 let record = Record::new_local_none(i, op);
174 records.push(record.clone());
175 let record_bytes = encode_to_vec(record.clone(), config::standard()).unwrap();
176 let _ = buf.write(&record_bytes).unwrap();
177 }
178
179 buf.set_position(0);
180 let deserializer = BincodeDeserializer {};
181
182 for expected in records {
183 let actual = deserializer.deserialize(&mut buf).unwrap();
184 assert_eq!(expected, actual);
185 }
186
187 assert!(deserializer.deserialize(&mut buf).is_err());
188 assert!(buf.split().1.is_empty());
190 }
191
192 #[test]
193 fn test_read_records() {
194 let count = 5000;
195 let mut mock_reader = MockReader::new();
196 let mut mock_deserializer = MockDeserializer::new();
197
198 mock_reader.expect_read().times(1).returning(|b| {
199 b.clone_from_slice(&MAGIC_BYTES.to_be_bytes());
200 Ok(size_of::<u32>())
201 });
202
203 mock_reader.expect_read().returning(|b| Ok(b.len()));
204
205 let expected = Record::new_local_none(0, Operation::Finish);
206 let return_expected = expected.clone();
207
208 mock_deserializer
209 .expect_deserialize()
210 .times(count)
211 .returning(move |_| Ok(return_expected.clone()));
212
213 let mut trace_reader = TraceReaderImpl::try_new(mock_reader, mock_deserializer).unwrap();
214
215 for _ in 0..count {
216 let actual = trace_reader.read().unwrap();
217 assert_eq!(expected, actual);
218 }
219 }
220}