risingwave_hummock_trace/
record.rs1use std::ops::{Bound, Deref};
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use bincode::error::{DecodeError, EncodeError};
19use bincode::{Decode, Encode};
20use bytes::Bytes;
21use prost::Message;
22use risingwave_pb::meta::SubscribeResponse;
23
24use crate::{
25 LocalStorageId, StorageType, TracedHummockReadEpoch, TracedInitOptions, TracedNewLocalOptions,
26 TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions,
27};
28
29pub type RecordId = u64;
30
31pub type RecordIdGenerator = UniqueIdGenerator<AtomicU64>;
32pub type ConcurrentIdGenerator = UniqueIdGenerator<AtomicU64>;
33
/// A source of identifiers that can be advanced through a shared reference.
pub trait UniqueId {
    /// The identifier type returned by [`UniqueId::inc`].
    type Type;

    /// Produces an identifier, updating the source through `&self`.
    fn inc(&self) -> Self::Type;
}
38
39impl UniqueId for AtomicU64 {
40 type Type = u64;
41
42 fn inc(&self) -> Self::Type {
43 self.fetch_add(1, Ordering::Relaxed)
44 }
45}
46
/// Wraps an id source `T` and hands out ids via `next` when `T` implements [`UniqueId`].
pub struct UniqueIdGenerator<T> {
    /// The wrapped id source.
    id: T,
}
50
51impl<T: UniqueId> UniqueIdGenerator<T> {
52 pub fn new(id: T) -> Self {
53 Self { id }
54 }
55
56 pub fn next(&self) -> T::Type {
57 self.id.inc()
58 }
59}
60
61#[derive(Encode, Decode, Debug, PartialEq, Clone)]
62pub struct Record {
63 pub storage_type: StorageType,
64 pub record_id: RecordId,
65 pub operation: Operation,
66}
67
68impl Record {
69 pub fn new(storage_type: StorageType, record_id: RecordId, operation: Operation) -> Self {
70 Self {
71 storage_type,
72 record_id,
73 operation,
74 }
75 }
76
77 pub fn storage_type(&self) -> &StorageType {
78 &self.storage_type
79 }
80
81 pub fn record_id(&self) -> RecordId {
82 self.record_id
83 }
84
85 pub fn operation(&self) -> &Operation {
86 &self.operation
87 }
88
89 pub fn is_iter_related(&self) -> bool {
90 matches!(
91 self.operation(),
92 Operation::Iter { .. } | Operation::IterNext(_)
93 )
94 }
95
96 #[cfg(test)]
97 pub(crate) fn new_local_none(record_id: RecordId, operation: Operation) -> Self {
98 Self::new(StorageType::Global, record_id, operation)
99 }
100}
101
102pub type TracedIterRange = (Bound<TracedBytes>, Bound<TracedBytes>);
103
104#[derive(Encode, Decode, PartialEq, Debug, Clone)]
106pub enum Operation {
107 Get {
109 key: TracedBytes,
111 epoch: Option<u64>,
113 read_options: TracedReadOptions,
115 },
116
117 Insert {
119 key: TracedBytes,
121 new_val: TracedBytes,
123 old_val: Option<TracedBytes>,
125 },
126
127 Delete {
129 key: TracedBytes,
131 old_val: TracedBytes,
133 },
134
135 Iter {
137 key_range: TracedIterRange,
139 epoch: Option<u64>,
141 read_options: TracedReadOptions,
143 },
144
145 IterNext(RecordId),
147
148 Sync(Vec<(u64, Vec<u32>)>),
150
151 MetaMessage(Box<TracedSubResp>),
153
154 Result(OperationResult),
156
157 NewLocalStorage(TracedNewLocalOptions, LocalStorageId),
159
160 DropLocalStorage,
162
163 LocalStorageInit(TracedInitOptions),
165
166 TryWaitEpoch(TracedHummockReadEpoch, TracedTryWaitEpochOptions),
168
169 SealCurrentEpoch {
171 epoch: u64,
172 opts: TracedSealCurrentEpochOptions,
173 },
174
175 TryFlush,
176
177 Flush,
178 Finish,
180}
181
182impl Operation {
183 pub fn get(key: Bytes, epoch: Option<u64>, read_options: TracedReadOptions) -> Operation {
184 Operation::Get {
185 key: key.into(),
186 epoch,
187 read_options,
188 }
189 }
190
191 pub fn insert(key: Bytes, new_val: Bytes, old_val: Option<Bytes>) -> Operation {
192 Operation::Insert {
193 key: key.into(),
194 new_val: new_val.into(),
195 old_val: old_val.map(|v| v.into()),
196 }
197 }
198}
199
200#[derive(PartialEq, Eq, Debug, Clone)]
201pub struct TracedBytes(Bytes);
202
203impl Deref for TracedBytes {
204 type Target = Bytes;
205
206 fn deref(&self) -> &Self::Target {
207 &self.0
208 }
209}
210
211impl Encode for TracedBytes {
212 fn encode<E: bincode::enc::Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
213 Encode::encode(&self.0.as_ref(), encoder)
214 }
215}
216
217impl Decode for TracedBytes {
218 fn decode<D: bincode::de::Decoder>(
219 decoder: &mut D,
220 ) -> Result<Self, bincode::error::DecodeError> {
221 let buf: Vec<u8> = Decode::decode(decoder)?;
222 let bytes = Bytes::from(buf);
223 Ok(Self(bytes))
224 }
225}
226
227impl<'de> bincode::BorrowDecode<'de> for TracedBytes {
228 fn borrow_decode<D: bincode::de::BorrowDecoder<'de>>(
229 decoder: &mut D,
230 ) -> core::result::Result<Self, bincode::error::DecodeError> {
231 let buf: Vec<u8> = Decode::decode(decoder)?;
232 let bytes = Bytes::from(buf);
233 Ok(Self(bytes))
234 }
235}
236
237impl From<Vec<u8>> for TracedBytes {
238 fn from(value: Vec<u8>) -> Self {
239 Self(Bytes::from(value))
240 }
241}
242
243impl From<Bytes> for TracedBytes {
244 fn from(value: Bytes) -> Self {
245 Self(value)
246 }
247}
248
249impl From<TracedBytes> for Bytes {
250 fn from(value: TracedBytes) -> Self {
251 value.0
252 }
253}
254#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
257pub enum TraceResult<T> {
258 Ok(T),
259 Err,
260}
261
262impl<T> TraceResult<T> {
263 pub fn is_ok(&self) -> bool {
264 matches!(*self, Self::Ok(_))
265 }
266}
267
268impl<T, E> From<std::result::Result<T, E>> for TraceResult<T> {
269 fn from(value: std::result::Result<T, E>) -> Self {
270 match value {
271 Ok(v) => Self::Ok(v),
272 Err(_) => Self::Err, }
274 }
275}
276
277#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone)]
278pub enum OperationResult {
279 Get(TraceResult<Option<TracedBytes>>),
280 Insert(TraceResult<()>),
281 Delete(TraceResult<()>),
282 TryFlush(TraceResult<()>),
283 Flush(TraceResult<usize>),
284 Iter(TraceResult<()>),
285 IterNext(TraceResult<Option<(TracedBytes, TracedBytes)>>),
286 Sync(TraceResult<usize>),
287 NotifyHummock(TraceResult<()>),
288 TryWaitEpoch(TraceResult<()>),
289}
290
291#[derive(PartialEq, Debug, Clone)]
292pub struct TracedSubResp(pub SubscribeResponse);
293
294impl Encode for TracedSubResp {
295 fn encode<E: bincode::enc::Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
296 let mut buf = vec![];
301 self.0
302 .encode(&mut buf)
303 .map_err(|_| EncodeError::Other("failed to encode subscribeResponse"))?;
304 Encode::encode(&buf, encoder)
305 }
306}
307
308impl Decode for TracedSubResp {
309 fn decode<D: bincode::de::Decoder>(
310 decoder: &mut D,
311 ) -> Result<Self, bincode::error::DecodeError> {
312 let buf: Vec<u8> = Decode::decode(decoder)?;
313 let resp = Message::decode(&buf[..]).map_err(|_| {
314 DecodeError::OtherString("failed to decode subscribeResponse".to_owned())
315 })?;
316 Ok(Self(resp))
317 }
318}
319
320impl<'de> bincode::BorrowDecode<'de> for TracedSubResp {
321 fn borrow_decode<D: bincode::de::BorrowDecoder<'de>>(
322 decoder: &mut D,
323 ) -> core::result::Result<Self, bincode::error::DecodeError> {
324 let buf: Vec<u8> = Decode::decode(decoder)?;
325 let resp = Message::decode(&buf[..]).map_err(|_| {
326 DecodeError::OtherString("failed to decode subscribeResponse".to_owned())
327 })?;
328 Ok(Self(resp))
329 }
330}
331
332impl From<SubscribeResponse> for TracedSubResp {
333 fn from(value: SubscribeResponse) -> Self {
334 Self(value)
335 }
336}
337
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use parking_lot::Mutex;

    use super::*;

    /// Spawns `count` tasks that each draw one id from a shared generator and checks that
    /// every value in `0..count` was handed out.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_atomic_id() {
        let r#gen = Arc::new(UniqueIdGenerator::new(AtomicU64::new(0)));
        let ids_lock = Arc::new(Mutex::new(HashSet::new()));
        let count: u64 = 5000;

        let mut handles = Vec::new();
        for _ in 0..count {
            let ids = Arc::clone(&ids_lock);
            let r#gen = Arc::clone(&r#gen);
            handles.push(tokio::spawn(async move {
                let id = r#gen.next();
                ids.lock().insert(id);
            }));
        }

        // Wait for every task so that all ids are recorded before checking.
        for handle in handles {
            handle.await.unwrap();
        }

        let ids = ids_lock.lock();

        for i in 0..count {
            assert!(ids.contains(&i));
        }
    }

    /// `Iter` records are iterator-related, `Get` records are not.
    #[test]
    fn test_record_is_iter_related() {
        let iter_operation = Operation::Iter {
            key_range: (Bound::Unbounded, Bound::Unbounded),
            epoch: None,
            read_options: TracedReadOptions::for_test(0),
        };
        let get_operation = Operation::Get {
            key: TracedBytes(Bytes::from("test")),
            epoch: None,
            read_options: TracedReadOptions::for_test(0),
        };

        let iter_record = Record::new(StorageType::Global, 1, iter_operation);
        let get_record = Record::new(StorageType::Global, 2, get_operation);

        assert!(iter_record.is_iter_related());
        assert!(!get_record.is_iter_related());
    }
}