risingwave_hummock_trace/
record.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::{Bound, Deref};
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use bincode::error::{DecodeError, EncodeError};
19use bincode::{Decode, Encode};
20use bytes::Bytes;
21use prost::Message;
22use risingwave_pb::meta::SubscribeResponse;
23
24use crate::{
25    LocalStorageId, StorageType, TracedHummockReadEpoch, TracedInitOptions, TracedNewLocalOptions,
26    TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions,
27};
28
/// Identifier attached to every traced [`Record`].
pub type RecordId = u64;

/// `AtomicU64`-backed generator used to hand out [`RecordId`]s.
pub type RecordIdGenerator = UniqueIdGenerator<AtomicU64>;
/// Another name for an `AtomicU64`-backed [`UniqueIdGenerator`]; the type is
/// identical to [`RecordIdGenerator`], only the name differs.
pub type ConcurrentIdGenerator = UniqueIdGenerator<AtomicU64>;

/// A counter that produces a new value on each call through a shared reference.
pub trait UniqueId {
    /// Type of the values produced.
    type Type;

    /// Advances the counter and returns a value from it.
    fn inc(&self) -> Self::Type;
}

impl UniqueId for AtomicU64 {
    type Type = u64;

    /// Returns the value held *before* the increment (`fetch_add` semantics).
    fn inc(&self) -> u64 {
        // `Relaxed` is enough: a read-modify-write on a single atomic is always
        // atomic, so concurrent callers still observe distinct values. No other
        // memory is published through this counter.
        AtomicU64::fetch_add(self, 1, Ordering::Relaxed)
    }
}

/// Thin wrapper that exposes any [`UniqueId`] counter as an id source.
pub struct UniqueIdGenerator<T> {
    id: T,
}

impl<T> UniqueIdGenerator<T>
where
    T: UniqueId,
{
    /// Wraps the given counter. For `AtomicU64`, the first call to
    /// [`next`](Self::next) yields the counter's initial value.
    pub fn new(id: T) -> Self {
        UniqueIdGenerator { id }
    }

    /// Draws the next id from the underlying counter.
    pub fn next(&self) -> T::Type {
        let Self { id } = self;
        id.inc()
    }
}
60
/// One entry in a Hummock trace: the storage it belongs to, its id, and the
/// traced [`Operation`] (which may itself be a result, see `Operation::Result`).
///
/// Encoded with bincode's derived `Encode`/`Decode`, which write fields in
/// declaration order, so the field order here is part of the encoded format.
#[derive(Encode, Decode, Debug, PartialEq, Clone)]
pub struct Record {
    /// Which storage the record belongs to (see [`StorageType`]).
    pub storage_type: StorageType,
    /// Identifier of this record.
    pub record_id: RecordId,
    /// The traced operation or operation result.
    pub operation: Operation,
}
67
68impl Record {
69    pub fn new(storage_type: StorageType, record_id: RecordId, operation: Operation) -> Self {
70        Self {
71            storage_type,
72            record_id,
73            operation,
74        }
75    }
76
77    pub fn storage_type(&self) -> &StorageType {
78        &self.storage_type
79    }
80
81    pub fn record_id(&self) -> RecordId {
82        self.record_id
83    }
84
85    pub fn operation(&self) -> &Operation {
86        &self.operation
87    }
88
89    pub fn is_iter_related(&self) -> bool {
90        matches!(
91            self.operation(),
92            Operation::Iter { .. } | Operation::IterNext(_)
93        )
94    }
95
96    #[cfg(test)]
97    pub(crate) fn new_local_none(record_id: RecordId, operation: Operation) -> Self {
98        Self::new(StorageType::Global, record_id, operation)
99    }
100}
101
/// Key range of an `Operation::Iter`, stored as a `(start, end)` pair of
/// [`Bound`]s over traced keys (the tuple shape std uses for `RangeBounds`).
pub type TracedIterRange = (Bound<TracedBytes>, Bound<TracedBytes>);
103
/// A traced Hummock operation.
///
/// Each variant records one kind of call; `Result` carries the traced outcome
/// of an operation as an [`OperationResult`]. The bincode derive encodes the
/// variant index in declaration order, so reordering or inserting variants
/// changes the encoded trace format and previously recorded traces would
/// decode differently.
#[derive(Encode, Decode, PartialEq, Debug, Clone)]
pub enum Operation {
    /// Get operation of Hummock.
    Get {
        /// Key to retrieve.
        key: TracedBytes,
        /// Optional epoch value.
        epoch: Option<u64>,
        /// Read options for the operation.
        read_options: TracedReadOptions,
    },

    /// Insert operation of Hummock.
    Insert {
        /// Key to insert.
        key: TracedBytes,
        /// New value to insert.
        new_val: TracedBytes,
        /// Optional old value to replace.
        old_val: Option<TracedBytes>,
    },

    /// Delete operation of Hummock.
    Delete {
        /// Key to delete.
        key: TracedBytes,
        /// Value to match for deletion.
        old_val: TracedBytes,
    },

