risingwave_hummock_trace/
read.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::Read;
16
17use bincode::{config, decode_from_std_read};
18use byteorder::{BigEndian, ReadBytesExt};
19#[cfg(test)]
20use mockall::automock;
21
22use crate::error::{Result, TraceError};
23use crate::{MAGIC_BYTES, Record};
24#[cfg_attr(test, automock)]
25pub trait TraceReader {
26    fn read(&mut self) -> Result<Record>;
27    fn read_n(&mut self, n: usize) -> Result<Vec<Record>> {
28        let mut ops = Vec::with_capacity(n);
29        for _ in 0..n {
30            let op = self.read()?;
31            ops.push(op);
32        }
33        Ok(ops)
34    }
35}
36/// Deserializer decodes a record from memory
37#[cfg_attr(test, automock)]
38pub trait Deserializer<R: Read> {
39    /// consumes the reader and deserialize a record
40    fn deserialize(&self, reader: &mut R) -> Result<Record>;
41}
42
43/// Decodes bincode format serialized data
44pub struct BincodeDeserializer;
45
46impl<R: Read> Deserializer<R> for BincodeDeserializer {
47    fn deserialize(&self, reader: &mut R) -> Result<Record> {
48        let record = decode_from_std_read(reader, config::standard())?;
49        Ok(record)
50    }
51}
52
53pub struct TraceReaderImpl<R: Read, D: Deserializer<R>> {
54    reader: R,
55    deserializer: D,
56}
57
58impl<R: Read, D: Deserializer<R>> TraceReaderImpl<R, D> {
59    pub fn try_new(mut reader: R, deserializer: D) -> Result<Self> {
60        // Read the 32-bit unsigned integer from the reader using the BigEndian byte order.
61        let magic_bytes = reader.read_u32::<BigEndian>()?;
62
63        // Check if the magic bytes match the expected value.
64        if magic_bytes != MAGIC_BYTES {
65            Err(TraceError::MagicBytes {
66                expected: MAGIC_BYTES,
67                found: magic_bytes,
68            })
69        } else {
70            // Return the TraceReaderImpl instance containing the reader and deserializer.
71            Ok(Self {
72                reader,
73                deserializer,
74            })
75        }
76    }
77}
78
79impl<R: Read> TraceReaderImpl<R, BincodeDeserializer> {
80    pub fn new_bincode(reader: R) -> Result<Self> {
81        let deserializer = BincodeDeserializer {};
82        Self::try_new(reader, deserializer)
83    }
84}
85
86impl<R: Read, D: Deserializer<R>> TraceReader for TraceReaderImpl<R, D> {
87    fn read(&mut self) -> Result<Record> {
88        self.deserializer.deserialize(&mut self.reader)
89    }
90}
91
#[cfg(test)]
mod test {
    use std::io::{Cursor, Read, Result, Write};
    use std::mem::size_of;

    use bincode::config::{self};
    use bincode::encode_to_vec;
    use bytes::Bytes;
    use mockall::mock;
    use risingwave_pb::common::Status;
    use risingwave_pb::meta::SubscribeResponse;

    use super::{TraceReader, TraceReaderImpl};
    use crate::{
        BincodeDeserializer, Deserializer, MAGIC_BYTES, MockDeserializer, Operation, Record,
        TracedReadOptions, TracedSubResp,
    };

    // `MockReader`: a `Read` implementation whose `read` calls are scripted per test.
    mock! {
        Reader{}
        impl Read for Reader{
            fn read(&mut self, buf: &mut [u8]) -> Result<usize>;
        }
    }

