risingwave_hummock_trace/collector.rs

use std::collections::HashSet;
use std::env;
use std::fs::{OpenOptions, create_dir_all};
use std::io::BufWriter;
use std::ops::Bound;
use std::path::Path;
use std::sync::LazyLock;
use std::sync::atomic::AtomicU64;

use bincode::{Decode, Encode};
use bytes::Bytes;
use parking_lot::Mutex;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
use risingwave_pb::meta::SubscribeResponse;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{
    UnboundedReceiver as Receiver, UnboundedSender as Sender, unbounded_channel as channel,
};
use tokio::task_local;

use crate::write::{TraceWriter, TraceWriterImpl};
use crate::{
    ConcurrentIdGenerator, Operation, OperationResult, Record, RecordId, RecordIdGenerator,
    TracedInitOptions, TracedNewLocalOptions, TracedReadOptions, TracedSealCurrentEpochOptions,
    TracedSubResp, TracedTryWaitEpochOptions, UniqueIdGenerator,
};

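/// Process-wide fan-in point: every [`TraceSpan`] clones this collector's
/// sender, so all records funnel into a single writer task.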
static GLOBAL_COLLECTOR: LazyLock<GlobalCollector> = LazyLock::new(GlobalCollector::new);

static GLOBAL_RECORD_ID: LazyLock<RecordIdGenerator> =
    LazyLock::new(|| UniqueIdGenerator::new(AtomicU64::new(0)));

static SHOULD_USE_TRACE: LazyLock<bool> = LazyLock::new(set_should_use_trace);

pub static CONCURRENT_ID: LazyLock<ConcurrentIdGenerator> =
    LazyLock::new(|| UniqueIdGenerator::new(AtomicU64::new(0)));

static TRACE_RT: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());

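// Tracing is configured once per process via environment variables:
// `USE_HM_TRACE` toggles collection and `HM_TRACE_PATH` overrides the
// default output file.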
pub const USE_TRACE: &str = "USE_HM_TRACE";
const LOG_PATH: &str = "HM_TRACE_PATH";
const DEFAULT_PATH: &str = ".trace/hummock.ht";
const WRITER_BUFFER_SIZE: usize = 1024;

pub fn should_use_trace() -> bool {
    *SHOULD_USE_TRACE
}

fn set_should_use_trace() -> bool {
    match std::env::var(USE_TRACE) {
        Ok(v) => v.parse().unwrap_or(false),
        Err(_) => false,
    }
}

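/// Spawns the background task that drains the global record channel into a
/// bincode-encoded trace file. A minimal startup sketch (a hypothetical call
/// site, not the crate's actual entry point):
///
/// ```ignore
/// if should_use_trace() {
///     init_collector();
/// }
/// ```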
pub fn init_collector() {
    TRACE_RT.spawn(async move {
        let path = env::var(LOG_PATH).unwrap_or_else(|_| DEFAULT_PATH.to_owned());
        let path = Path::new(&path);
        tracing::info!("Hummock Tracing log path {}", path.to_string_lossy());

        if let Some(parent) = path.parent() {
            if !parent.exists() {
                create_dir_all(parent).unwrap();
            }
        }
        let f = OpenOptions::new()
            .write(true)
            .truncate(true)
            .create(true)
            .open(path)
            .expect("failed to open log file");
        let writer = BufWriter::with_capacity(WRITER_BUFFER_SIZE, f);
        let writer = TraceWriterImpl::try_new_bincode(writer).unwrap();
        GlobalCollector::run(writer);
    });
}

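/// Aggregates trace records from all spans. Spans push `Some(record)` over
/// `tx`; a single blocking worker (see [`GlobalCollector::run`]) drains `rx`
/// and writes each record, treating `None` as the shutdown-and-flush signal.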
struct GlobalCollector {
    tx: Sender<RecordMsg>,
    rx: Mutex<Option<Receiver<RecordMsg>>>,
}

impl GlobalCollector {
    fn new() -> Self {
        let (tx, rx) = channel();
        Self {
            tx,
            rx: Mutex::new(Some(rx)),
        }
    }

    fn run(mut writer: impl TraceWriter + Send + 'static) -> tokio::task::JoinHandle<()> {
        tokio::task::spawn_blocking(move || {
            // Take exclusive ownership of the receiver; calling `run` a second
            // time would panic here.
            let mut rx = GLOBAL_COLLECTOR.rx.lock().take().unwrap();
            // `Some(None)` is the explicit shutdown signal; a closed channel
            // also ends the loop.
            while let Some(Some(r)) = rx.blocking_recv() {
                writer.write(r).expect("failed to write hummock trace");
            }
            writer.flush().expect("failed to flush hummock trace");
        })
    }

    fn finish(&self) {
        self.tx.send(None).expect("failed to finish worker");
    }

    fn tx(&self) -> Sender<RecordMsg> {
        self.tx.clone()
    }
}

impl Drop for GlobalCollector {
    fn drop(&mut self) {
        // Only signal shutdown if the receiving worker is still alive.
        if !self.tx.is_closed() {
            self.finish();
        }
    }
}

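/// A handle that traces one storage operation. The `new_*_span` constructors
/// record the operation when the span is created; dropping the span records
/// [`Operation::Finish`], bracketing the operation's lifetime in the trace.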
#[must_use = "TraceSpan Lifetime is important"]
#[derive(Clone)]
pub struct TraceSpan {
    tx: Sender<RecordMsg>,
    id: RecordId,
    storage_type: StorageType,
}

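/// A possibly-disabled [`TraceSpan`]: when tracing is off this holds `None`
/// and every `may_*` method is a no-op, so call sites need no feature guards.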
#[must_use = "TraceSpan Lifetime is important"]
#[derive(Clone)]
pub struct MayTraceSpan(Option<TraceSpan>);

impl From<Option<TraceSpan>> for MayTraceSpan {
    fn from(value: Option<TraceSpan>) -> Self {
        Self(value)
    }
}

impl MayTraceSpan {
    pub fn may_send_result(&self, res: OperationResult) {
        if let Some(span) = &self.0 {
            span.send_result(res)
        }
    }

    pub fn may_send_op(&self, op: Operation) {
        if let Some(span) = &self.0 {
            span.send(op)
        }
    }

    pub fn may_send_iter_next(&self) {
        if let Some(span) = &self.0 {
            span.send(Operation::IterNext(span.id))
        }
    }
}

impl TraceSpan {
    pub fn new(tx: Sender<RecordMsg>, id: RecordId, storage_type: StorageType) -> Self {
        Self {
            tx,
            id,
            storage_type,
        }
    }

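    /// Creates a span against the global collector and records `op`, or
    /// returns an inert [`MayTraceSpan`] when tracing is disabled. A usage
    /// sketch via one of the typed constructors (`key`, `epoch`, and
    /// `read_options` are illustrative placeholders):
    ///
    /// ```ignore
    /// let span = TraceSpan::new_get_span(key, Some(epoch), read_options, StorageType::Global);
    /// // ... perform the traced get ...
    /// // dropping `span` records `Operation::Finish`.
    /// ```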
    pub fn new_global_op(op: Operation, storage_type: StorageType) -> MayTraceSpan {
        match should_use_trace() {
            true => Some(Self::new_to_global(op, storage_type)).into(),
            false => None.into(),
        }
    }

    pub fn new_epoch_span(storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(Operation::LocalStorageEpoch, storage_type)
    }

    pub fn new_is_dirty_span(storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(Operation::LocalStorageIsDirty, storage_type)
    }

    pub fn new_seal_current_epoch_span(
        epoch: u64,
        opts: TracedSealCurrentEpochOptions,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(Operation::SealCurrentEpoch { epoch, opts }, storage_type)
    }

    pub fn new_try_wait_epoch_span(
        epoch: HummockReadEpoch,
        options: TracedTryWaitEpochOptions,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::TryWaitEpoch(epoch.into(), options),
            StorageType::Global,
        )
    }

    pub fn new_get_span(
        key: Bytes,
        epoch: Option<u64>,
        read_options: TracedReadOptions,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(Operation::get(key, epoch, read_options), storage_type)
    }

    pub fn new_iter_span(
        key_range: (Bound<Bytes>, Bound<Bytes>),
        epoch: Option<u64>,
        read_options: TracedReadOptions,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::Iter {
                key_range: (
                    key_range.0.as_ref().map(|v| v.clone().into()),
                    key_range.1.as_ref().map(|v| v.clone().into()),
                ),
                epoch,
                read_options,
            },
            storage_type,
        )
    }

    pub fn new_insert_span(
        key: Bytes,
        new_val: Bytes,
        old_val: Option<Bytes>,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::Insert {
                key: key.into(),
                new_val: new_val.into(),
                old_val: old_val.map(|b| b.into()),
            },
            storage_type,
        )
    }

    pub fn new_delete_span(key: Bytes, old_val: Bytes, storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(
            Operation::Delete {
                key: key.into(),
                old_val: old_val.into(),
            },
            storage_type,
        )
    }

    pub fn new_sync_span(
        sync_table_epochs: &[(HummockEpoch, HashSet<TableId>)],
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::Sync(
                sync_table_epochs
                    .iter()
                    .map(|(epoch, table_ids)| {
                        (
                            *epoch,
                            table_ids.iter().map(|table_id| table_id.table_id).collect(),
                        )
                    })
                    .collect(),
            ),
            storage_type,
        )
    }

    pub fn new_local_storage_span(
        option: TracedNewLocalOptions,
        storage_type: StorageType,
        local_storage_id: u64,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::NewLocalStorage(option, local_storage_id),
            storage_type,
        )
    }

    pub fn new_drop_storage_span(storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(Operation::DropLocalStorage, storage_type)
    }

    pub fn new_flush_span(storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(Operation::Flush, storage_type)
    }

    pub fn new_try_flush_span(storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(Operation::TryFlush, storage_type)
    }

    pub fn new_meta_message_span(resp: SubscribeResponse) -> MayTraceSpan {
        Self::new_global_op(
            Operation::MetaMessage(Box::new(TracedSubResp::from(resp))),
            StorageType::Global,
        )
    }

    pub fn new_local_storage_init_span(
        options: TracedInitOptions,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(Operation::LocalStorageInit(options), storage_type)
    }

    pub fn send(&self, op: Operation) {
        self.tx
            .send(Some(Record::new(*self.storage_type(), self.id(), op)))
            .expect("failed to log record");
    }

    pub fn send_result(&self, res: OperationResult) {
        self.send(Operation::Result(res));
    }

    pub fn finish(&self) {
        self.send(Operation::Finish);
    }

    pub fn id(&self) -> RecordId {
        self.id
    }

    fn storage_type(&self) -> &StorageType {
        &self.storage_type
    }

    pub fn new_to_global(op: Operation, storage_type: StorageType) -> Self {
        let span = TraceSpan::new(GLOBAL_COLLECTOR.tx(), GLOBAL_RECORD_ID.next(), storage_type);
        span.send(op);
        span
    }

    #[cfg(test)]
    pub fn new_with_op(
        tx: Sender<RecordMsg>,
        id: RecordId,
        op: Operation,
        storage_type: StorageType,
    ) -> Self {
        let span = TraceSpan::new(tx, id, storage_type);
        span.send(op);
        span
    }
}

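// Dropping a span records `Operation::Finish`, marking the end of the traced
// operation.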
impl Drop for TraceSpan {
    fn drop(&mut self) {
        self.finish();
    }
}

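/// `None` acts as the shutdown signal for the collector worker.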
pub type RecordMsg = Option<Record>;
pub type ConcurrentId = u64;
pub type LocalStorageId = u64;

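/// Identifies the storage instance a record came from: the shared global state
/// store, or a local store keyed by its [`ConcurrentId`] and [`LocalStorageId`].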
#[derive(Copy, Clone, Debug, Encode, Decode, PartialEq, Hash, Eq)]
pub enum StorageType {
    Global,
    Local(ConcurrentId, LocalStorageId),
}

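// Task-local `ConcurrentId` of the current task, used to attribute
// `StorageType::Local` records to the task that produced them.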
task_local! {
    #[allow(clippy::declare_interior_mutable_const)]
    pub static LOCAL_ID: ConcurrentId;
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::MockTraceWriter;

    #[tokio::test(flavor = "multi_thread")]
    async fn test_new_spans_concurrent() {
        let count = 200;

        let collector = Arc::new(GlobalCollector::new());
        let generator = Arc::new(UniqueIdGenerator::new(AtomicU64::new(0)));
        let mut handles = Vec::with_capacity(count);

        for i in 0..count {
            let collector = collector.clone();
            let generator = generator.clone();
            let handle = tokio::spawn(async move {
                let op = Operation::get(
                    Bytes::from(vec![i as u8]),
                    Some(123),
                    TracedReadOptions::for_test(0),
                );
                let _span = TraceSpan::new_with_op(
                    collector.tx(),
                    generator.next(),
                    op,
                    StorageType::Global,
                );
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.unwrap();
        }

        let mut rx = collector.rx.lock().take().unwrap();
        let mut rx_count = 0;
        rx.close();
        while rx.recv().await.is_some() {
            rx_count += 1;
        }
        // Each span sends two records: the operation at creation and `Finish` on drop.
        assert_eq!(count * 2, rx_count);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_collector_run() {
        let count = 5000;
        let generator = Arc::new(UniqueIdGenerator::new(AtomicU64::new(0)));

        let op = Operation::get(
            Bytes::from(vec![74, 56, 43, 67]),
            Some(256),
            TracedReadOptions::for_test(0),
        );
        let mut mock_writer = MockTraceWriter::new();

        mock_writer
            .expect_write()
            .times(count * 2)
            .returning(|_| Ok(0));
        mock_writer.expect_flush().times(1).returning(|| Ok(()));

        let runner_handle = GlobalCollector::run(mock_writer);

        let mut handles = Vec::with_capacity(count);

        for _ in 0..count {
            let op = op.clone();
            let tx = GLOBAL_COLLECTOR.tx();
            let generator = generator.clone();
            let handle = tokio::spawn(async move {
                let _span =
                    TraceSpan::new_with_op(tx, generator.next(), op, StorageType::Local(0, 0));
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.unwrap();
        }

        GLOBAL_COLLECTOR.finish();

        runner_handle.await.unwrap();
    }

    #[ignore]
    #[test]
    fn test_set_use_trace() {
        unsafe { std::env::remove_var(USE_TRACE) };
        assert!(!set_should_use_trace());

        unsafe { std::env::set_var(USE_TRACE, "true") };
        assert!(set_should_use_trace());

        unsafe { std::env::set_var(USE_TRACE, "false") };
        assert!(!set_should_use_trace());

        unsafe { std::env::set_var(USE_TRACE, "invalid") };
        assert!(!set_should_use_trace());
    }

    #[ignore]
    #[test]
    fn test_should_use_trace() {
        unsafe { std::env::set_var(USE_TRACE, "true") };
        assert!(should_use_trace());
        assert!(set_should_use_trace());
    }
}
517}