risingwave_hummock_trace/collector.rs

use std::collections::HashSet;
use std::env;
use std::fs::{OpenOptions, create_dir_all};
use std::io::BufWriter;
use std::ops::Bound;
use std::path::Path;
use std::sync::LazyLock;
use std::sync::atomic::AtomicU64;

use bincode::{Decode, Encode};
use bytes::Bytes;
use parking_lot::Mutex;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
use risingwave_pb::meta::SubscribeResponse;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{
    UnboundedReceiver as Receiver, UnboundedSender as Sender, unbounded_channel as channel,
};
use tokio::task_local;

use crate::write::{TraceWriter, TraceWriterImpl};
use crate::{
    ConcurrentIdGenerator, Operation, OperationResult, Record, RecordId, RecordIdGenerator,
    TracedInitOptions, TracedNewLocalOptions, TracedReadOptions, TracedSealCurrentEpochOptions,
    TracedSubResp, TracedTryWaitEpochOptions, UniqueIdGenerator,
};

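// Process-wide tracing infrastructure: the collector that owns the record channel,
// a global record-id generator, the cached tracing switch, and a dedicated runtime
// for the background writer task.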
static GLOBAL_COLLECTOR: LazyLock<GlobalCollector> = LazyLock::new(GlobalCollector::new);

static GLOBAL_RECORD_ID: LazyLock<RecordIdGenerator> =
    LazyLock::new(|| UniqueIdGenerator::new(AtomicU64::new(0)));

static SHOULD_USE_TRACE: LazyLock<bool> = LazyLock::new(set_should_use_trace);

pub static CONCURRENT_ID: LazyLock<ConcurrentIdGenerator> =
    LazyLock::new(|| UniqueIdGenerator::new(AtomicU64::new(0)));

static TRACE_RT: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());

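// Environment variables and defaults that control tracing: `USE_HM_TRACE` turns
// tracing on when it parses as `true`, `HM_TRACE_PATH` overrides the log path, and
// the remaining constants give the fallback path and the log writer's buffer size.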
pub const USE_TRACE: &str = "USE_HM_TRACE";
const LOG_PATH: &str = "HM_TRACE_PATH";
const DEFAULT_PATH: &str = ".trace/hummock.ht";
const WRITER_BUFFER_SIZE: usize = 1024;

pub fn should_use_trace() -> bool {
    *SHOULD_USE_TRACE
}

fn set_should_use_trace() -> bool {
    match std::env::var(USE_TRACE) {
        Ok(v) => v.parse().unwrap_or(false),
        Err(_) => false,
    }
}

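/// Starts the trace writer on the dedicated runtime: resolves the log path from
/// `HM_TRACE_PATH` (falling back to `.trace/hummock.ht`), creates the parent
/// directory if needed, opens the file, and runs the global collector with a
/// buffered bincode writer.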
pub fn init_collector() {
    TRACE_RT.spawn(async move {
        let path = env::var(LOG_PATH).unwrap_or_else(|_| DEFAULT_PATH.to_owned());
        let path = Path::new(&path);
        tracing::info!("Hummock Tracing log path {}", path.to_string_lossy());

        if let Some(parent) = path.parent() {
            if !parent.exists() {
                create_dir_all(parent).unwrap();
            }
        }
        let f = OpenOptions::new()
            .write(true)
            .truncate(true)
            .create(true)
            .open(path)
            .expect("failed to open log file");
        let writer = BufWriter::with_capacity(WRITER_BUFFER_SIZE, f);
        let writer = TraceWriterImpl::try_new_bincode(writer).unwrap();
        GlobalCollector::run(writer);
    });
}

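/// `GlobalCollector` funnels traced records from every `TraceSpan` into a single
/// unbounded channel. The receiver half is stashed behind a mutex so the writer
/// task can claim it exactly once in `run`.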
struct GlobalCollector {
    tx: Sender<RecordMsg>,
    rx: Mutex<Option<Receiver<RecordMsg>>>,
}

impl GlobalCollector {
    fn new() -> Self {
        let (tx, rx) = channel();
        Self {
            tx,
            rx: Mutex::new(Some(rx)),
        }
    }

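    /// Drains the record channel on a blocking task and writes each record with the
    /// given `TraceWriter`. A `None` message is the shutdown signal: the loop exits
    /// and the writer is flushed.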
    fn run(mut writer: impl TraceWriter + Send + 'static) -> tokio::task::JoinHandle<()> {
        tokio::task::spawn_blocking(move || {
            let mut rx = GLOBAL_COLLECTOR.rx.lock().take().unwrap();
            while let Some(Some(r)) = rx.blocking_recv() {
                writer.write(r).expect("failed to write hummock trace");
            }
            writer.flush().expect("failed to flush hummock trace");
        })
    }

    fn finish(&self) {
        self.tx.send(None).expect("failed to finish worker");
    }

    fn tx(&self) -> Sender<RecordMsg> {
        self.tx.clone()
    }
}

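// Dropping the collector tells the writer task to flush and stop, unless the
// channel has already been closed.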
impl Drop for GlobalCollector {
    fn drop(&mut self) {
        if !self.tx.is_closed() {
            self.finish();
        }
    }
}

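/// A `TraceSpan` marks one traced storage operation. Creating it sends the opening
/// `Operation` to the collector; dropping it sends `Operation::Finish`, so the span
/// must be kept alive for the full duration of the operation it traces.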
#[must_use = "TraceSpan Lifetime is important"]
#[derive(Clone)]
pub struct TraceSpan {
    tx: Sender<RecordMsg>,
    id: RecordId,
    storage_type: StorageType,
}

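/// A possibly-disabled `TraceSpan`: it wraps `Some(TraceSpan)` when tracing is
/// enabled and `None` otherwise, so call sites can use it unconditionally.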
#[must_use = "TraceSpan Lifetime is important"]
#[derive(Clone)]
pub struct MayTraceSpan(Option<TraceSpan>);

impl From<Option<TraceSpan>> for MayTraceSpan {
    fn from(value: Option<TraceSpan>) -> Self {
        Self(value)
    }
}

impl MayTraceSpan {
    pub fn may_send_result(&self, res: OperationResult) {
        if let Some(span) = &self.0 {
            span.send_result(res)
        }
    }

    pub fn may_send_op(&self, op: Operation) {
        if let Some(span) = &self.0 {
            span.send(op)
        }
    }

    pub fn may_send_iter_next(&self) {
        if let Some(span) = &self.0 {
            span.send(Operation::IterNext(span.id))
        }
    }
}

impl TraceSpan {
    pub fn new(tx: Sender<RecordMsg>, id: RecordId, storage_type: StorageType) -> Self {
        Self {
            tx,
            id,
            storage_type,
        }
    }

    pub fn new_global_op(op: Operation, storage_type: StorageType) -> MayTraceSpan {
        match should_use_trace() {
            true => Some(Self::new_to_global(op, storage_type)).into(),
            false => None.into(),
        }
    }

    pub fn new_seal_current_epoch_span(
        epoch: u64,
        opts: TracedSealCurrentEpochOptions,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(Operation::SealCurrentEpoch { epoch, opts }, storage_type)
    }

    pub fn new_try_wait_epoch_span(
        epoch: HummockReadEpoch,
        options: TracedTryWaitEpochOptions,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::TryWaitEpoch(epoch.into(), options),
            StorageType::Global,
        )
    }

    pub fn new_get_span(
        key: Bytes,
        epoch: Option<u64>,
        read_options: TracedReadOptions,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(Operation::get(key, epoch, read_options), storage_type)
    }

    pub fn new_iter_span(
        key_range: (Bound<Bytes>, Bound<Bytes>),
        epoch: Option<u64>,
        read_options: TracedReadOptions,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::Iter {
                key_range: (
                    key_range.0.as_ref().map(|v| v.clone().into()),
                    key_range.1.as_ref().map(|v| v.clone().into()),
                ),
                epoch,
                read_options,
            },
            storage_type,
        )
    }

    pub fn new_insert_span(
        key: Bytes,
        new_val: Bytes,
        old_val: Option<Bytes>,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::Insert {
                key: key.into(),
                new_val: new_val.into(),
                old_val: old_val.map(|b| b.into()),
            },
            storage_type,
        )
    }

    pub fn new_delete_span(key: Bytes, old_val: Bytes, storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(
            Operation::Delete {
                key: key.into(),
                old_val: old_val.into(),
            },
            storage_type,
        )
    }

    pub fn new_sync_span(
        sync_table_epochs: &Vec<(HummockEpoch, HashSet<TableId>)>,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::Sync(
                sync_table_epochs
                    .iter()
                    .map(|(epoch, table_ids)| {
                        (
                            *epoch,
                            table_ids.iter().map(|table_id| table_id.table_id).collect(),
                        )
                    })
                    .collect(),
            ),
            storage_type,
        )
    }

    pub fn new_local_storage_span(
        option: TracedNewLocalOptions,
        storage_type: StorageType,
        local_storage_id: u64,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::NewLocalStorage(option, local_storage_id),
            storage_type,
        )
    }

    pub fn new_drop_storage_span(storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(Operation::DropLocalStorage, storage_type)
    }

    pub fn new_flush_span(storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(Operation::Flush, storage_type)
    }

    pub fn new_try_flush_span(storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(Operation::TryFlush, storage_type)
    }

    pub fn new_meta_message_span(resp: SubscribeResponse) -> MayTraceSpan {
        Self::new_global_op(
            Operation::MetaMessage(Box::new(TracedSubResp::from(resp))),
            StorageType::Global,
        )
    }

    pub fn new_local_storage_init_span(
        options: TracedInitOptions,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(Operation::LocalStorageInit(options), storage_type)
    }

    pub fn send(&self, op: Operation) {
        self.tx
            .send(Some(Record::new(*self.storage_type(), self.id(), op)))
            .expect("failed to log record");
    }

    pub fn send_result(&self, res: OperationResult) {
        self.send(Operation::Result(res));
    }

    pub fn finish(&self) {
        self.send(Operation::Finish);
    }

    pub fn id(&self) -> RecordId {
        self.id
    }

    fn storage_type(&self) -> &StorageType {
        &self.storage_type
    }

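    /// Creates a span wired to `GLOBAL_COLLECTOR` with a fresh record id and
    /// immediately sends the opening operation.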
    pub fn new_to_global(op: Operation, storage_type: StorageType) -> Self {
        let span = TraceSpan::new(GLOBAL_COLLECTOR.tx(), GLOBAL_RECORD_ID.next(), storage_type);
        span.send(op);
        span
    }

    #[cfg(test)]
    pub fn new_with_op(
        tx: Sender<RecordMsg>,
        id: RecordId,
        op: Operation,
        storage_type: StorageType,
    ) -> Self {
        let span = TraceSpan::new(tx, id, storage_type);
        span.send(op);
        span
    }
}

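// Dropping a span records `Operation::Finish`, marking the end of the traced operation.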
impl Drop for TraceSpan {
    fn drop(&mut self) {
        self.finish();
    }
}

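/// A message to the collector; `None` tells the writer task to flush and exit.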
pub type RecordMsg = Option<Record>;
pub type ConcurrentId = u64;
pub type LocalStorageId = u64;

#[derive(Copy, Clone, Debug, Encode, Decode, PartialEq, Hash, Eq)]
pub enum StorageType {
    Global,
    Local(ConcurrentId, LocalStorageId),
}

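// Task-local `ConcurrentId` of the current task; `StorageType::Local` carries it
// alongside a `LocalStorageId` to attribute records to one local state store.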
task_local! {
    #[allow(clippy::declare_interior_mutable_const)]
    pub static LOCAL_ID: ConcurrentId;
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::MockTraceWriter;

    #[tokio::test(flavor = "multi_thread")]
    async fn test_new_spans_concurrent() {
        let count = 200;

        let collector = Arc::new(GlobalCollector::new());
        let generator = Arc::new(UniqueIdGenerator::new(AtomicU64::new(0)));
        let mut handles = Vec::with_capacity(count);

        for i in 0..count {
            let collector = collector.clone();
            let generator = generator.clone();
            let handle = tokio::spawn(async move {
                let op = Operation::get(
                    Bytes::from(vec![i as u8]),
                    Some(123),
                    TracedReadOptions::for_test(0),
                );
                let _span = TraceSpan::new_with_op(
                    collector.tx(),
                    generator.next(),
                    op,
                    StorageType::Global,
                );
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.unwrap();
        }

        let mut rx = collector.rx.lock().take().unwrap();
        let mut rx_count = 0;
        rx.close();
        while rx.recv().await.is_some() {
            rx_count += 1;
        }
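        // Each span emits two records: the opening operation and `Operation::Finish` on drop.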
        assert_eq!(count * 2, rx_count);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_collector_run() {
        let count = 5000;
        let generator = Arc::new(UniqueIdGenerator::new(AtomicU64::new(0)));

        let op = Operation::get(
            Bytes::from(vec![74, 56, 43, 67]),
            Some(256),
            TracedReadOptions::for_test(0),
        );
        let mut mock_writer = MockTraceWriter::new();

        mock_writer
            .expect_write()
            .times(count * 2)
            .returning(|_| Ok(0));
        mock_writer.expect_flush().times(1).returning(|| Ok(()));

        let runner_handle = GlobalCollector::run(mock_writer);

        let mut handles = Vec::with_capacity(count);

        for _ in 0..count {
            let op = op.clone();
            let tx = GLOBAL_COLLECTOR.tx();
            let generator = generator.clone();
            let handle = tokio::spawn(async move {
                let _span =
                    TraceSpan::new_with_op(tx, generator.next(), op, StorageType::Local(0, 0));
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.unwrap();
        }

        GLOBAL_COLLECTOR.finish();

        runner_handle.await.unwrap();
    }

    #[ignore]
    #[test]
    fn test_set_use_trace() {
        unsafe { std::env::remove_var(USE_TRACE) };
        assert!(!set_should_use_trace());

        unsafe { std::env::set_var(USE_TRACE, "true") };
        assert!(set_should_use_trace());

        unsafe { std::env::set_var(USE_TRACE, "false") };
        assert!(!set_should_use_trace());

        unsafe { std::env::set_var(USE_TRACE, "invalid") };
        assert!(!set_should_use_trace());
    }

    #[ignore]
    #[test]
    fn test_should_use_trace() {
        unsafe { std::env::set_var(USE_TRACE, "true") };
        assert!(should_use_trace());
        assert!(set_should_use_trace());
    }
}
509}