    /// Iter operation of Hummock.
    Iter {
        /// Key range for iteration.
        key_range: TracedIterRange,
        /// Optional epoch value.
        epoch: Option<u64>,
        /// Read options for the operation.
        read_options: TracedReadOptions,
    },

    /// Iter.next operation of Hummock.
    ///
    /// NOTE(review): the payload is a `RecordId`, presumably the id of the
    /// `Iter` record that opened the iterator — confirm against the collector.
    IterNext(RecordId),

    /// Sync operation of Hummock.
    ///
    /// NOTE(review): the payload is a list of `(u64, Vec<u32>)` pairs,
    /// presumably `(epoch, table ids)` — confirm against the tracing call site.
    Sync(Vec<(u64, Vec<u32>)>),

    /// `MetaMessage` operation of Hummock: a traced meta `SubscribeResponse`.
    /// Boxed, presumably to keep this enum small — TODO confirm intent.
    MetaMessage(Box<TracedSubResp>),

    /// Result operation of Hummock, carrying an [`OperationResult`].
    Result(OperationResult),

    /// `NewLocalStorage` operation of Hummock, with its options and a
    /// [`LocalStorageId`].
    NewLocalStorage(TracedNewLocalOptions, LocalStorageId),

    /// `DropLocalStorage` operation of Hummock.
    DropLocalStorage,

    /// Init of a local storage
    LocalStorageInit(TracedInitOptions),

    /// Try wait epoch, with the read epoch and the wait options passed.
    TryWaitEpoch(TracedHummockReadEpoch, TracedTryWaitEpochOptions),

    /// Seal current epoch
    SealCurrentEpoch {
        /// Epoch argument of the call.
        epoch: u64,
        /// Options passed to the call.
        opts: TracedSealCurrentEpochOptions,
    },

    /// `TryFlush` operation of Hummock.
    TryFlush,

    /// `Flush` operation of Hummock.
    Flush,

