risingwave_hummock_trace/
record.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::{Bound, Deref};
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use bincode::error::{DecodeError, EncodeError};
19use bincode::{Decode, Encode};
20use bytes::Bytes;
21use prost::Message;
22use risingwave_pb::meta::SubscribeResponse;
23
24use crate::{
25    LocalStorageId, StorageType, TracedHummockReadEpoch, TracedInitOptions, TracedNewLocalOptions,
26    TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions,
27};
28
/// Identifier carried by a traced [`Record`]; a plain `u64`.
pub type RecordId = u64;

/// A [`UniqueIdGenerator`] backed by an [`AtomicU64`], producing `u64` ids.
pub type RecordIdGenerator = UniqueIdGenerator<AtomicU64>;
/// Alias of exactly the same concrete type as [`RecordIdGenerator`].
pub type ConcurrentIdGenerator = UniqueIdGenerator<AtomicU64>;
33
/// A source of ids that can be advanced through a shared reference.
pub trait UniqueId {
    /// The type of id produced by [`UniqueId::inc`].
    type Type;
    /// Produces an id. The `AtomicU64` implementation in this file returns
    /// the current counter value and then increments the counter.
    fn inc(&self) -> Self::Type;
}
38
39impl UniqueId for AtomicU64 {
40    type Type = u64;
41
42    fn inc(&self) -> Self::Type {
43        self.fetch_add(1, Ordering::Relaxed)
44    }
45}
46
/// Thin wrapper around an id source `T`; ids are obtained through `next`,
/// which is available when `T` implements [`UniqueId`].
pub struct UniqueIdGenerator<T> {
    /// The underlying id source.
    id: T,
}
50
51impl<T: UniqueId> UniqueIdGenerator<T> {
52    pub fn new(id: T) -> Self {
53        Self { id }
54    }
55
56    pub fn next(&self) -> T::Type {
57        self.id.inc()
58    }
59}
60
/// A single traced entry: an [`Operation`] tagged with the storage type it
/// was issued against and a record id.
#[derive(Encode, Decode, Debug, PartialEq, Clone)]
pub struct Record {
    /// Storage type associated with this record.
    pub storage_type: StorageType,
    /// Id of this record.
    pub record_id: RecordId,
    /// The traced operation itself.
    pub operation: Operation,
}
67
68impl Record {
69    pub fn new(storage_type: StorageType, record_id: RecordId, operation: Operation) -> Self {
70        Self {
71            storage_type,
72            record_id,
73            operation,
74        }
75    }
76
77    pub fn storage_type(&self) -> &StorageType {
78        &self.storage_type
79    }
80
81    pub fn record_id(&self) -> RecordId {
82        self.record_id
83    }
84
85    pub fn operation(&self) -> &Operation {
86        &self.operation
87    }
88
89    pub fn is_iter_related(&self) -> bool {
90        matches!(
91            self.operation(),
92            Operation::Iter { .. } | Operation::IterNext(_)
93        )
94    }
95
96    #[cfg(test)]
97    pub(crate) fn new_local_none(record_id: RecordId, operation: Operation) -> Self {
98        Self::new(StorageType::Global, record_id, operation)
99    }
100}
101
/// Key range of a traced iteration, expressed as a (first, second) pair of
/// [`Bound`]s over [`TracedBytes`] — presumably (start, end); confirm
/// against callers.
pub type TracedIterRange = (Bound<TracedBytes>, Bound<TracedBytes>);
103
/// `Operation` represents a single traced Hummock operation, or the result
/// of one (see [`Operation::Result`]).
#[derive(Encode, Decode, PartialEq, Debug, Clone)]
pub enum Operation {
    /// Get operation of Hummock.
    Get {
        /// Key to retrieve.
        key: TracedBytes,
        /// Optional epoch value.
        epoch: Option<u64>,
        /// Read options for the operation.
        read_options: TracedReadOptions,
    },

    /// Insert operation of Hummock.
    Insert {
        /// Key to insert.
        key: TracedBytes,
        /// New value to insert.
        new_val: TracedBytes,
        /// Optional old value to replace.
        old_val: Option<TracedBytes>,
    },

    /// Delete operation of Hummock.
    Delete {
        /// Key to delete.
        key: TracedBytes,
        /// Value to match for deletion.
        old_val: TracedBytes,
    },

    /// Iter operation of Hummock.
    Iter {
        /// Key range for iteration.
        key_range: TracedIterRange,
        /// Optional epoch value.
        epoch: Option<u64>,
        /// Read options for the operation.
        read_options: TracedReadOptions,
    },

    /// Iter.next operation of Hummock. The payload is a [`RecordId`],
    /// presumably that of the `Iter` record being advanced — TODO confirm.
    IterNext(RecordId),

    /// Sync operation of Hummock.
    // NOTE(review): the payload looks like (epoch, table ids) pairs —
    // confirm against the call site.
    Sync(Vec<(u64, Vec<u32>)>),

