risingwave_hummock_trace/collector.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::env;
use std::fs::{OpenOptions, create_dir_all};
use std::io::BufWriter;
use std::ops::Bound;
use std::path::Path;
use std::sync::LazyLock;
use std::sync::atomic::AtomicU64;

use bincode::{Decode, Encode};
use bytes::Bytes;
use parking_lot::Mutex;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
use risingwave_pb::meta::SubscribeResponse;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{
    UnboundedReceiver as Receiver, UnboundedSender as Sender, unbounded_channel as channel,
};
use tokio::task_local;

use crate::write::{TraceWriter, TraceWriterImpl};
use crate::{
    ConcurrentIdGenerator, Operation, OperationResult, Record, RecordId, RecordIdGenerator,
    TracedInitOptions, TracedNewLocalOptions, TracedReadOptions, TracedSealCurrentEpochOptions,
    TracedSubResp, TracedTryWaitEpochOptions, UniqueIdGenerator,
};

// Global collector instance used for trace collection
static GLOBAL_COLLECTOR: LazyLock<GlobalCollector> = LazyLock::new(GlobalCollector::new);

// Global record ID generator for generating unique record IDs
static GLOBAL_RECORD_ID: LazyLock<RecordIdGenerator> =
    LazyLock::new(|| UniqueIdGenerator::new(AtomicU64::new(0)));

// Flag indicating whether tracing should be used
static SHOULD_USE_TRACE: LazyLock<bool> = LazyLock::new(set_should_use_trace);

// Concurrent record ID generator for generating unique record IDs in concurrent environments
pub static CONCURRENT_ID: LazyLock<ConcurrentIdGenerator> =
    LazyLock::new(|| UniqueIdGenerator::new(AtomicU64::new(0)));

// A tokio runtime for hummock tracing
static TRACE_RT: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());

pub const USE_TRACE: &str = "USE_HM_TRACE"; // Environment variable name for enabling tracing
const LOG_PATH: &str = "HM_TRACE_PATH"; // Environment variable name for specifying trace log path
const DEFAULT_PATH: &str = ".trace/hummock.ht"; // Default trace log path
const WRITER_BUFFER_SIZE: usize = 1024; // Buffer size for trace writer

/// Returns whether tracing should be used, based on the `USE_TRACE` environment variable.
pub fn should_use_trace() -> bool {
    *SHOULD_USE_TRACE
}

/// Computes the `SHOULD_USE_TRACE` flag from the `USE_TRACE` environment variable.
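///
/// Only a value that parses as a `bool` enables tracing; anything else falls
/// back to `false`:
///
/// ```text
/// USE_HM_TRACE=true    -> tracing enabled
/// USE_HM_TRACE=false   -> tracing disabled
/// USE_HM_TRACE=1       -> tracing disabled ("1" does not parse as `bool`)
/// ```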
fn set_should_use_trace() -> bool {
    match env::var(USE_TRACE) {
        Ok(v) => v.parse().unwrap_or(false),
        Err(_) => false,
    }
}

/// Initializes the `GLOBAL_COLLECTOR` with the configured log file.
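///
/// A minimal sketch of enabling tracing at startup (the env values and path
/// shown are illustrative, not defaults):
///
/// ```ignore
/// unsafe { std::env::set_var("USE_HM_TRACE", "true") };
/// unsafe { std::env::set_var("HM_TRACE_PATH", "/tmp/hummock.ht") };
/// init_collector();
/// ```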
pub fn init_collector() {
    TRACE_RT.spawn(async move {
        let path = env::var(LOG_PATH).unwrap_or_else(|_| DEFAULT_PATH.to_owned());
        let path = Path::new(&path);
        tracing::info!("Hummock tracing log path: {}", path.to_string_lossy());

        if let Some(parent) = path.parent() {
            if !parent.exists() {
                create_dir_all(parent).unwrap();
            }
        }
        let f = OpenOptions::new()
            .write(true)
            .truncate(true)
            .create(true)
            .open(path)
            .expect("failed to open log file");
        let writer = BufWriter::with_capacity(WRITER_BUFFER_SIZE, f);
        let writer = TraceWriterImpl::try_new_bincode(writer).unwrap();
        GlobalCollector::run(writer);
    });
}

/// `GlobalCollector` collects traced hummock operations: spans send `Record`s
/// over an unbounded channel, and a dedicated blocking task drains the channel
/// and writes them with a `TraceWriter`.
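///
/// A minimal sketch of the lifecycle, mirroring `test_collector_run` below
/// (`writer` is any `TraceWriter`):
///
/// ```ignore
/// let handle = GlobalCollector::run(writer); // drain records on a blocking task
/// // ... spans send `Some(record)` through `GLOBAL_COLLECTOR.tx()` ...
/// GLOBAL_COLLECTOR.finish();                 // send `None`: flush and exit
/// handle.await.unwrap();
/// ```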
struct GlobalCollector {
    tx: Sender<RecordMsg>,
    rx: Mutex<Option<Receiver<RecordMsg>>>,
}

impl GlobalCollector {
    fn new() -> Self {
        let (tx, rx) = channel();
        Self {
            tx,
            rx: Mutex::new(Some(rx)),
        }
    }

    fn run(mut writer: impl TraceWriter + Send + 'static) -> tokio::task::JoinHandle<()> {
        tokio::task::spawn_blocking(move || {
            let mut rx = GLOBAL_COLLECTOR.rx.lock().take().unwrap();
            // `None` is the shutdown signal: stop receiving and flush the writer.
            while let Some(Some(r)) = rx.blocking_recv() {
                writer.write(r).expect("failed to write hummock trace");
            }
            writer.flush().expect("failed to flush hummock trace");
        })
    }

    fn finish(&self) {
        self.tx.send(None).expect("failed to finish worker");
    }

    fn tx(&self) -> Sender<RecordMsg> {
        self.tx.clone()
    }
}

impl Drop for GlobalCollector {
    fn drop(&mut self) {
        // only send when channel is not closed
        if !self.tx.is_closed() {
            self.finish();
        }
    }
}

/// `TraceSpan` traces hummock operations. It marks the beginning of an operation,
/// and the end when the span is dropped, so make sure the span lives long enough.
/// An underscore binding like `let _ = span` drops the span immediately.
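///
/// A minimal usage sketch (`storage_type` is a placeholder; requires a running
/// global collector):
///
/// ```ignore
/// let span = TraceSpan::new_flush_span(storage_type);
/// // ... perform the traced flush ...
/// drop(span); // sends `Operation::Finish`
/// ```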
#[must_use = "TraceSpan lifetime is important"]
#[derive(Clone)]
pub struct TraceSpan {
    tx: Sender<RecordMsg>,
    id: RecordId,
    storage_type: StorageType,
}

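/// A [`TraceSpan`] that may be disabled: it holds `Some` span only when tracing
/// is enabled, so call sites can record unconditionally.
///
/// A minimal sketch of the iterator pattern (all bindings are placeholders):
///
/// ```ignore
/// let span = TraceSpan::new_iter_span(key_range, Some(epoch), read_options, storage_type);
/// // on every `next()` of the traced iterator:
/// span.may_send_iter_next();
/// ```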
#[must_use = "TraceSpan lifetime is important"]
#[derive(Clone)]
pub struct MayTraceSpan(Option<TraceSpan>);

impl From<Option<TraceSpan>> for MayTraceSpan {
    fn from(value: Option<TraceSpan>) -> Self {
        Self(value)
    }
}

impl MayTraceSpan {
    pub fn may_send_result(&self, res: OperationResult) {
        if let Some(span) = &self.0 {
            span.send_result(res)
        }
    }

    pub fn may_send_op(&self, op: Operation) {
        if let Some(span) = &self.0 {
            span.send(op)
        }
    }

    pub fn may_send_iter_next(&self) {
        if let Some(span) = &self.0 {
            span.send(Operation::IterNext(span.id))
        }
    }
}

impl TraceSpan {
    pub fn new(tx: Sender<RecordMsg>, id: RecordId, storage_type: StorageType) -> Self {
        Self {
            tx,
            id,
            storage_type,
        }
    }

    pub fn new_global_op(op: Operation, storage_type: StorageType) -> MayTraceSpan {
        match should_use_trace() {
            true => Some(Self::new_to_global(op, storage_type)).into(),
            false => None.into(),
        }
    }

    pub fn new_seal_current_epoch_span(
        epoch: u64,
        opts: TracedSealCurrentEpochOptions,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(Operation::SealCurrentEpoch { epoch, opts }, storage_type)
    }

    pub fn new_try_wait_epoch_span(
        epoch: HummockReadEpoch,
        options: TracedTryWaitEpochOptions,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::TryWaitEpoch(epoch.into(), options),
            StorageType::Global,
        )
    }

    pub fn new_get_span(
        key: Bytes,
        epoch: Option<u64>,
        read_options: TracedReadOptions,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(Operation::get(key, epoch, read_options), storage_type)
    }

    pub fn new_iter_span(
        key_range: (Bound<Bytes>, Bound<Bytes>),
        epoch: Option<u64>,
        read_options: TracedReadOptions,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::Iter {
                key_range: (
                    key_range.0.as_ref().map(|v| v.clone().into()),
                    key_range.1.as_ref().map(|v| v.clone().into()),
                ),
                epoch,
                read_options,
            },
            storage_type,
        )
    }

    pub fn new_insert_span(
        key: Bytes,
        new_val: Bytes,
        old_val: Option<Bytes>,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::Insert {
                key: key.into(),
                new_val: new_val.into(),
                old_val: old_val.map(|b| b.into()),
            },
            storage_type,
        )
    }

    pub fn new_delete_span(key: Bytes, old_val: Bytes, storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(
            Operation::Delete {
                key: key.into(),
                old_val: old_val.into(),
            },
            storage_type,
        )
    }

    pub fn new_sync_span(
        sync_table_epochs: &Vec<(HummockEpoch, HashSet<TableId>)>,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::Sync(
                sync_table_epochs
                    .iter()
                    .map(|(epoch, table_ids)| {
                        (
                            *epoch,
                            table_ids.iter().map(|table_id| table_id.table_id).collect(),
                        )
                    })
                    .collect(),
            ),
            storage_type,
        )
    }

    pub fn new_local_storage_span(
        option: TracedNewLocalOptions,
        storage_type: StorageType,
        local_storage_id: u64,
    ) -> MayTraceSpan {
        Self::new_global_op(
            Operation::NewLocalStorage(option, local_storage_id),
            storage_type,
        )
    }

    pub fn new_drop_storage_span(storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(Operation::DropLocalStorage, storage_type)
    }

    pub fn new_flush_span(storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(Operation::Flush, storage_type)
    }

    pub fn new_try_flush_span(storage_type: StorageType) -> MayTraceSpan {
        Self::new_global_op(Operation::TryFlush, storage_type)
    }

    pub fn new_meta_message_span(resp: SubscribeResponse) -> MayTraceSpan {
        Self::new_global_op(
            Operation::MetaMessage(Box::new(TracedSubResp::from(resp))),
            StorageType::Global,
        )
    }

    pub fn new_local_storage_init_span(
        options: TracedInitOptions,
        storage_type: StorageType,
    ) -> MayTraceSpan {
        Self::new_global_op(Operation::LocalStorageInit(options), storage_type)
    }

    pub fn send(&self, op: Operation) {
        self.tx
            .send(Some(Record::new(*self.storage_type(), self.id(), op)))
            .expect("failed to log record");
    }

    pub fn send_result(&self, res: OperationResult) {
        self.send(Operation::Result(res));
    }

    pub fn finish(&self) {
        self.send(Operation::Finish);
    }

    pub fn id(&self) -> RecordId {
        self.id
    }

    fn storage_type(&self) -> &StorageType {
        &self.storage_type
    }

    /// Creates a span and sends its operation to the `GLOBAL_COLLECTOR`.
    pub fn new_to_global(op: Operation, storage_type: StorageType) -> Self {
        let span = TraceSpan::new(GLOBAL_COLLECTOR.tx(), GLOBAL_RECORD_ID.next(), storage_type);
        span.send(op);
        span
    }

    #[cfg(test)]
    pub fn new_with_op(
        tx: Sender<RecordMsg>,
        id: RecordId,
        op: Operation,
        storage_type: StorageType,
    ) -> Self {
        let span = TraceSpan::new(tx, id, storage_type);
        span.send(op);
        span
    }
}

impl Drop for TraceSpan {
    fn drop(&mut self) {
        self.finish();
    }
}

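/// A message sent to the collector task; `None` tells it to flush and exit.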
pub type RecordMsg = Option<Record>;
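/// Id of a concurrent context, generated by `CONCURRENT_ID` and carried in the
/// `LOCAL_ID` task-local.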
pub type ConcurrentId = u64;
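/// Id of a local storage instance.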
pub type LocalStorageId = u64;

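/// Which storage a traced record originated from.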
#[derive(Copy, Clone, Debug, Encode, Decode, PartialEq, Hash, Eq)]
pub enum StorageType {
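    /// The global hummock storage.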
    Global,
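    /// A local storage instance, identified by the issuing concurrent context
    /// and its local storage id.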
    Local(ConcurrentId, LocalStorageId),
}

task_local! {
    // The interior-mutability lint must be allowed here; see
    // https://github.com/rust-lang/rust-clippy/issues/9224
    #[allow(clippy::declare_interior_mutable_const)]
    pub static LOCAL_ID: ConcurrentId;
}
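
// A minimal sketch of how a `ConcurrentId` reaches the task-local (hypothetical
// call site; `LOCAL_ID.get()` panics outside such a scope):
//
//     let id = CONCURRENT_ID.next();
//     LOCAL_ID.scope(id, async {
//         // spans created in here can read the id via `LOCAL_ID.get()`
//     })
//     .await;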

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::MockTraceWriter;

    #[tokio::test(flavor = "multi_thread")]
    async fn test_new_spans_concurrent() {
        let count = 200;

        let collector = Arc::new(GlobalCollector::new());
        let generator = Arc::new(UniqueIdGenerator::new(AtomicU64::new(0)));
        let mut handles = Vec::with_capacity(count);

        for i in 0..count {
            let collector = collector.clone();
            let generator = generator.clone();
            let handle = tokio::spawn(async move {
                let op = Operation::get(
                    Bytes::from(vec![i as u8]),
                    Some(123),
                    TracedReadOptions::for_test(0),
                );
                let _span = TraceSpan::new_with_op(
                    collector.tx(),
                    generator.next(),
                    op,
                    StorageType::Global,
                );
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.unwrap();
        }

        let mut rx = collector.rx.lock().take().unwrap();
        let mut rx_count = 0;
        rx.close();
        while rx.recv().await.is_some() {
            rx_count += 1;
        }
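        // Each span sends its operation on creation plus `Operation::Finish`
        // on drop, so two records arrive per span.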
        assert_eq!(count * 2, rx_count);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_collector_run() {
        let count = 5000;
        let generator = Arc::new(UniqueIdGenerator::new(AtomicU64::new(0)));

        let op = Operation::get(
            Bytes::from(vec![74, 56, 43, 67]),
            Some(256),
            TracedReadOptions::for_test(0),
        );
        let mut mock_writer = MockTraceWriter::new();

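        // Two writes per span: the traced operation plus the `Finish` record
        // sent on drop.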
        mock_writer
            .expect_write()
            .times(count * 2)
            .returning(|_| Ok(0));
        mock_writer.expect_flush().times(1).returning(|| Ok(()));

        let runner_handle = GlobalCollector::run(mock_writer);

        let mut handles = Vec::with_capacity(count);

        for _ in 0..count {
            let op = op.clone();
            let tx = GLOBAL_COLLECTOR.tx();
            let generator = generator.clone();
            let handle = tokio::spawn(async move {
                let _span =
                    TraceSpan::new_with_op(tx, generator.next(), op, StorageType::Local(0, 0));
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.unwrap();
        }

        GLOBAL_COLLECTOR.finish();

        runner_handle.await.unwrap();
    }

    #[ignore]
    #[test]
    fn test_set_use_trace() {
        unsafe { std::env::remove_var(USE_TRACE) };
        assert!(!set_should_use_trace());

        unsafe { std::env::set_var(USE_TRACE, "true") };
        assert!(set_should_use_trace());

        unsafe { std::env::set_var(USE_TRACE, "false") };
        assert!(!set_should_use_trace());

        unsafe { std::env::set_var(USE_TRACE, "invalid") };
        assert!(!set_should_use_trace());
    }

    #[ignore]
    #[test]
    fn test_should_use_trace() {
        unsafe { std::env::set_var(USE_TRACE, "true") };
        assert!(should_use_trace());
        assert!(set_should_use_trace());
    }
}