risingwave_hummock_trace/
collector.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::env;
use std::fs::{OpenOptions, create_dir_all};
use std::io::BufWriter;
use std::ops::Bound;
use std::path::Path;
use std::sync::LazyLock;
use std::sync::atomic::AtomicU64;

use bincode::{Decode, Encode};
use bytes::Bytes;
use parking_lot::Mutex;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
use risingwave_pb::meta::SubscribeResponse;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{
    UnboundedReceiver as Receiver, UnboundedSender as Sender, unbounded_channel as channel,
};
use tokio::task_local;

use crate::write::{TraceWriter, TraceWriterImpl};
use crate::{
    ConcurrentIdGenerator, Operation, OperationResult, Record, RecordId, RecordIdGenerator,
    TracedInitOptions, TracedNewLocalOptions, TracedReadOptions, TracedSealCurrentEpochOptions,
    TracedSubResp, TracedTryWaitEpochOptions, UniqueIdGenerator,
};

// Global collector instance used for trace collection
static GLOBAL_COLLECTOR: LazyLock<GlobalCollector> = LazyLock::new(GlobalCollector::new);

// Global record ID generator for generating unique record IDs
static GLOBAL_RECORD_ID: LazyLock<RecordIdGenerator> =
    LazyLock::new(|| UniqueIdGenerator::new(AtomicU64::new(0)));

// Flag indicating whether trace should be used
static SHOULD_USE_TRACE: LazyLock<bool> = LazyLock::new(set_should_use_trace);

// Concurrent record ID generator for generating unique record IDs in concurrent environments
pub static CONCURRENT_ID: LazyLock<ConcurrentIdGenerator> =
    LazyLock::new(|| UniqueIdGenerator::new(AtomicU64::new(0)));

// A tokio runtime for hummock tracing
static TRACE_RT: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());

pub const USE_TRACE: &str = "USE_HM_TRACE"; // Environment variable name for enabling trace
const LOG_PATH: &str = "HM_TRACE_PATH"; // Environment variable name for specifying trace log path
const DEFAULT_PATH: &str = ".trace/hummock.ht"; // Default trace log path
const WRITER_BUFFER_SIZE: usize = 1024; // Buffer size for trace writer

/// Returns whether trace should be used based on the environment variable
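///
/// # Example
///
/// A usage sketch (not compiled here; assumes `USE_HM_TRACE=true` was set
/// before the first call, since the result is cached in a `LazyLock`):
///
/// ```ignore
/// if should_use_trace() {
///     init_collector();
/// }
/// ```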
pub fn should_use_trace() -> bool {
    *SHOULD_USE_TRACE
}

/// Computes the initial value of the `SHOULD_USE_TRACE` flag from the `USE_TRACE` environment variable.
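///
/// Note that `str::parse::<bool>` accepts only the exact strings `"true"` and
/// `"false"`, so any other value (e.g. `"1"`) disables tracing.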
fn set_should_use_trace() -> bool {
    match std::env::var(USE_TRACE) {
        Ok(v) => v.parse().unwrap_or(false),
        Err(_) => false,
    }
}

/// Initializes the `GLOBAL_COLLECTOR` with the configured log file.
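///
/// # Example
///
/// A startup sketch (illustrative; `HM_TRACE_PATH` is optional and defaults to
/// `.trace/hummock.ht`):
///
/// ```ignore
/// // std::env::set_var("HM_TRACE_PATH", "/tmp/hummock.ht");
/// init_collector();
/// ```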
pub fn init_collector() {
    TRACE_RT.spawn(async move {
        let path = env::var(LOG_PATH).unwrap_or_else(|_| DEFAULT_PATH.to_owned());
        let path = Path::new(&path);
        tracing::info!("Hummock Tracing log path {}", path.to_string_lossy());

        if let Some(parent) = path.parent() {
            if !parent.exists() {
                create_dir_all(parent).unwrap();
            }
        }
        let f = OpenOptions::new()
            .write(true)
            .truncate(true)
            .create(true)
            .open(path)
            .expect("failed to open log file");
        let writer = BufWriter::with_capacity(WRITER_BUFFER_SIZE, f);
        let writer = TraceWriterImpl::try_new_bincode(writer).unwrap();
        GlobalCollector::run(writer);
    });
}

/// `GlobalCollector` collects traced hummock operations.
/// It spawns a blocking task that receives records over a channel and writes
/// them to the trace log.
struct GlobalCollector {
    tx: Sender<RecordMsg>,
    rx: Mutex<Option<Receiver<RecordMsg>>>,
}

impl GlobalCollector {
    fn new() -> Self {
        let (tx, rx) = channel();
        Self {
            tx,
            rx: Mutex::new(Some(rx)),
        }
    }

    fn run(mut writer: impl TraceWriter + Send + 'static) -> tokio::task::JoinHandle<()> {
        tokio::task::spawn_blocking(move || {
            let mut rx = GLOBAL_COLLECTOR.rx.lock().take().unwrap();
            // `None` is the shutdown sentinel sent by `finish()`; keep writing
            // until we receive it or the channel closes.
            while let Some(Some(r)) = rx.blocking_recv() {
                writer.write(r).expect("failed to write hummock trace");
            }
            writer.flush().expect("failed to flush hummock trace");
        })
    }

    fn finish(&self) {
        self.tx.send(None).expect("failed to finish worker");
    }

    fn tx(&self) -> Sender<RecordMsg> {
        self.tx.clone()
    }
}

impl Drop for GlobalCollector {
    fn drop(&mut self) {
        // only send when channel is not closed
        if !self.tx.is_closed() {
            self.finish();
        }
    }
}

/// `TraceSpan` traces hummock operations. It marks the beginning of an operation and
/// the end when the span is dropped, so make sure the span lives long enough.
/// An underscore binding like `let _ = span` drops the span immediately.
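///
/// # Example
///
/// A minimal usage sketch (not compiled; assumes tracing is enabled and a
/// hypothetical `do_flush()` operation):
///
/// ```ignore
/// let span = TraceSpan::new_global_op(Operation::Flush, StorageType::Global);
/// do_flush();
/// // `Operation::Finish` is recorded here, when `span` is dropped.
/// drop(span);
/// ```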
#[must_use = "TraceSpan Lifetime is important"]
#[derive(Clone)]
pub struct TraceSpan {
    tx: Sender<RecordMsg>,
    id: RecordId,
    storage_type: StorageType,
}

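/// A `TraceSpan` that may be absent: wraps an `Option<TraceSpan>` so that all
/// `may_*` methods become no-ops when tracing is disabled.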
#[must_use = "TraceSpan Lifetime is important"]
#[derive(Clone)]
pub struct MayTraceSpan(Option<TraceSpan>);

impl From<Option<TraceSpan>> for MayTraceSpan {
    fn from(value: Option<TraceSpan>) -> Self {
        Self(value)
    }
}

impl MayTraceSpan {
    pub fn may_send_result(&self, res: OperationResult) {
        if let Some(span) = &self.0 {
            span.send_result(res)
        }
    }

    pub fn may_send_op(&self, op: Operation) {
        if let Some(span) = &self.0 {
            span.send(op)
        }
    }

    pub fn may_send_iter_next(&self) {
        if let Some(span) = &self.0 {
            span.send(Operation::IterNext(span.id))
        }
    }
}

impl TraceSpan {
    pub fn new(tx: Sender<RecordMsg>, id: RecordId, storage_type: StorageType) -> Self {
        Self {
            tx,
            id,
            storage_type,
        }
    }

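    /// Creates a span that records `op` to the global collector, or an empty
    /// `MayTraceSpan` when tracing is disabled.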
    pub fn new_global_op(op: Operation, storage_type: StorageType) -> MayTraceSpan {
        match should_use_trace() {
            true => Some(Self::new_to_global(op, storage_type)).into(),
            false => None.into(),
        }
    }

    pub fn new_epoch_span(storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(Operation::LocalStorageEpoch, storage_type)
    }

    pub fn new_is_dirty_span(storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(Operation::LocalStorageIsDirty, storage_type)
    }

    pub fn new_seal_current_epoch_span(
        epoch: u64,
        opts: TracedSealCurrentEpochOptions,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(Operation::SealCurrentEpoch { epoch, opts }, storage_type)
    }

    pub fn new_try_wait_epoch_span(
        epoch: HummockReadEpoch,
        options: TracedTryWaitEpochOptions,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::TryWaitEpoch(epoch.into(), options),
            StorageType::Global,
        )
    }

    pub fn new_get_span(
        key: Bytes,
        epoch: Option<u64>,
        read_options: TracedReadOptions,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(Operation::get(key, epoch, read_options), storage_type)
    }

    pub fn new_iter_span(
        key_range: (Bound<Bytes>, Bound<Bytes>),
        epoch: Option<u64>,
        read_options: TracedReadOptions,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::Iter {
                key_range: (
                    key_range.0.as_ref().map(|v| v.clone().into()),
                    key_range.1.as_ref().map(|v| v.clone().into()),
                ),
                epoch,
                read_options,
            },
            storage_type,
        )
    }

    pub fn new_insert_span(
        key: Bytes,
        new_val: Bytes,
        old_val: Option<Bytes>,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::Insert {
                key: key.into(),
                new_val: new_val.into(),
                old_val: old_val.map(|b| b.into()),
            },
            storage_type,
        )
    }

    pub fn new_delete_span(key: Bytes, old_val: Bytes, storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(
            Operation::Delete {
                key: key.into(),
                old_val: old_val.into(),
            },
            storage_type,
        )
    }

    pub fn new_sync_span(
        sync_table_epochs: &Vec<(HummockEpoch, HashSet<TableId>)>,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::Sync(
                sync_table_epochs
                    .iter()
                    .map(|(epoch, table_ids)| {
                        (
                            *epoch,
                            table_ids.iter().map(|table_id| table_id.table_id).collect(),
                        )
                    })
                    .collect(),
            ),
            storage_type,
        )
    }

    pub fn new_local_storage_span(
        option: TracedNewLocalOptions,
        storage_type: StorageType,
        local_storage_id: u64,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::NewLocalStorage(option, local_storage_id),
            storage_type,
        )
    }

    pub fn new_drop_storage_span(storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(Operation::DropLocalStorage, storage_type)
    }

    pub fn new_flush_span(storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(Operation::Flush, storage_type)
    }

    pub fn new_try_flush_span(storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(Operation::TryFlush, storage_type)
    }

    pub fn new_meta_message_span(resp: SubscribeResponse) -> MayTraceSpan {
        Self::new_global_op(
            Operation::MetaMessage(Box::new(TracedSubResp::from(resp))),
            StorageType::Global,
        )
    }

    pub fn new_local_storage_init_span(
        options: TracedInitOptions,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(Operation::LocalStorageInit(options), storage_type)
    }

    pub fn send(&self, op: Operation) {
        self.tx
            .send(Some(Record::new(*self.storage_type(), self.id(), op)))
            .expect("failed to log record");
    }

    pub fn send_result(&self, res: OperationResult) {
        self.send(Operation::Result(res));
    }

    pub fn finish(&self) {
        self.send(Operation::Finish);
    }

    pub fn id(&self) -> RecordId {
        self.id
    }

    fn storage_type(&self) -> &StorageType {
        &self.storage_type
    }

    /// Create a span and send operation to the `GLOBAL_COLLECTOR`
    pub fn new_to_global(op: Operation, storage_type: StorageType) -> Self {
        let span = TraceSpan::new(GLOBAL_COLLECTOR.tx(), GLOBAL_RECORD_ID.next(), storage_type);
        span.send(op);
        span
    }

    #[cfg(test)]
    pub fn new_with_op(
        tx: Sender<RecordMsg>,
        id: RecordId,
        op: Operation,
        storage_type: StorageType,
    ) -> Self {
        let span = TraceSpan::new(tx, id, storage_type);
        span.send(op);
        span
    }
}

impl Drop for TraceSpan {
    fn drop(&mut self) {
        self.finish();
    }
}

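/// Message consumed by the collector; `None` is the sentinel that tells the
/// writer task to flush and exit.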
pub type RecordMsg = Option<Record>;
pub type ConcurrentId = u64;
pub type LocalStorageId = u64;

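/// Identifies the storage a record belongs to: the global state store, or a
/// local state store addressed by (`ConcurrentId`, `LocalStorageId`).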
#[derive(Copy, Clone, Debug, Encode, Decode, PartialEq, Hash, Eq)]
pub enum StorageType {
    Global,
    Local(ConcurrentId, LocalStorageId),
}

task_local! {
    // See https://github.com/rust-lang/rust-clippy/issues/9224 for why this
    // lint must be allowed here.
    #[allow(clippy::declare_interior_mutable_const)]
    pub static LOCAL_ID: ConcurrentId;
}
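
// Illustrative usage of `LOCAL_ID` (a sketch, not part of this module): a
// caller scopes a `ConcurrentId` around a task so spans created inside it can
// read the id, e.g.
//
//     LOCAL_ID.scope(42, async { /* traced hummock operations */ }).await;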

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::MockTraceWriter;

    #[tokio::test(flavor = "multi_thread")]
    async fn test_new_spans_concurrent() {
        let count = 200;

        let collector = Arc::new(GlobalCollector::new());
        let generator = Arc::new(UniqueIdGenerator::new(AtomicU64::new(0)));
        let mut handles = Vec::with_capacity(count);

        for i in 0..count {
            let collector = collector.clone();
            let generator = generator.clone();
            let handle = tokio::spawn(async move {
                let op = Operation::get(
                    Bytes::from(vec![i as u8]),
                    Some(123),
                    TracedReadOptions::for_test(0),
                );
                let _span = TraceSpan::new_with_op(
                    collector.tx(),
                    generator.next(),
                    op,
                    StorageType::Global,
                );
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.unwrap();
        }

        let mut rx = collector.rx.lock().take().unwrap();
        let mut rx_count = 0;
        rx.close();
        while rx.recv().await.is_some() {
            rx_count += 1;
        }
        // Each span produces two records: the operation at creation and
        // `Operation::Finish` on drop.
        assert_eq!(count * 2, rx_count);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_collector_run() {
        let count = 5000;
        let generator = Arc::new(UniqueIdGenerator::new(AtomicU64::new(0)));

        let op = Operation::get(
            Bytes::from(vec![74, 56, 43, 67]),
            Some(256),
            TracedReadOptions::for_test(0),
        );
        let mut mock_writer = MockTraceWriter::new();

        // Two writes are expected per span: the traced operation and the
        // `Finish` record emitted on drop.
        mock_writer
            .expect_write()
            .times(count * 2)
            .returning(|_| Ok(0));
        mock_writer.expect_flush().times(1).returning(|| Ok(()));

        let runner_handle = GlobalCollector::run(mock_writer);

        let mut handles = Vec::with_capacity(count);

        for _ in 0..count {
            let op = op.clone();
            let tx = GLOBAL_COLLECTOR.tx();
            let generator = generator.clone();
            let handle = tokio::spawn(async move {
                let _span =
                    TraceSpan::new_with_op(tx, generator.next(), op, StorageType::Local(0, 0));
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.unwrap();
        }

        GLOBAL_COLLECTOR.finish();

        runner_handle.await.unwrap();
    }

    #[ignore]
    #[test]
    fn test_set_use_trace() {
        unsafe { std::env::remove_var(USE_TRACE) };
        assert!(!set_should_use_trace());

        unsafe { std::env::set_var(USE_TRACE, "true") };
        assert!(set_should_use_trace());

        unsafe { std::env::set_var(USE_TRACE, "false") };
        assert!(!set_should_use_trace());

        unsafe { std::env::set_var(USE_TRACE, "invalid") };
        assert!(!set_should_use_trace());
    }

    #[ignore]
    #[test]
    fn test_should_use_trace() {
        unsafe { std::env::set_var(USE_TRACE, "true") };
        assert!(should_use_trace());
        assert!(set_should_use_trace());
516    }
517}