    /// `MetaMessage` operation of Hummock. The response is boxed, which
    /// keeps it out of the enum's inline storage.
    MetaMessage(Box<TracedSubResp>),

    /// Result operation of Hummock, carrying the traced outcome.
    Result(OperationResult),

    /// `NewLocalStorage` operation of Hummock.
    NewLocalStorage(TracedNewLocalOptions, LocalStorageId),

    /// `DropLocalStorage` operation of Hummock.
    DropLocalStorage,

    /// Init of a local storage
    LocalStorageInit(TracedInitOptions),

    /// Try wait epoch
    TryWaitEpoch(TracedHummockReadEpoch, TracedTryWaitEpochOptions),

    /// Seal current epoch
    SealCurrentEpoch {
        /// Epoch value passed with the seal request.
        epoch: u64,
        /// Options for the seal request.
        opts: TracedSealCurrentEpochOptions,
    },

    /// Local storage epoch query; its outcome is traced as
    /// `OperationResult::LocalStorageEpoch`, which wraps a `u64`.
    LocalStorageEpoch,

    /// Local storage dirtiness query; its outcome is traced as
    /// `OperationResult::LocalStorageIsDirty`, which wraps a `bool`.
    LocalStorageIsDirty,

    /// Try-flush operation; its outcome is traced as
    /// `OperationResult::TryFlush`.
    TryFlush,

