risingwave_hummock_trace/collector.rs

//! Collector for Hummock storage operation traces. Each traced operation is
//! wrapped in a [`TraceSpan`]; records are sent over an unbounded channel to a
//! global collector task, which serializes them with `bincode` and writes them
//! to a trace file.

use std::collections::HashSet;
use std::env;
use std::fs::{OpenOptions, create_dir_all};
use std::io::BufWriter;
use std::ops::Bound;
use std::path::Path;
use std::sync::LazyLock;
use std::sync::atomic::AtomicU64;

use bincode::{Decode, Encode};
use bytes::Bytes;
use parking_lot::Mutex;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
use risingwave_pb::meta::SubscribeResponse;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{
    UnboundedReceiver as Receiver, UnboundedSender as Sender, unbounded_channel as channel,
};
use tokio::task_local;

use crate::write::{TraceWriter, TraceWriterImpl};
use crate::{
    ConcurrentIdGenerator, Operation, OperationResult, Record, RecordId, RecordIdGenerator,
    TracedInitOptions, TracedNewLocalOptions, TracedReadOptions, TracedSealCurrentEpochOptions,
    TracedSubResp, TracedTryWaitEpochOptions, UniqueIdGenerator,
};

// Singleton collector shared by all trace spans.
static GLOBAL_COLLECTOR: LazyLock<GlobalCollector> = LazyLock::new(GlobalCollector::new);

// Monotonically increasing id assigned to each trace record.
static GLOBAL_RECORD_ID: LazyLock<RecordIdGenerator> =
    LazyLock::new(|| UniqueIdGenerator::new(AtomicU64::new(0)));

// Evaluated once from the `USE_HM_TRACE` environment variable.
static SHOULD_USE_TRACE: LazyLock<bool> = LazyLock::new(set_should_use_trace);

// Generator for per-task concurrent ids (see `ConcurrentId`).
pub static CONCURRENT_ID: LazyLock<ConcurrentIdGenerator> =
    LazyLock::new(|| UniqueIdGenerator::new(AtomicU64::new(0)));

// Dedicated runtime on which the collector task runs.
static TRACE_RT: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());

// Environment variable that enables tracing when set to `true`.
pub const USE_TRACE: &str = "USE_HM_TRACE";
// Environment variable overriding the trace file path.
const LOG_PATH: &str = "HM_TRACE_PATH";
const DEFAULT_PATH: &str = ".trace/hummock.ht";
const WRITER_BUFFER_SIZE: usize = 1024;

pub fn should_use_trace() -> bool {
    *SHOULD_USE_TRACE
}

fn set_should_use_trace() -> bool {
    match std::env::var(USE_TRACE) {
        Ok(v) => v.parse().unwrap_or(false),
        Err(_) => false,
    }
}

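/// Spawns the background task that opens the trace file and drains the global
/// record channel. A minimal sketch of the expected call pattern (illustrative
/// only, assuming the caller gates tracing on the `USE_HM_TRACE` flag):
///
/// ```ignore
/// if should_use_trace() {
///     init_collector();
/// }
/// ```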
pub fn init_collector() {
    TRACE_RT.spawn(async move {
        let path = env::var(LOG_PATH).unwrap_or_else(|_| DEFAULT_PATH.to_owned());
        let path = Path::new(&path);
        tracing::info!("Hummock Tracing log path {}", path.to_string_lossy());

        if let Some(parent) = path.parent()
            && !parent.exists()
        {
            create_dir_all(parent).unwrap();
        }
        let f = OpenOptions::new()
            .write(true)
            .truncate(true)
            .create(true)
            .open(path)
            .expect("failed to open log file");
        let writer = BufWriter::with_capacity(WRITER_BUFFER_SIZE, f);
        let writer = TraceWriterImpl::try_new_bincode(writer).unwrap();
        GlobalCollector::run(writer);
    });
}

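/// Collects trace records from all spans. `run` moves the receiving end of the
/// channel into a blocking task that writes each record with the supplied
/// [`TraceWriter`] until a `None` shutdown message is received.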
struct GlobalCollector {
    tx: Sender<RecordMsg>,
    rx: Mutex<Option<Receiver<RecordMsg>>>,
}

impl GlobalCollector {
    fn new() -> Self {
        let (tx, rx) = channel();
        Self {
            tx,
            rx: Mutex::new(Some(rx)),
        }
    }

    fn run(mut writer: impl TraceWriter + Send + 'static) -> tokio::task::JoinHandle<()> {
        tokio::task::spawn_blocking(move || {
            let mut rx = GLOBAL_COLLECTOR.rx.lock().take().unwrap();
            while let Some(Some(r)) = rx.blocking_recv() {
                writer.write(r).expect("failed to write hummock trace");
            }
            writer.flush().expect("failed to flush hummock trace");
        })
    }

    fn finish(&self) {
        self.tx.send(None).expect("failed to finish worker");
    }

    fn tx(&self) -> Sender<RecordMsg> {
        self.tx.clone()
    }
}

impl Drop for GlobalCollector {
    fn drop(&mut self) {
        if !self.tx.is_closed() {
            self.finish();
        }
    }
}

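/// A span representing one traced storage operation. Creating a span emits the
/// operation record; dropping it emits a matching `Operation::Finish` record.
/// A minimal usage sketch (illustrative only, assuming tracing is enabled):
///
/// ```ignore
/// let span = TraceSpan::new_get_span(key, Some(epoch), read_options, StorageType::Global);
/// // ... perform the actual get, then report its outcome ...
/// span.may_send_result(res); // `res` is an `OperationResult`
/// // dropping `span` records `Operation::Finish`
/// ```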
#[must_use = "TraceSpan Lifetime is important"]
#[derive(Clone)]
pub struct TraceSpan {
    tx: Sender<RecordMsg>,
    id: RecordId,
    storage_type: StorageType,
}

#[must_use = "TraceSpan Lifetime is important"]
#[derive(Clone)]
pub struct MayTraceSpan(Option<TraceSpan>);

impl From<Option<TraceSpan>> for MayTraceSpan {
    fn from(value: Option<TraceSpan>) -> Self {
        Self(value)
    }
}

impl MayTraceSpan {
    pub fn may_send_result(&self, res: OperationResult) {
        if let Some(span) = &self.0 {
            span.send_result(res)
        }
    }

    pub fn may_send_op(&self, op: Operation) {
        if let Some(span) = &self.0 {
            span.send(op)
        }
    }

    pub fn may_send_iter_next(&self) {
        if let Some(span) = &self.0 {
            span.send(Operation::IterNext(span.id))
        }
    }
}

impl TraceSpan {
    pub fn new(tx: Sender<RecordMsg>, id: RecordId, storage_type: StorageType) -> Self {
        Self {
            tx,
            id,
            storage_type,
        }
    }

    pub fn new_global_op(op: Operation, storage_type: StorageType) -> MayTraceSpan {
        match should_use_trace() {
            true => Some(Self::new_to_global(op, storage_type)).into(),
            false => None.into(),
        }
    }

    pub fn new_seal_current_epoch_span(
        epoch: u64,
        opts: TracedSealCurrentEpochOptions,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(Operation::SealCurrentEpoch { epoch, opts }, storage_type)
    }

    pub fn new_try_wait_epoch_span(
        epoch: HummockReadEpoch,
        options: TracedTryWaitEpochOptions,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::TryWaitEpoch(epoch.into(), options),
            StorageType::Global,
        )
    }

    pub fn new_get_span(
        key: Bytes,
        epoch: Option<u64>,
        read_options: TracedReadOptions,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(Operation::get(key, epoch, read_options), storage_type)
    }

    pub fn new_iter_span(
        key_range: (Bound<Bytes>, Bound<Bytes>),
        epoch: Option<u64>,
        read_options: TracedReadOptions,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::Iter {
                key_range: (
                    key_range.0.as_ref().map(|v| v.clone().into()),
                    key_range.1.as_ref().map(|v| v.clone().into()),
                ),
                epoch,
                read_options,
            },
            storage_type,
        )
    }

    pub fn new_insert_span(
        key: Bytes,
        new_val: Bytes,
        old_val: Option<Bytes>,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::Insert {
                key: key.into(),
                new_val: new_val.into(),
                old_val: old_val.map(|b| b.into()),
            },
            storage_type,
        )
    }

    pub fn new_delete_span(key: Bytes, old_val: Bytes, storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(
            Operation::Delete {
                key: key.into(),
                old_val: old_val.into(),
            },
            storage_type,
        )
    }

    pub fn new_sync_span(
        sync_table_epochs: &Vec<(HummockEpoch, HashSet<TableId>)>,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::Sync(
                sync_table_epochs
                    .iter()
                    .map(|(epoch, table_ids)| {
                        (
                            *epoch,
                            table_ids
                                .iter()
                                .map(|table_id| table_id.as_raw_id())
                                .collect(),
                        )
                    })
                    .collect(),
            ),
            storage_type,
        )
    }

    pub fn new_local_storage_span(
        option: TracedNewLocalOptions,
        storage_type: StorageType,
        local_storage_id: u64,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::NewLocalStorage(option, local_storage_id),
            storage_type,
        )
    }

    pub fn new_drop_storage_span(storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(Operation::DropLocalStorage, storage_type)
    }

    pub fn new_flush_span(storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(Operation::Flush, storage_type)
    }

    pub fn new_try_flush_span(storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(Operation::TryFlush, storage_type)
    }

    pub fn new_meta_message_span(resp: SubscribeResponse) -> MayTraceSpan {
        Self::new_global_op(
            Operation::MetaMessage(Box::new(TracedSubResp::from(resp))),
            StorageType::Global,
        )
    }

    pub fn new_local_storage_init_span(
        options: TracedInitOptions,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(Operation::LocalStorageInit(options), storage_type)
    }

    pub fn send(&self, op: Operation) {
        self.tx
            .send(Some(Record::new(*self.storage_type(), self.id(), op)))
            .expect("failed to log record");
    }

    pub fn send_result(&self, res: OperationResult) {
        self.send(Operation::Result(res));
    }

    pub fn finish(&self) {
        self.send(Operation::Finish);
    }

    pub fn id(&self) -> RecordId {
        self.id
    }

    fn storage_type(&self) -> &StorageType {
        &self.storage_type
    }

    pub fn new_to_global(op: Operation, storage_type: StorageType) -> Self {
        let span = TraceSpan::new(GLOBAL_COLLECTOR.tx(), GLOBAL_RECORD_ID.next(), storage_type);
        span.send(op);
        span
    }

    #[cfg(test)]
    pub fn new_with_op(
        tx: Sender<RecordMsg>,
        id: RecordId,
        op: Operation,
        storage_type: StorageType,
    ) -> Self {
        let span = TraceSpan::new(tx, id, storage_type);
        span.send(op);
        span
    }
}

impl Drop for TraceSpan {
    fn drop(&mut self) {
        // Mark the end of the span's lifetime in the trace.
        self.finish();
    }
}

/// `None` is the shutdown sentinel for the collector task.
pub type RecordMsg = Option<Record>;
pub type ConcurrentId = u64;
pub type LocalStorageId = u64;

#[derive(Copy, Clone, Debug, Encode, Decode, PartialEq, Hash, Eq)]
pub enum StorageType {
    Global,
    Local(ConcurrentId, LocalStorageId),
}

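// `LOCAL_ID` is a tokio task-local carrying the current task's `ConcurrentId`,
// presumably set by the storage layer so that records from local storages can be
// tagged with `StorageType::Local(concurrent_id, local_storage_id)`.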
task_local! {
    #[allow(clippy::declare_interior_mutable_const)]
    pub static LOCAL_ID: ConcurrentId;
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::MockTraceWriter;

    #[tokio::test(flavor = "multi_thread")]
    async fn test_new_spans_concurrent() {
        let count = 200;

        let collector = Arc::new(GlobalCollector::new());
        let generator = Arc::new(UniqueIdGenerator::new(AtomicU64::new(0)));
        let mut handles = Vec::with_capacity(count);

        for i in 0..count {
            let collector = collector.clone();
            let generator = generator.clone();
            let handle = tokio::spawn(async move {
                let op = Operation::get(
                    Bytes::from(vec![i as u8]),
                    Some(123),
                    TracedReadOptions::for_test(0),
                );
                let _span = TraceSpan::new_with_op(
                    collector.tx(),
                    generator.next(),
                    op,
                    StorageType::Global,
                );
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.unwrap();
        }

        let mut rx = collector.rx.lock().take().unwrap();
        let mut rx_count = 0;
        rx.close();
        while rx.recv().await.is_some() {
            rx_count += 1;
        }
        // Each span emits its initial operation plus a `Finish` record on drop.
        assert_eq!(count * 2, rx_count);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_collector_run() {
        let count = 5000;
        let generator = Arc::new(UniqueIdGenerator::new(AtomicU64::new(0)));

        let op = Operation::get(
            Bytes::from(vec![74, 56, 43, 67]),
            Some(256),
            TracedReadOptions::for_test(0),
        );
        let mut mock_writer = MockTraceWriter::new();

        mock_writer
            .expect_write()
            .times(count * 2)
            .returning(|_| Ok(0));
        mock_writer.expect_flush().times(1).returning(|| Ok(()));

        let runner_handle = GlobalCollector::run(mock_writer);

        let mut handles = Vec::with_capacity(count);

        for _ in 0..count {
            let op = op.clone();
            let tx = GLOBAL_COLLECTOR.tx();
            let generator = generator.clone();
            let handle = tokio::spawn(async move {
                let _span =
                    TraceSpan::new_with_op(tx, generator.next(), op, StorageType::Local(0, 0));
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.unwrap();
        }

        GLOBAL_COLLECTOR.finish();

        runner_handle.await.unwrap();
    }

    #[ignore]
    #[test]
    fn test_set_use_trace() {
        unsafe { std::env::remove_var(USE_TRACE) };
        assert!(!set_should_use_trace());

        unsafe { std::env::set_var(USE_TRACE, "true") };
        assert!(set_should_use_trace());

        unsafe { std::env::set_var(USE_TRACE, "false") };
        assert!(!set_should_use_trace());

        unsafe { std::env::set_var(USE_TRACE, "invalid") };
        assert!(!set_should_use_trace());
    }

    #[ignore]
    #[test]
    fn test_should_use_trace() {
        unsafe { std::env::set_var(USE_TRACE, "true") };
        assert!(should_use_trace());
        assert!(set_should_use_trace());
    }
}