risingwave_hummock_trace/
record.rs1use std::ops::{Bound, Deref};
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use bincode::error::{DecodeError, EncodeError};
19use bincode::{Decode, Encode};
20use bytes::Bytes;
21use prost::Message;
22use risingwave_pb::meta::SubscribeResponse;
23
24use crate::{
25 LocalStorageId, StorageType, TracedHummockReadEpoch, TracedInitOptions, TracedNewLocalOptions,
26 TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions,
27};
28
29pub type RecordId = u64;
30
31pub type RecordIdGenerator = UniqueIdGenerator<AtomicU64>;
32pub type ConcurrentIdGenerator = UniqueIdGenerator<AtomicU64>;
33
/// A counter-like value that can hand out identifiers through a shared reference.
pub trait UniqueId {
    /// The identifier type produced by [`UniqueId::inc`].
    type Type;

    /// Produces an identifier and advances the underlying counter.
    fn inc(&self) -> Self::Type;
}
38
39impl UniqueId for AtomicU64 {
40 type Type = u64;
41
42 fn inc(&self) -> Self::Type {
43 self.fetch_add(1, Ordering::Relaxed)
44 }
45}
46
/// Thin wrapper around a counter `T` that produces identifiers on demand.
///
/// `Debug` and `Default` are derived so the generator can be logged and can
/// be built from the counter's default value (zero for `AtomicU64`).
#[derive(Debug, Default)]
pub struct UniqueIdGenerator<T> {
    /// The underlying counter the identifiers are drawn from.
    id: T,
}
50
51impl<T: UniqueId> UniqueIdGenerator<T> {
52 pub fn new(id: T) -> Self {
53 Self { id }
54 }
55
56 pub fn next(&self) -> T::Type {
57 self.id.inc()
58 }
59}
60
61#[derive(Encode, Decode, Debug, PartialEq, Clone)]
62pub struct Record {
63 pub storage_type: StorageType,
64 pub record_id: RecordId,
65 pub operation: Operation,
66}
67
68impl Record {
69 pub fn new(storage_type: StorageType, record_id: RecordId, operation: Operation) -> Self {
70 Self {
71 storage_type,
72 record_id,
73 operation,
74 }
75 }
76
77 pub fn storage_type(&self) -> &StorageType {
78 &self.storage_type
79 }
80
81 pub fn record_id(&self) -> RecordId {
82 self.record_id
83 }
84
85 pub fn operation(&self) -> &Operation {
86 &self.operation
87 }
88
89 pub fn is_iter_related(&self) -> bool {
90 matches!(
91 self.operation(),
92 Operation::Iter { .. } | Operation::IterNext(_)
93 )
94 }
95
96 #[cfg(test)]
97 pub(crate) fn new_local_none(record_id: RecordId, operation: Operation) -> Self {
98 Self::new(StorageType::Global, record_id, operation)
99 }
100}
101
102pub type TracedIterRange = (Bound<TracedBytes>, Bound<TracedBytes>);
103
104#[derive(Encode, Decode, PartialEq, Debug, Clone)]
106pub enum Operation {
107 Get {
109 key: TracedBytes,
111 epoch: Option<u64>,
113 read_options: TracedReadOptions,
115 },
116
117 Insert {
119 key: TracedBytes,
121 new_val: TracedBytes,
123 old_val: Option<TracedBytes>,
125 },
126
127 Delete {
129 key: TracedBytes,
131 old_val: TracedBytes,
133 },
134
135 Iter {
137 key_range: TracedIterRange,
139 epoch: Option<u64>,
141 read_options: TracedReadOptions,
143 },
144
145 IterNext(RecordId),
147
148 Sync(Vec<(u64, Vec<u32>)>),
150
151 MetaMessage(Box<TracedSubResp>),
153
154 Result(OperationResult),
156
157 NewLocalStorage(TracedNewLocalOptions, LocalStorageId),
159
160 DropLocalStorage,
162
163 LocalStorageInit(TracedInitOptions),
165
166 TryWaitEpoch(TracedHummockReadEpoch, TracedTryWaitEpochOptions),
168
169 SealCurrentEpoch {
171 epoch: u64,
172 opts: TracedSealCurrentEpochOptions,
173 },
174
175 LocalStorageEpoch,
176
177 LocalStorageIsDirty,
178
179 TryFlush,
180
181 Flush,
182 Finish,
184}
185
186impl Operation {
187 pub fn get(key: Bytes, epoch: Option<u64>, read_options: TracedReadOptions) -> Operation {
188 Operation::Get {
189 key: key.into(),
190 epoch,
191 read_options,
192 }
193 }
194
195 pub fn insert(key: Bytes, new_val: Bytes, old_val: Option<Bytes>) -> Operation {
196 Operation::Insert {
197 key: key.into(),
198 new_val: new_val.into(),
199 old_val: old_val.map(|v| v.into()),
200 }
201 }
202}
203
204#[derive(PartialEq, Eq, Debug, Clone)]
205pub struct TracedBytes(Bytes);
206
207impl Deref for TracedBytes {
208 type Target = Bytes;
209
210 fn deref(&self) -> &Self::Target {
211 &self.0
212 }
213}
214
215impl Encode for TracedBytes {
216 fn encode<E: bincode::enc::Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
217 Encode::encode(&self.0.as_ref(), encoder)
218 }
219}
220
221impl Decode for TracedBytes {
222 fn decode<D: bincode::de::Decoder>(
223 decoder: &mut D,
224 ) -> Result<Self, bincode::error::DecodeError> {
225 let buf: Vec<u8> = Decode::decode(decoder)?;
226 let bytes = Bytes::from(buf);
227 Ok(Self(bytes))
228 }
229}
230
231impl<'de> bincode::BorrowDecode<'de> for TracedBytes {
232 fn borrow_decode<D: bincode::de::BorrowDecoder<'de>>(
233 decoder: &mut D,
234 ) -> core::result::Result<Self, bincode::error::DecodeError> {
235 let buf: Vec<u8> = Decode::decode(decoder)?;
236 let bytes = Bytes::from(buf);
237 Ok(Self(bytes))
238 }
239}
240
241impl From<Vec<u8>> for TracedBytes {
242 fn from(value: Vec<u8>) -> Self {
243 Self(Bytes::from(value))
244 }
245}
246
247impl From<Bytes> for TracedBytes {
248 fn from(value: Bytes) -> Self {
249 Self(value)
250 }
251}
252
253impl From<TracedBytes> for Bytes {
254 fn from(value: TracedBytes) -> Self {
255 value.0
256 }
257}
258#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
261pub enum TraceResult<T> {
262 Ok(T),
263 Err,
264}
265
266impl<T> TraceResult<T> {
267 pub fn is_ok(&self) -> bool {
268 matches!(*self, Self::Ok(_))
269 }
270}
271
272impl<T, E> From<std::result::Result<T, E>> for TraceResult<T> {
273 fn from(value: std::result::Result<T, E>) -> Self {
274 match value {
275 Ok(v) => Self::Ok(v),
276 Err(_) => Self::Err, }
278 }
279}
280
281#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
282pub enum OperationResult {
283 Get(TraceResult<Option<TracedBytes>>),
284 Insert(TraceResult<()>),
285 Delete(TraceResult<()>),
286 TryFlush(TraceResult<()>),
287 Flush(TraceResult<usize>),
288 Iter(TraceResult<()>),
289 IterNext(TraceResult<Option<(TracedBytes, TracedBytes)>>),
290 Sync(TraceResult<usize>),
291 NotifyHummock(TraceResult<()>),
292 TryWaitEpoch(TraceResult<()>),
293 LocalStorageEpoch(TraceResult<u64>),
294 LocalStorageIsDirty(TraceResult<bool>),
295}
296
297#[derive(PartialEq, Debug, Clone)]
298pub struct TracedSubResp(pub SubscribeResponse);
299
300impl Encode for TracedSubResp {
301 fn encode<E: bincode::enc::Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
302 let mut buf = vec![];
307 self.0
308 .encode(&mut buf)
309 .map_err(|_| EncodeError::Other("failed to encode subscribeResponse"))?;
310 Encode::encode(&buf, encoder)
311 }
312}
313
314impl Decode for TracedSubResp {
315 fn decode<D: bincode::de::Decoder>(
316 decoder: &mut D,
317 ) -> Result<Self, bincode::error::DecodeError> {
318 let buf: Vec<u8> = Decode::decode(decoder)?;
319 let resp = Message::decode(&buf[..]).map_err(|_| {
320 DecodeError::OtherString("failed to decode subscribeResponse".to_owned())
321 })?;
322 Ok(Self(resp))
323 }
324}
325
326impl<'de> bincode::BorrowDecode<'de> for TracedSubResp {
327 fn borrow_decode<D: bincode::de::BorrowDecoder<'de>>(
328 decoder: &mut D,
329 ) -> core::result::Result<Self, bincode::error::DecodeError> {
330 let buf: Vec<u8> = Decode::decode(decoder)?;
331 let resp = Message::decode(&buf[..]).map_err(|_| {
332 DecodeError::OtherString("failed to decode subscribeResponse".to_owned())
333 })?;
334 Ok(Self(resp))
335 }
336}
337
338impl From<SubscribeResponse> for TracedSubResp {
339 fn from(value: SubscribeResponse) -> Self {
340 Self(value)
341 }
342}
343
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use parking_lot::Mutex;

    use super::*;

    /// Spawns many tasks that each draw one id from a shared generator and
    /// checks that every value in `0..TASKS` was handed out.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_atomic_id() {
        const TASKS: u64 = 5000;

        let generator = Arc::new(UniqueIdGenerator::new(AtomicU64::new(0)));
        let seen = Arc::new(Mutex::new(HashSet::new()));

        let handles: Vec<_> = (0..TASKS)
            .map(|_| {
                let generator = Arc::clone(&generator);
                let seen = Arc::clone(&seen);
                tokio::spawn(async move {
                    // Draw the id first, then take the lock only for the insert.
                    let id = generator.next();
                    seen.lock().insert(id);
                })
            })
            .collect();

        for handle in handles {
            handle.await.unwrap();
        }

        let recorded = seen.lock();
        for expected in 0..TASKS {
            assert!(recorded.contains(&expected));
        }
    }

    /// `Iter` records are iterator-related, `Get` records are not.
    #[test]
    fn test_record_is_iter_related() {
        let unbounded_scan = Record::new(
            StorageType::Global,
            1,
            Operation::Iter {
                key_range: (Bound::Unbounded, Bound::Unbounded),
                epoch: None,
                read_options: TracedReadOptions::for_test(0),
            },
        );
        let point_lookup = Record::new(
            StorageType::Global,
            2,
            Operation::get(Bytes::from("test"), None, TracedReadOptions::for_test(0)),
        );

        assert!(unbounded_scan.is_iter_related());
        assert!(!point_lookup.is_iter_related());
    }
}