risingwave_hummock_trace/
write.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::Write;
16use std::mem::size_of;
17
18use bincode::{config, encode_into_std_write};
19#[cfg(test)]
20use mockall::{automock, mock};
21
22use super::record::Record;
23use crate::TraceError;
24use crate::error::Result;
25
/// Integer type of the marker that `TraceWriterImpl::try_new` emits before any record.
pub(crate) type MagicBytes = u32;
/// Marker value; its big-endian byte sequence is the ASCII text `HMTR`.
pub(crate) static MAGIC_BYTES: MagicBytes = 0x484D_5452;
28
29#[cfg_attr(test, automock)]
30pub(crate) trait TraceWriter {
31    fn write(&mut self, record: Record) -> Result<usize>;
32    fn flush(&mut self) -> Result<()>;
33}
34
35/// Serializer serializes a record to std write.
36#[cfg_attr(test, automock)]
37pub(crate) trait Serializer<W: Write> {
38    fn serialize(&self, record: Record, buf: &mut W) -> Result<usize>;
39}
40
/// Stateless serializer; it is a unit struct and carries no configuration of its own.
pub(crate) struct BincodeSerializer;

impl BincodeSerializer {
    /// Builds the (zero-sized) serializer value.
    fn new() -> Self {
        BincodeSerializer
    }
}
48
49impl<W: Write> Serializer<W> for BincodeSerializer {
50    fn serialize(&self, record: Record, writer: &mut W) -> Result<usize> {
51        let size = encode_into_std_write(record, writer, config::standard())?;
52        Ok(size)
53    }
54}
55
56pub(crate) struct TraceWriterImpl<W: Write, S: Serializer<W>> {
57    writer: W,
58    serializer: S,
59}
60
61impl<W: Write, S: Serializer<W>> TraceWriterImpl<W, S> {
62    pub(crate) fn try_new(writer: W, serializer: S) -> Result<Self> {
63        let mut writer = Self { writer, serializer };
64        writer.write_magic_bytes()?;
65        Ok(writer)
66    }
67
68    fn write_magic_bytes(&mut self) -> Result<()> {
69        let size = self.writer.write(&MAGIC_BYTES.to_be_bytes())?;
70        if size != size_of::<MagicBytes>() {
71            Err(TraceError::Other("failed to write magic bytes"))
72        } else {
73            Ok(())
74        }
75    }
76}
77
78impl<W: Write> TraceWriterImpl<W, BincodeSerializer> {
79    pub(crate) fn try_new_bincode(writer: W) -> Result<Self> {
80        Self::try_new(writer, BincodeSerializer::new())
81    }
82}
83
84impl<W: Write, S: Serializer<W>> TraceWriter for TraceWriterImpl<W, S> {
85    fn write(&mut self, record: Record) -> Result<usize> {
86        let size = self.serializer.serialize(record, &mut self.writer)?;
87        Ok(size)
88    }
89
90    fn flush(&mut self) -> Result<()> {
91        self.writer.flush()?;
92        Ok(())
93    }
94}
95
96impl<W: Write, S: Serializer<W>> Drop for TraceWriterImpl<W, S> {
97    fn drop(&mut self) {
98        self.flush().expect("failed to flush TraceWriterImpl");
99    }
100}
101
#[cfg(test)]
mod test {
    use std::io::Cursor;

    use bincode::{decode_from_slice, encode_to_vec};
    use byteorder::{BigEndian, ReadBytesExt};
    use bytes::Bytes;

    use super::*;
    use crate::{Operation, TracedReadOptions};

    // Mock of `std::io::Write`. Only `write` and `flush` are mocked; every other `Write`
    // method keeps its default implementation on top of these two.
    mock! {
        Write{}
        impl Write for Write{
            fn write(&mut self, bytes: &[u8]) -> std::result::Result<usize, std::io::Error>;
            fn flush(&mut self) -> std::result::Result<(), std::io::Error>;
        }
    }

    /// A record serialized with `BincodeSerializer` decodes back to an equal record, and the
    /// reported size matches both the buffer length and the decoded length.
    #[test]
    fn test_bincode_serialize() {
        let op = Operation::get(
            Bytes::from(vec![0, 1, 2, 3]),
            Some(123),
            TracedReadOptions::for_test(123),
        );
        let expected = Record::new_local_none(0, op);
        let serializer = BincodeSerializer::new();
        let mut buf = Vec::new();
        let write_size = serializer.serialize(expected.clone(), &mut buf).unwrap();
        assert_eq!(write_size, buf.len());

        let (actual, read_size) = decode_from_slice(&buf, config::standard()).unwrap();

        assert_eq!(write_size, read_size);
        assert_eq!(expected, actual);
    }

    /// Constructing a writer emits the magic bytes, big-endian, and nothing else.
    #[test]
    fn test_new_writer_impl() {
        // An in-memory cursor stands in for the real output.
        let mut buf = Cursor::new(Vec::new());

        {
            // Scope the writer so it is dropped (and flushed) before the buffer is inspected.
            let mut writer = TraceWriterImpl::try_new_bincode(&mut buf).unwrap();

            writer.flush().unwrap();
        }
        buf.set_position(0);
        let magic_bytes = buf.read_u32::<BigEndian>().unwrap();
        assert_eq!(magic_bytes, MAGIC_BYTES);
        // No record was written, so the marker is the only content.
        assert_eq!(buf.into_inner().len(), size_of::<MagicBytes>());
    }

    /// `TraceWriterImpl::write` delegates to the serializer exactly once and returns the size
    /// the serializer reports; dropping the writer flushes the sink exactly once.
    #[test]
    fn test_writer_impl_write() {
        let mut mock_writer = MockWrite::new();
        let key = Bytes::from(vec![123]);
        let value = Bytes::from(vec![234]);
        let op = Operation::insert(key, value, None);
        let record = Record::new_local_none(0, op);
        let r_len = encode_to_vec(record.clone(), config::standard())
            .unwrap()
            .len();

        // First `write` call: the magic bytes emitted by `try_new`.
        mock_writer
            .expect_write()
            .times(1)
            .returning(|_| Ok(size_of::<u32>()));
        // Any later `write` call accepts the whole buffer.
        mock_writer.expect_write().returning(|b| Ok(b.len()));

        // The only flush comes from `Drop` at the end of the test.
        mock_writer.expect_flush().times(1).returning(|| Ok(()));

        let mut mock_serializer = MockSerializer::new();

        mock_serializer
            .expect_serialize()
            .times(1)
            .returning(move |_, _| Ok(r_len));

        let mut writer = TraceWriterImpl::try_new(mock_writer, mock_serializer).unwrap();

        let written = writer.write(record).unwrap();
        assert_eq!(written, r_len);
    }
}