risingwave_hummock_trace/
write.rs1use std::io::Write;
16use std::mem::size_of;
17
18use bincode::{config, encode_into_std_write};
19#[cfg(test)]
20use mockall::{automock, mock};
21
22use super::record::Record;
23use crate::TraceError;
24use crate::error::Result;
25
26pub(crate) type MagicBytes = u32;
27pub(crate) static MAGIC_BYTES: MagicBytes = 0x484D5452; #[cfg_attr(test, automock)]
30pub(crate) trait TraceWriter {
31 fn write(&mut self, record: Record) -> Result<usize>;
32 fn flush(&mut self) -> Result<()>;
33}
34
35#[cfg_attr(test, automock)]
37pub(crate) trait Serializer<W: Write> {
38 fn serialize(&self, record: Record, buf: &mut W) -> Result<usize>;
39}
40
/// Stateless [`Serializer`] backed by bincode.
pub(crate) struct BincodeSerializer;

impl BincodeSerializer {
    /// Module-private constructor; the type carries no data.
    fn new() -> Self {
        BincodeSerializer
    }
}
48
49impl<W: Write> Serializer<W> for BincodeSerializer {
50 fn serialize(&self, record: Record, writer: &mut W) -> Result<usize> {
51 let size = encode_into_std_write(record, writer, config::standard())?;
52 Ok(size)
53 }
54}
55
56pub(crate) struct TraceWriterImpl<W: Write, S: Serializer<W>> {
57 writer: W,
58 serializer: S,
59}
60
61impl<W: Write, S: Serializer<W>> TraceWriterImpl<W, S> {
62 pub(crate) fn try_new(writer: W, serializer: S) -> Result<Self> {
63 let mut writer = Self { writer, serializer };
64 writer.write_magic_bytes()?;
65 Ok(writer)
66 }
67
68 fn write_magic_bytes(&mut self) -> Result<()> {
69 let size = self.writer.write(&MAGIC_BYTES.to_be_bytes())?;
70 if size != size_of::<MagicBytes>() {
71 Err(TraceError::Other("failed to write magic bytes"))
72 } else {
73 Ok(())
74 }
75 }
76}
77
78impl<W: Write> TraceWriterImpl<W, BincodeSerializer> {
79 pub(crate) fn try_new_bincode(writer: W) -> Result<Self> {
80 Self::try_new(writer, BincodeSerializer::new())
81 }
82}
83
84impl<W: Write, S: Serializer<W>> TraceWriter for TraceWriterImpl<W, S> {
85 fn write(&mut self, record: Record) -> Result<usize> {
86 let size = self.serializer.serialize(record, &mut self.writer)?;
87 Ok(size)
88 }
89
90 fn flush(&mut self) -> Result<()> {
91 self.writer.flush()?;
92 Ok(())
93 }
94}
95
96impl<W: Write, S: Serializer<W>> Drop for TraceWriterImpl<W, S> {
97 fn drop(&mut self) {
98 self.flush().expect("failed to flush TraceWriterImpl");
99 }
100}
101
102#[cfg(test)]
103mod test {
104 use std::io::Cursor;
105
106 use bincode::{decode_from_slice, encode_to_vec};
107 use byteorder::{BigEndian, ReadBytesExt};
108 use bytes::Bytes;
109
110 use super::*;
111 use crate::{Operation, TracedReadOptions};
112
113 mock! {
114 Write{}
115 impl Write for Write{
116 fn write(&mut self, bytes: &[u8]) -> std::result::Result<usize, std::io::Error>;
117 fn flush(&mut self) -> std::result::Result<(), std::io::Error>;
118 }
119 }
120
121 #[test]
122 fn test_bincode_serialize() {
123 let op = Operation::get(
124 Bytes::from(vec![0, 1, 2, 3]),
125 Some(123),
126 TracedReadOptions::for_test(123),
127 );
128 let expected = Record::new_local_none(0, op);
129 let serializer = BincodeSerializer::new();
130 let mut buf = Vec::new();
131 let write_size = serializer.serialize(expected.clone(), &mut buf).unwrap();
132 assert_eq!(write_size, buf.len());
133
134 let (actual, read_size) = decode_from_slice(&buf, config::standard()).unwrap();
135
136 assert_eq!(write_size, read_size);
137 assert_eq!(expected, actual);
138 }
139
140 #[test]
141 fn test_new_writer_impl() {
142 let mut buf = Cursor::new(Vec::new());
144
145 {
146 let mut writer = TraceWriterImpl::try_new_bincode(&mut buf).unwrap();
148
149 writer.flush().unwrap();
150 }
151 buf.set_position(0);
152 let magic_bytes = buf.read_u32::<BigEndian>().unwrap();
153 assert_eq!(magic_bytes, MAGIC_BYTES);
154 }
155
156 #[test]
157 fn test_writer_impl_write() {
158 let mut mock_writer = MockWrite::new();
159 let key = Bytes::from(vec![123]);
160 let value = Bytes::from(vec![234]);
161 let op = Operation::insert(key, value, None);
162 let record = Record::new_local_none(0, op);
163 let r_bytes = encode_to_vec(record.clone(), config::standard()).unwrap();
164 let r_len = r_bytes.len();
165
166 mock_writer
167 .expect_write()
168 .times(1)
169 .returning(|_| Ok(size_of::<u32>()));
170 mock_writer.expect_write().returning(|b| Ok(b.len()));
171
172 mock_writer.expect_flush().times(1).returning(|| Ok(()));
173
174 let mut mock_serializer = MockSerializer::new();
175
176 mock_serializer
177 .expect_serialize()
178 .times(1)
179 .returning(move |_, _| Ok(r_len));
180
181 let mut writer = TraceWriterImpl::try_new(mock_writer, mock_serializer).unwrap();
182
183 writer.write(record).unwrap();
184 }
185}