    /// A `get` record encoded with bincode decodes back to an equal record.
    #[test]
    fn test_bincode_deserialize() {
        let deserializer = BincodeDeserializer {};
        let op = Operation::get(
            Bytes::from(vec![5, 5, 15, 6]),
            Some(7564),
            TracedReadOptions::for_test(0),
        );
        let expected = Record::new_local_none(54433, op);

        let mut buf = Cursor::new(Vec::new());

        let record_bytes = encode_to_vec(expected.clone(), config::standard()).unwrap();
        // `write_all` instead of `write`: the whole record must land in the buffer, so the
        // written byte count must not be discarded.
        buf.write_all(&record_bytes).unwrap();

        // Rewind so that decoding starts at the first encoded byte.
        buf.set_position(0);

        let actual = deserializer.deserialize(&mut buf).unwrap();

        assert_eq!(expected, actual);
    }

    /// A meta-message record wrapping a `SubscribeResponse` survives the
    /// encode/decode round trip.
    #[test]
    fn test_bincode_serialize_resp() {
        let deserializer = BincodeDeserializer {};
        let resp = TracedSubResp(SubscribeResponse {
            status: Some(Status {
                code: 0,
                message: "abc".to_owned(),
            }),
            info: None,
            operation: 1,
            version: 100,
        });
        let op = Operation::MetaMessage(Box::new(resp));
        let expected = Record::new_local_none(123, op);

        let mut buf = Cursor::new(Vec::new());

        let record_bytes = encode_to_vec(expected.clone(), config::standard()).unwrap();
        buf.write_all(&record_bytes).unwrap();
        buf.set_position(0);

        let actual = deserializer.deserialize(&mut buf).unwrap();

        assert_eq!(expected, actual);
    }

    /// Many records written back to back decode in order, and decoding past the last
    /// one fails.
    #[test]
    fn test_bincode_deserialize_many() {
        let count = 5000;
        let mut buf = Cursor::new(Vec::new());
        let mut records = Vec::new();

        for i in 0..count {
            // `into_bytes` reuses the formatted string's allocation.
            let key = Bytes::from(format!("key{}", i).into_bytes());
            let value = Bytes::from(format!("value{}", i).into_bytes());
            let op = Operation::insert(key, value, None);
            let record = Record::new_local_none(i, op);
            let record_bytes = encode_to_vec(record.clone(), config::standard()).unwrap();
            buf.write_all(&record_bytes).unwrap();
            // The record is not needed after encoding, so it is moved, not cloned again.
            records.push(record);
        }

        buf.set_position(0);
        let deserializer = BincodeDeserializer {};

        for expected in records {
            let actual = deserializer.deserialize(&mut buf).unwrap();
            assert_eq!(expected, actual);
        }

        // Every record has been read, so one more decode must fail ...
        assert!(deserializer.deserialize(&mut buf).is_err());
        // ... and nothing may be left after the cursor position.
        // https://github.com/rust-lang/rust/pull/109174
        assert!(buf.split().1.is_empty());
    }

    /// `TraceReaderImpl` accepts a reader that starts with the magic bytes and then
    /// forwards every `read` to the deserializer.
    #[test]
    fn test_read_records() {
        let count = 5000;
        let mut mock_reader = MockReader::new();
        let mut mock_deserializer = MockDeserializer::new();

        // First read: serve the magic bytes that `try_new` checks.
        mock_reader.expect_read().times(1).returning(|b| {
            // `copy_from_slice` is the plain byte copy for `Copy` elements; like
            // `clone_from_slice` it panics if the lengths differ.
            b.copy_from_slice(&MAGIC_BYTES.to_be_bytes());
            Ok(size_of::<u32>())
        });

        // Any later read reports the buffer as completely filled.
        mock_reader.expect_read().returning(|b| Ok(b.len()));

        let expected = Record::new_local_none(0, Operation::Finish);
        let return_expected = expected.clone();

        mock_deserializer
            .expect_deserialize()
            .times(count)
            .returning(move |_| Ok(return_expected.clone()));

        let mut trace_reader = TraceReaderImpl::try_new(mock_reader, mock_deserializer).unwrap();

        for _ in 0..count {
            let actual = trace_reader.read().unwrap();
            assert_eq!(expected, actual);
        }
    }
}
220}