    /// Flush operation; its outcome is traced as `OperationResult::Flush`,
    /// which wraps a `usize`.
    Flush,
    /// Finish operation of Hummock.
    Finish,
}
185
186impl Operation {
187    pub fn get(key: Bytes, epoch: Option<u64>, read_options: TracedReadOptions) -> Operation {
188        Operation::Get {
189            key: key.into(),
190            epoch,
191            read_options,
192        }
193    }
194
195    pub fn insert(key: Bytes, new_val: Bytes, old_val: Option<Bytes>) -> Operation {
196        Operation::Insert {
197            key: key.into(),
198            new_val: new_val.into(),
199            old_val: old_val.map(|v| v.into()),
200        }
201    }
202}
203
/// Newtype around [`Bytes`]. The bincode `Encode`/`Decode` implementations
/// for it are written by hand elsewhere in this file.
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct TracedBytes(Bytes);
206
207impl Deref for TracedBytes {
208    type Target = Bytes;
209
210    fn deref(&self) -> &Self::Target {
211        &self.0
212    }
213}
214
215impl Encode for TracedBytes {
216    fn encode<E: bincode::enc::Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
217        Encode::encode(&self.0.as_ref(), encoder)
218    }
219}
220
221impl Decode for TracedBytes {
222    fn decode<D: bincode::de::Decoder>(
223        decoder: &mut D,
224    ) -> Result<Self, bincode::error::DecodeError> {
225        let buf: Vec<u8> = Decode::decode(decoder)?;
226        let bytes = Bytes::from(buf);
227        Ok(Self(bytes))
228    }
229}
230
231impl<'de> bincode::BorrowDecode<'de> for TracedBytes {
232    fn borrow_decode<D: bincode::de::BorrowDecoder<'de>>(
233        decoder: &mut D,
234    ) -> core::result::Result<Self, bincode::error::DecodeError> {
235        let buf: Vec<u8> = Decode::decode(decoder)?;
236        let bytes = Bytes::from(buf);
237        Ok(Self(bytes))
238    }
239}
240
241impl From<Vec<u8>> for TracedBytes {
242    fn from(value: Vec<u8>) -> Self {
243        Self(Bytes::from(value))
244    }
245}
246
247impl From<Bytes> for TracedBytes {
248    fn from(value: Bytes) -> Self {
249        Self(value)
250    }
251}
252
253impl From<TracedBytes> for Bytes {
254    fn from(value: TracedBytes) -> Self {
255        value.0
256    }
257}
/// `TraceResult` discards the error and only records whether the operation
/// succeeded. The error is dropped, rather than kept as in a `Result`,
/// because serializing it is unnecessary overhead.
#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
pub enum TraceResult<T> {
    /// The operation succeeded with the given value.
    Ok(T),
    /// The operation failed; no error detail is kept.
    Err,
}
265
266impl<T> TraceResult<T> {
267    pub fn is_ok(&self) -> bool {
268        matches!(*self, Self::Ok(_))
269    }
270}
271
272impl<T, E> From<std::result::Result<T, E>> for TraceResult<T> {
273    fn from(value: std::result::Result<T, E>) -> Self {
274        match value {
275            Ok(v) => Self::Ok(v),
276            Err(_) => Self::Err, // discard error
277        }
278    }
279}
280
/// Traced outcome of an operation. Each variant names the kind of operation
/// and wraps a [`TraceResult`] holding its success value, if any.
#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
pub enum OperationResult {
    /// Outcome of a get: the value, or `None`.
    Get(TraceResult<Option<TracedBytes>>),
    /// Outcome of an insert.
    Insert(TraceResult<()>),
    /// Outcome of a delete.
    Delete(TraceResult<()>),
    /// Outcome of a try-flush.
    TryFlush(TraceResult<()>),
    /// Outcome of a flush, carrying a `usize`.
    Flush(TraceResult<usize>),
    /// Outcome of creating an iterator.
    Iter(TraceResult<()>),
    /// Outcome of advancing an iterator: a pair of byte strings (presumably
    /// key and value — TODO confirm), or `None`.
    IterNext(TraceResult<Option<(TracedBytes, TracedBytes)>>),
    /// Outcome of a sync, carrying a `usize`.
    Sync(TraceResult<usize>),
    /// Outcome of notifying Hummock.
    NotifyHummock(TraceResult<()>),
    /// Outcome of a try-wait-epoch.
    TryWaitEpoch(TraceResult<()>),
    /// Outcome of a local storage epoch query, carrying a `u64`.
    LocalStorageEpoch(TraceResult<u64>),
    /// Outcome of a local storage dirtiness query, carrying a `bool`.
    LocalStorageIsDirty(TraceResult<bool>),
}
296
/// Newtype around the protobuf [`SubscribeResponse`]. The bincode
/// `Encode`/`Decode` implementations for it are written by hand elsewhere in
/// this file and go through prost.
#[derive(PartialEq, Debug, Clone)]
pub struct TracedSubResp(pub SubscribeResponse);
299
300impl Encode for TracedSubResp {
301    fn encode<E: bincode::enc::Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
302        // SubscribeResponse and its implementation of Serialize is generated
303        // by prost and pbjson for protobuf mapping.
304        // Serialization methods like Bincode may not correctly serialize it.
305        // So we use prost::Message::encode
306        let mut buf = vec![];
307        self.0
308            .encode(&mut buf)
309            .map_err(|_| EncodeError::Other("failed to encode subscribeResponse"))?;
310        Encode::encode(&buf, encoder)
311    }
312}
313
314impl Decode for TracedSubResp {
315    fn decode<D: bincode::de::Decoder>(
316        decoder: &mut D,
317    ) -> Result<Self, bincode::error::DecodeError> {
318        let buf: Vec<u8> = Decode::decode(decoder)?;
319        let resp = Message::decode(&buf[..]).map_err(|_| {
320            DecodeError::OtherString("failed to decode subscribeResponse".to_owned())
321        })?;
322        Ok(Self(resp))
323    }
324}
325
326impl<'de> bincode::BorrowDecode<'de> for TracedSubResp {
327    fn borrow_decode<D: bincode::de::BorrowDecoder<'de>>(
328        decoder: &mut D,
329    ) -> core::result::Result<Self, bincode::error::DecodeError> {
330        let buf: Vec<u8> = Decode::decode(decoder)?;
331        let resp = Message::decode(&buf[..]).map_err(|_| {
332            DecodeError::OtherString("failed to decode subscribeResponse".to_owned())
333        })?;
334        Ok(Self(resp))
335    }
336}
337
338impl From<SubscribeResponse> for TracedSubResp {
339    fn from(value: SubscribeResponse) -> Self {
340        Self(value)
341    }
342}
343
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use parking_lot::Mutex;

    use super::*;

    /// Draws ids from many concurrently spawned tasks and checks that every
    /// value in `0..TASKS` was handed out.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_atomic_id() {
        const TASKS: u64 = 5000;

        let generator = Arc::new(UniqueIdGenerator::new(AtomicU64::new(0)));
        let seen = Arc::new(Mutex::new(HashSet::new()));

        // `collect` drives the iterator eagerly, so every task is spawned
        // before any of them is awaited.
        let tasks: Vec<_> = (0..TASKS)
            .map(|_| {
                let seen = Arc::clone(&seen);
                let generator = Arc::clone(&generator);
                tokio::spawn(async move {
                    let id = generator.next();
                    seen.lock().insert(id);
                })
            })
            .collect();

        for task in tasks {
            task.await.unwrap();
        }

        let collected = seen.lock();
        assert!((0..TASKS).all(|expected| collected.contains(&expected)));
    }

    /// An `Iter` record is iterator-related; a `Get` record is not.
    #[test]
    fn test_record_is_iter_related() {
        let iter_record = Record::new(
            StorageType::Global,
            1,
            Operation::Iter {
                key_range: (Bound::Unbounded, Bound::Unbounded),
                epoch: None,
                read_options: TracedReadOptions::for_test(0),
            },
        );
        let get_record = Record::new(
            StorageType::Global,
            2,
            Operation::Get {
                key: TracedBytes(Bytes::from("test")),
                epoch: None,
                read_options: TracedReadOptions::for_test(0),
            },
        );

        assert!(iter_record.is_iter_related());
        assert!(!get_record.is_iter_related());
    }
}