    /// Finish operation of Hummock.
    Finish,
}
181
182impl Operation {
183    pub fn get(key: Bytes, epoch: Option<u64>, read_options: TracedReadOptions) -> Operation {
184        Operation::Get {
185            key: key.into(),
186            epoch,
187            read_options,
188        }
189    }
190
191    pub fn insert(key: Bytes, new_val: Bytes, old_val: Option<Bytes>) -> Operation {
192        Operation::Insert {
193            key: key.into(),
194            new_val: new_val.into(),
195            old_val: old_val.map(|v| v.into()),
196        }
197    }
198}
199
/// Newtype over [`Bytes`] used for keys and values in traced operations.
///
/// The wrapper carries hand-written bincode `Encode`/`Decode`/`BorrowDecode`
/// impls: the payload is written as a length-prefixed byte sequence and read
/// back as a `Vec<u8>`.
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct TracedBytes(Bytes);
202
203impl Deref for TracedBytes {
204    type Target = Bytes;
205
206    fn deref(&self) -> &Self::Target {
207        &self.0
208    }
209}
210
211impl Encode for TracedBytes {
212    fn encode<E: bincode::enc::Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
213        Encode::encode(&self.0.as_ref(), encoder)
214    }
215}
216
217impl Decode for TracedBytes {
218    fn decode<D: bincode::de::Decoder>(
219        decoder: &mut D,
220    ) -> Result<Self, bincode::error::DecodeError> {
221        let buf: Vec<u8> = Decode::decode(decoder)?;
222        let bytes = Bytes::from(buf);
223        Ok(Self(bytes))
224    }
225}
226
227impl<'de> bincode::BorrowDecode<'de> for TracedBytes {
228    fn borrow_decode<D: bincode::de::BorrowDecoder<'de>>(
229        decoder: &mut D,
230    ) -> core::result::Result<Self, bincode::error::DecodeError> {
231        let buf: Vec<u8> = Decode::decode(decoder)?;
232        let bytes = Bytes::from(buf);
233        Ok(Self(bytes))
234    }
235}
236
237impl From<Vec<u8>> for TracedBytes {
238    fn from(value: Vec<u8>) -> Self {
239        Self(Bytes::from(value))
240    }
241}
242
243impl From<Bytes> for TracedBytes {
244    fn from(value: Bytes) -> Self {
245        Self(value)
246    }
247}
248
249impl From<TracedBytes> for Bytes {
250    fn from(value: TracedBytes) -> Self {
251        value.0
252    }
253}
/// `TraceResult` discards the error and only traces whether the call succeeded.
///
/// It is used instead of `Result<T, E>` because serializing the error would add
/// overhead: a failure is recorded as the payload-free [`TraceResult::Err`]
/// variant, while a success keeps its value in [`TraceResult::Ok`].
#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
pub enum TraceResult<T> {
    /// The call succeeded with this value.
    Ok(T),
    /// The call failed; the error itself is not recorded.
    Err,
}
261
262impl<T> TraceResult<T> {
263    pub fn is_ok(&self) -> bool {
264        matches!(*self, Self::Ok(_))
265    }
266}
267
268impl<T, E> From<std::result::Result<T, E>> for TraceResult<T> {
269    fn from(value: std::result::Result<T, E>) -> Self {
270        match value {
271            Ok(v) => Self::Ok(v),
272            Err(_) => Self::Err, // discard error
273        }
274    }
275}
276
/// Traced outcome of an operation, carried by `Operation::Result`.
///
/// As with `Operation`, the bincode derive encodes the variant index in
/// declaration order, so variant order is part of the encoded trace format.
#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
pub enum OperationResult {
    /// Outcome of a get: the value read, if any.
    Get(TraceResult<Option<TracedBytes>>),
    /// Outcome of an insert.
    Insert(TraceResult<()>),
    /// Outcome of a delete.
    Delete(TraceResult<()>),
    /// Outcome of a try-flush.
    TryFlush(TraceResult<()>),
    /// Outcome of a flush. NOTE(review): the `usize` is presumably a size or
    /// count reported by the flush — confirm.
    Flush(TraceResult<usize>),
    /// Outcome of opening an iterator.
    Iter(TraceResult<()>),
    /// Outcome of one `next` call: presumably the `(key, value)` pair returned,
    /// or `None` once the iterator is exhausted — confirm.
    IterNext(TraceResult<Option<(TracedBytes, TracedBytes)>>),
    /// Outcome of a sync. NOTE(review): meaning of the `usize` not visible here.
    Sync(TraceResult<usize>),
    /// Outcome of notifying Hummock.
    NotifyHummock(TraceResult<()>),
    /// Outcome of a try-wait-epoch.
    TryWaitEpoch(TraceResult<()>),
}
290
/// Wrapper around a meta [`SubscribeResponse`] so it can be stored in a trace.
///
/// Its bincode impls serialize the message with prost's protobuf encoding and
/// store the resulting bytes as a byte vector, rather than relying on a bincode
/// derive for the prost-generated type.
#[derive(PartialEq, Debug, Clone)]
pub struct TracedSubResp(pub SubscribeResponse);
293
294impl Encode for TracedSubResp {
295    fn encode<E: bincode::enc::Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
296        // SubscribeResponse and its implementation of Serialize is generated
297        // by prost and pbjson for protobuf mapping.
298        // Serialization methods like Bincode may not correctly serialize it.
299        // So we use prost::Message::encode
300        let mut buf = vec![];
301        self.0
302            .encode(&mut buf)
303            .map_err(|_| EncodeError::Other("failed to encode subscribeResponse"))?;
304        Encode::encode(&buf, encoder)
305    }
306}
307
308impl Decode for TracedSubResp {
309    fn decode<D: bincode::de::Decoder>(
310        decoder: &mut D,
311    ) -> Result<Self, bincode::error::DecodeError> {
312        let buf: Vec<u8> = Decode::decode(decoder)?;
313        let resp = Message::decode(&buf[..]).map_err(|_| {
314            DecodeError::OtherString("failed to decode subscribeResponse".to_owned())
315        })?;
316        Ok(Self(resp))
317    }
318}
319
320impl<'de> bincode::BorrowDecode<'de> for TracedSubResp {
321    fn borrow_decode<D: bincode::de::BorrowDecoder<'de>>(
322        decoder: &mut D,
323    ) -> core::result::Result<Self, bincode::error::DecodeError> {
324        let buf: Vec<u8> = Decode::decode(decoder)?;
325        let resp = Message::decode(&buf[..]).map_err(|_| {
326            DecodeError::OtherString("failed to decode subscribeResponse".to_owned())
327        })?;
328        Ok(Self(resp))
329    }
330}
331
332impl From<SubscribeResponse> for TracedSubResp {
333    fn from(value: SubscribeResponse) -> Self {
334        Self(value)
335    }
336}
337
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use parking_lot::Mutex;

    use super::*;

    /// Many tasks draw from one shared generator concurrently; every id in
    /// `0..count` must have been handed out.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_atomic_id() {
        let count: u64 = 5000;
        let id_source = Arc::new(UniqueIdGenerator::new(AtomicU64::new(0)));
        let collected = Arc::new(Mutex::new(HashSet::new()));

        let tasks: Vec<_> = (0..count)
            .map(|_| {
                let id_source = Arc::clone(&id_source);
                let collected = Arc::clone(&collected);
                tokio::spawn(async move {
                    let id = id_source.next();
                    collected.lock().insert(id);
                })
            })
            .collect();

        for task in tasks {
            task.await.unwrap();
        }

        let guard = collected.lock();
        assert!((0..count).all(|expected| guard.contains(&expected)));
    }

    /// An `Iter` record counts as iterator-related; a `Get` record does not.
    #[test]
    fn test_record_is_iter_related() {
        let read_options = || TracedReadOptions::for_test(0);

        let iter_record = Record::new(
            StorageType::Global,
            1,
            Operation::Iter {
                key_range: (Bound::Unbounded, Bound::Unbounded),
                epoch: None,
                read_options: read_options(),
            },
        );
        let get_record = Record::new(
            StorageType::Global,
            2,
            Operation::Get {
                key: TracedBytes(Bytes::from("test")),
                epoch: None,
                read_options: read_options(),
            },
        );

        assert!(iter_record.is_iter_related());
        assert!(!get_record.is_iter_related());
    }
}