risingwave_storage/monitor/
local_metrics.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cell::RefCell;
16use std::collections::HashMap;
17use std::sync::Arc;
18#[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
19use std::sync::atomic::AtomicBool;
20use std::sync::atomic::{AtomicU64, Ordering};
21
22use prometheus::local::{LocalHistogram, LocalIntCounter};
23use risingwave_common::catalog::TableId;
24use risingwave_common::metrics::LabelGuardedLocalIntCounter;
25use risingwave_hummock_sdk::table_stats::TableStatsMap;
26
27use super::HummockStateStoreMetrics;
28use crate::monitor::CompactorMetrics;
29
30thread_local!(static LOCAL_METRICS: RefCell<HashMap<u32,LocalStoreMetrics>> = RefCell::new(HashMap::default()));
31
/// Statistics gathered locally while serving a storage operation.
///
/// An instance is either merged into another instance with `add`, or pushed to
/// metrics with `report` / `report_compactor`. In debug builds (outside of
/// madsim/test configurations) the `Drop` impl logs an error when non-empty
/// statistics are dropped without having been added, reported, discarded or ignored.
///
/// NOTE(review): `add` sums the counter/histogram fields, the two bloom-filter
/// counters and `remote_io_time`; `found_key` and
/// `skipped_by_watermark_table_stats` are not merged by it.
#[derive(Default, Debug)]
pub struct StoreLocalStatistic {
    // Block cache counters. Reported on `sst_store_block_request_counts` with the
    // labels `data_miss`, `data_total`, `meta_miss`, `meta_total`,
    // `prefetch_count` and `prefetch_data_count` respectively.
    pub cache_data_block_miss: u64,
    pub cache_data_block_total: u64,
    pub cache_meta_block_miss: u64,
    pub cache_meta_block_total: u64,
    pub cache_data_prefetch_count: u64,
    pub cache_data_prefetch_block_count: u64,

    // Key scan counters, reported on `iter_scan_key_counts` with the labels
    // `total`, `skip_multi_version`, `skip_delete` and `processed`.
    // include multiple versions of one key.
    pub total_key_count: u64,
    pub skip_multi_version_key_count: u64,
    pub skip_delete_key_count: u64,
    pub processed_key_count: u64,
    /// Number of bloom filter checks that came back negative; compared against
    /// `bloom_filter_check_counts` in `report_bloom_filter_metrics`.
    pub bloom_filter_true_negative_counts: u64,
    /// Accumulated remote IO time. `report` / `report_compactor` observe
    /// `value / 1000.0` when it is non-zero (unit not visible here — TODO confirm).
    pub remote_io_time: Arc<AtomicU64>,
    /// Number of bloom filter checks performed; when zero, no bloom filter
    /// metrics are reported at all.
    pub bloom_filter_check_counts: u64,
    pub get_shared_buffer_hit_counts: u64,
    // The `*_iter_count` fields are observed as histogram samples on
    // `iter_merge_sstable_counts` (labels `staging-imm-iter`, `staging-sst-iter`,
    // `committed-overlapping-iter`, `committed-non-overlapping-iter`, `sub-iter`).
    pub staging_imm_iter_count: u64,
    pub staging_sst_iter_count: u64,
    pub overlapping_iter_count: u64,
    pub non_overlapping_iter_count: u64,
    pub sub_iter_count: u64,
    /// Read by `report_bloom_filter_metrics`: when a bloom filter returned positive
    /// but this is `false`, the request is counted as "positive but non-exist".
    pub found_key: bool,

    // The `*_get_count` fields are observed as histogram samples on
    // `iter_merge_sstable_counts` (labels `staging-imm-get`, `staging-sst-get`,
    // `committed-overlapping-get`, `committed-non-overlapping-get`).
    pub staging_imm_get_count: u64,
    pub staging_sst_get_count: u64,
    pub overlapping_get_count: u64,
    pub non_overlapping_get_count: u64,

    /// Debug-only flag: set by `report`, `report_compactor`, `discard` and `ignore`.
    #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
    reported: AtomicBool,
    /// Debug-only flag: set on the `other` argument of `add` once it has been merged.
    #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
    added: AtomicBool,

    /// The stats of key skipped by watermark for each table.
    /// Used by `SkipWatermarkIterator`.
    /// Generalize it in the future if there're other iterators that'll also drop keys.
    pub skipped_by_watermark_table_stats: TableStatsMap,
}
72
73impl StoreLocalStatistic {
74    pub fn add(&mut self, other: &StoreLocalStatistic) {
75        self.add_count(other);
76        self.add_histogram(other);
77        self.bloom_filter_true_negative_counts += other.bloom_filter_true_negative_counts;
78        self.remote_io_time.fetch_add(
79            other.remote_io_time.load(Ordering::Relaxed),
80            Ordering::Relaxed,
81        );
82        self.bloom_filter_check_counts += other.bloom_filter_check_counts;
83
84        #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
85        if other.added.fetch_or(true, Ordering::Relaxed) || other.reported.load(Ordering::Relaxed) {
86            tracing::error!("double added\n{:#?}", other);
87        }
88    }
89
90    fn report(&self, metrics: &mut LocalStoreMetrics) {
91        metrics.add_count(self);
92        metrics.add_histogram(self);
93        let t = self.remote_io_time.load(Ordering::Relaxed) as f64;
94        if t > 0.0 {
95            metrics.remote_io_time.observe(t / 1000.0);
96        }
97
98        metrics.collect_count += 1;
99        if metrics.collect_count > FLUSH_LOCAL_METRICS_TIMES {
100            metrics.flush();
101            metrics.collect_count = 0;
102        }
103        #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
104        if self.reported.fetch_or(true, Ordering::Relaxed) || self.added.load(Ordering::Relaxed) {
105            tracing::error!("double reported\n{:#?}", self);
106        }
107    }
108
109    pub fn discard(self) {
110        #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
111        {
112            self.reported.fetch_or(true, Ordering::Relaxed);
113        }
114    }
115
116    pub fn report_compactor(&self, metrics: &CompactorMetrics) {
117        let t = self.remote_io_time.load(Ordering::Relaxed) as f64;
118        if t > 0.0 {
119            metrics.remote_read_time.observe(t / 1000.0);
120        }
121        if self.processed_key_count > 0 {
122            metrics
123                .iter_scan_key_counts
124                .with_label_values(&["processed"])
125                .inc_by(self.processed_key_count);
126        }
127
128        if self.skip_multi_version_key_count > 0 {
129            metrics
130                .iter_scan_key_counts
131                .with_label_values(&["skip_multi_version"])
132                .inc_by(self.skip_multi_version_key_count);
133        }
134
135        if self.skip_delete_key_count > 0 {
136            metrics
137                .iter_scan_key_counts
138                .with_label_values(&["skip_delete"])
139                .inc_by(self.skip_delete_key_count);
140        }
141
142        if self.total_key_count > 0 {
143            metrics
144                .iter_scan_key_counts
145                .with_label_values(&["total"])
146                .inc_by(self.total_key_count);
147        }
148
149        #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
150        if self.reported.fetch_or(true, Ordering::Relaxed) || self.added.load(Ordering::Relaxed) {
151            tracing::error!("double reported\n{:#?}", self);
152        }
153    }
154
155    fn report_bloom_filter_metrics(&self, metrics: &BloomFilterLocalMetrics) {
156        if self.bloom_filter_check_counts == 0 {
157            return;
158        }
159        // checks SST bloom filters
160        metrics
161            .bloom_filter_true_negative_counts
162            .inc_by(self.bloom_filter_true_negative_counts);
163        metrics
164            .bloom_filter_check_counts
165            .inc_by(self.bloom_filter_check_counts);
166        metrics.read_req_check_bloom_filter_counts.inc();
167
168        if self.bloom_filter_check_counts > self.bloom_filter_true_negative_counts {
169            if !self.found_key {
170                // false positive
171                // checks SST bloom filters (at least one bloom filter return true) but returns
172                // nothing
173                metrics.read_req_positive_but_non_exist_counts.inc();
174            }
175            // positive
176            // checks SST bloom filters and at least one bloom filter returns positive
177            metrics.read_req_bloom_filter_positive_counts.inc();
178        }
179    }
180
181    pub fn flush_all() {
182        LOCAL_METRICS.with_borrow_mut(|local_metrics| {
183            for metrics in local_metrics.values_mut() {
184                if metrics.collect_count > 0 {
185                    metrics.flush();
186                    metrics.collect_count = 0;
187                }
188            }
189        });
190    }
191
192    pub fn ignore(&self) {
193        #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
194        self.reported.store(true, Ordering::Relaxed);
195    }
196
197    #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
198    fn need_report(&self) -> bool {
199        self.cache_data_block_miss != 0
200            || self.cache_data_block_total != 0
201            || self.cache_meta_block_miss != 0
202            || self.cache_meta_block_total != 0
203            || self.cache_data_prefetch_count != 0
204            || self.skip_multi_version_key_count != 0
205            || self.skip_delete_key_count != 0
206            || self.processed_key_count != 0
207            || self.bloom_filter_true_negative_counts != 0
208            || self.remote_io_time.load(Ordering::Relaxed) != 0
209            || self.bloom_filter_check_counts != 0
210    }
211}
212
213#[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
214impl Drop for StoreLocalStatistic {
215    fn drop(&mut self) {
216        if !self.reported.load(Ordering::Relaxed)
217            && !self.added.load(Ordering::Relaxed)
218            && self.need_report()
219        {
220            tracing::error!("local stats lost!\n{:#?}", self);
221        }
222    }
223}
224
/// Local (`.local()`) metric handles for a single table, created from
/// `HummockStateStoreMetrics` in `LocalStoreMetrics::new` and pushed out by `flush`.
/// One instance per table id is kept in the thread-local `LOCAL_METRICS` map.
struct LocalStoreMetrics {
    // Children of `sst_store_block_request_counts`, one per request kind label.
    cache_data_block_total: LabelGuardedLocalIntCounter<2>,
    cache_data_block_miss: LabelGuardedLocalIntCounter<2>,
    cache_meta_block_total: LabelGuardedLocalIntCounter<2>,
    cache_meta_block_miss: LabelGuardedLocalIntCounter<2>,
    cache_data_prefetch_count: LabelGuardedLocalIntCounter<2>,
    cache_data_prefetch_block_count: LabelGuardedLocalIntCounter<2>,
    /// Child of `remote_read_time`, labelled by table id only.
    remote_io_time: LocalHistogram,
    // Children of `iter_scan_key_counts`, one per key kind label.
    processed_key_count: LabelGuardedLocalIntCounter<2>,
    skip_multi_version_key_count: LabelGuardedLocalIntCounter<2>,
    skip_delete_key_count: LabelGuardedLocalIntCounter<2>,
    total_key_count: LabelGuardedLocalIntCounter<2>,
    get_shared_buffer_hit_counts: LocalIntCounter,
    // Children of `iter_merge_sstable_counts` for iterator reads.
    staging_imm_iter_count: LocalHistogram,
    staging_sst_iter_count: LocalHistogram,
    overlapping_iter_count: LocalHistogram,
    non_overlapping_iter_count: LocalHistogram,
    sub_iter_count: LocalHistogram,
    /// Bloom filter counters created with operation label `iter`.
    iter_filter_metrics: BloomFilterLocalMetrics,
    /// Bloom filter counters created with operation label `get`.
    get_filter_metrics: BloomFilterLocalMetrics,
    /// Number of `StoreLocalStatistic::report` calls since the last flush;
    /// reset to 0 whenever the metrics are flushed.
    collect_count: usize,

    // Children of `iter_merge_sstable_counts` for point gets.
    staging_imm_get_count: LocalHistogram,
    staging_sst_get_count: LocalHistogram,
    overlapping_get_count: LocalHistogram,
    non_overlapping_get_count: LocalHistogram,
}
252
/// Flush threshold for per-table local metrics: `StoreLocalStatistic::report`
/// flushes a `LocalStoreMetrics` once its `collect_count` exceeds this value.
const FLUSH_LOCAL_METRICS_TIMES: usize = 32;
254
255impl LocalStoreMetrics {
256    pub fn new(metrics: &HummockStateStoreMetrics, table_id_label: &str) -> Self {
257        let cache_data_block_total = metrics
258            .sst_store_block_request_counts
259            .with_guarded_label_values(&[table_id_label, "data_total"])
260            .local();
261
262        let cache_data_block_miss = metrics
263            .sst_store_block_request_counts
264            .with_guarded_label_values(&[table_id_label, "data_miss"])
265            .local();
266
267        let cache_meta_block_total = metrics
268            .sst_store_block_request_counts
269            .with_guarded_label_values(&[table_id_label, "meta_total"])
270            .local();
271        let cache_data_prefetch_count = metrics
272            .sst_store_block_request_counts
273            .with_guarded_label_values(&[table_id_label, "prefetch_count"])
274            .local();
275        let cache_data_prefetch_block_count = metrics
276            .sst_store_block_request_counts
277            .with_guarded_label_values(&[table_id_label, "prefetch_data_count"])
278            .local();
279
280        let cache_meta_block_miss = metrics
281            .sst_store_block_request_counts
282            .with_guarded_label_values(&[table_id_label, "meta_miss"])
283            .local();
284
285        let remote_io_time = metrics
286            .remote_read_time
287            .with_label_values(&[table_id_label])
288            .local();
289
290        let processed_key_count = metrics
291            .iter_scan_key_counts
292            .with_guarded_label_values(&[table_id_label, "processed"])
293            .local();
294
295        let skip_multi_version_key_count = metrics
296            .iter_scan_key_counts
297            .with_guarded_label_values(&[table_id_label, "skip_multi_version"])
298            .local();
299
300        let skip_delete_key_count = metrics
301            .iter_scan_key_counts
302            .with_guarded_label_values(&[table_id_label, "skip_delete"])
303            .local();
304
305        let total_key_count = metrics
306            .iter_scan_key_counts
307            .with_guarded_label_values(&[table_id_label, "total"])
308            .local();
309
310        let get_shared_buffer_hit_counts = metrics
311            .get_shared_buffer_hit_counts
312            .with_label_values(&[table_id_label])
313            .local();
314
315        let staging_imm_iter_count = metrics
316            .iter_merge_sstable_counts
317            .with_label_values(&[table_id_label, "staging-imm-iter"])
318            .local();
319        let staging_sst_iter_count = metrics
320            .iter_merge_sstable_counts
321            .with_label_values(&[table_id_label, "staging-sst-iter"])
322            .local();
323        let overlapping_iter_count = metrics
324            .iter_merge_sstable_counts
325            .with_label_values(&[table_id_label, "committed-overlapping-iter"])
326            .local();
327        let non_overlapping_iter_count = metrics
328            .iter_merge_sstable_counts
329            .with_label_values(&[table_id_label, "committed-non-overlapping-iter"])
330            .local();
331        let sub_iter_count = metrics
332            .iter_merge_sstable_counts
333            .with_label_values(&[table_id_label, "sub-iter"])
334            .local();
335        let get_filter_metrics = BloomFilterLocalMetrics::new(metrics, table_id_label, "get");
336        let iter_filter_metrics = BloomFilterLocalMetrics::new(metrics, table_id_label, "iter");
337
338        let staging_imm_get_count = metrics
339            .iter_merge_sstable_counts
340            .with_label_values(&[table_id_label, "staging-imm-get"])
341            .local();
342        let staging_sst_get_count = metrics
343            .iter_merge_sstable_counts
344            .with_label_values(&[table_id_label, "staging-sst-get"])
345            .local();
346        let overlapping_get_count = metrics
347            .iter_merge_sstable_counts
348            .with_label_values(&[table_id_label, "committed-overlapping-get"])
349            .local();
350        let non_overlapping_get_count = metrics
351            .iter_merge_sstable_counts
352            .with_label_values(&[table_id_label, "committed-non-overlapping-get"])
353            .local();
354
355        Self {
356            cache_data_block_total,
357            cache_data_block_miss,
358            cache_meta_block_total,
359            cache_meta_block_miss,
360            cache_data_prefetch_count,
361            cache_data_prefetch_block_count,
362            remote_io_time,
363            processed_key_count,
364            skip_multi_version_key_count,
365            skip_delete_key_count,
366            total_key_count,
367            get_shared_buffer_hit_counts,
368            staging_imm_iter_count,
369            staging_sst_iter_count,
370            overlapping_iter_count,
371            sub_iter_count,
372            non_overlapping_iter_count,
373            get_filter_metrics,
374            iter_filter_metrics,
375            collect_count: 0,
376            staging_imm_get_count,
377            staging_sst_get_count,
378            overlapping_get_count,
379            non_overlapping_get_count,
380        }
381    }
382
383    pub fn flush(&mut self) {
384        self.remote_io_time.flush();
385        self.iter_filter_metrics.flush();
386        self.get_filter_metrics.flush();
387        self.flush_histogram();
388        self.flush_count();
389    }
390}
391
/// Generates the histogram plumbing for the listed fields. Each identifier must
/// name a numeric field of `StoreLocalStatistic` and a same-named histogram handle
/// of `LocalStoreMetrics`.
macro_rules! add_local_metrics_histogram {
    ($($field:ident),*) => {
        impl StoreLocalStatistic {
            /// Sums the histogram-backed fields of `other` into `self`.
            fn add_histogram(&mut self, other: &StoreLocalStatistic) {
                $(
                    self.$field += other.$field;
                )*
            }
        }

        impl LocalStoreMetrics {
            /// Observes each listed field of `stats` as one histogram sample.
            fn add_histogram(&self, stats: &StoreLocalStatistic) {
                $(
                    self.$field.observe(stats.$field as f64);
                )*
            }

            /// Flushes each listed local histogram.
            fn flush_histogram(&mut self) {
                $(
                    self.$field.flush();
                )*
            }
        }
    };
}
417
// Fields recorded as histogram samples. This expands to
// `LocalStoreMetrics::{add_histogram, flush_histogram}` and
// `StoreLocalStatistic::add_histogram`; the listed order is the order in which
// the fields are observed, flushed and summed.
add_local_metrics_histogram!(
    staging_imm_iter_count,
    staging_sst_iter_count,
    overlapping_iter_count,
    non_overlapping_iter_count,
    sub_iter_count,
    staging_imm_get_count,
    staging_sst_get_count,
    overlapping_get_count,
    non_overlapping_get_count
);
429
/// Generates the counter plumbing for the listed fields. Each identifier must name
/// a `u64` field of `StoreLocalStatistic` and a same-named local counter handle of
/// `LocalStoreMetrics`.
macro_rules! add_local_metrics_count {
    ($($field:ident),*) => {
        impl StoreLocalStatistic {
            /// Sums the counter fields of `other` into `self`.
            fn add_count(&mut self, other: &StoreLocalStatistic) {
                $(
                    self.$field += other.$field;
                )*
            }
        }

        impl LocalStoreMetrics {
            /// Increments each listed local counter by the matching field of `stats`.
            fn add_count(&self, stats: &StoreLocalStatistic) {
                $(
                    self.$field.inc_by(stats.$field);
                )*
            }

            /// Flushes each listed local counter.
            fn flush_count(&mut self) {
                $(
                    self.$field.flush();
                )*
            }
        }
    };
}
455
// Fields recorded as monotonically increasing counters. This expands to
// `LocalStoreMetrics::{add_count, flush_count}` and
// `StoreLocalStatistic::add_count`; the listed order is the order in which the
// fields are incremented, flushed and summed.
add_local_metrics_count!(
    cache_data_block_total,
    cache_data_block_miss,
    cache_meta_block_total,
    cache_meta_block_miss,
    cache_data_prefetch_count,
    cache_data_prefetch_block_count,
    skip_multi_version_key_count,
    skip_delete_key_count,
    get_shared_buffer_hit_counts,
    total_key_count,
    processed_key_count
);
469
/// Defines `BloomFilterLocalMetrics` with one local counter per listed identifier.
/// Each identifier must also name a counter vector on `HummockStateStoreMetrics`.
macro_rules! define_bloom_filter_metrics {
    ($($counter:ident),*) => {
        /// Local bloom filter counters for one `(table id, operation type)` pair.
        struct BloomFilterLocalMetrics {
            $(
                $counter: LabelGuardedLocalIntCounter<2>,
            )*
        }

        impl BloomFilterLocalMetrics {
            /// Creates the local counters labelled with `table_id_label` and `oper_type`.
            pub fn new(
                metrics: &HummockStateStoreMetrics,
                table_id_label: &str,
                oper_type: &str,
            ) -> Self {
                // checks SST bloom filters
                let labels = [table_id_label, oper_type];
                Self {
                    $(
                        $counter: metrics
                            .$counter
                            .with_guarded_label_values(&labels)
                            .local(),
                    )*
                }
            }

            /// Flushes every local counter of this group.
            pub fn flush(&mut self) {
                $(
                    self.$counter.flush();
                )*
            }
        }
    };
}
492
// Declares `BloomFilterLocalMetrics` with these counters. The same identifiers are
// read from `HummockStateStoreMetrics` in the generated `new`, and incremented in
// `StoreLocalStatistic::report_bloom_filter_metrics`.
define_bloom_filter_metrics!(
    read_req_check_bloom_filter_counts,
    bloom_filter_check_counts,
    bloom_filter_true_negative_counts,
    read_req_positive_but_non_exist_counts,
    read_req_bloom_filter_positive_counts
);
500
/// Guard that owns the statistics of a get operation on one table.
///
/// When dropped, `local_stats` is reported into the current thread's local metrics
/// for `table_id`, with the bloom filter statistics going to the `get` group.
pub struct GetLocalMetricsGuard {
    /// Source metrics used to create the table's local handles on first use.
    metrics: Arc<HummockStateStoreMetrics>,
    /// Table whose local metrics receive the report.
    table_id: TableId,
    /// Statistics to fill in while the guard is alive.
    pub local_stats: StoreLocalStatistic,
}
506
507impl GetLocalMetricsGuard {
508    pub fn new(metrics: Arc<HummockStateStoreMetrics>, table_id: TableId) -> Self {
509        Self {
510            metrics,
511            table_id,
512            local_stats: StoreLocalStatistic::default(),
513        }
514    }
515}
516
517impl Drop for GetLocalMetricsGuard {
518    fn drop(&mut self) {
519        LOCAL_METRICS.with_borrow_mut(|local_metrics| {
520            let table_metrics = local_metrics
521                .entry(self.table_id.table_id)
522                .or_insert_with(|| {
523                    LocalStoreMetrics::new(
524                        self.metrics.as_ref(),
525                        self.table_id.to_string().as_str(),
526                    )
527                });
528            self.local_stats.report(table_metrics);
529            self.local_stats
530                .report_bloom_filter_metrics(&table_metrics.get_filter_metrics);
531        });
532    }
533}
534
/// Guard that owns the statistics of an iterator operation on one table.
///
/// When dropped, `local_stats` is reported into the current thread's local metrics
/// for `table_id`, with the bloom filter statistics going to the `iter` group.
pub struct IterLocalMetricsGuard {
    /// Source metrics used to create the table's local handles on first use.
    metrics: Arc<HummockStateStoreMetrics>,
    /// Table whose local metrics receive the report.
    table_id: TableId,
    /// Statistics to fill in while the guard is alive.
    pub local_stats: StoreLocalStatistic,
}
540
541impl IterLocalMetricsGuard {
542    pub fn new(
543        metrics: Arc<HummockStateStoreMetrics>,
544        table_id: TableId,
545        local_stats: StoreLocalStatistic,
546    ) -> Self {
547        Self {
548            metrics,
549            table_id,
550            local_stats,
551        }
552    }
553}
554
555impl Drop for IterLocalMetricsGuard {
556    fn drop(&mut self) {
557        LOCAL_METRICS.with_borrow_mut(|local_metrics| {
558            let table_metrics = local_metrics
559                .entry(self.table_id.table_id)
560                .or_insert_with(|| {
561                    LocalStoreMetrics::new(
562                        self.metrics.as_ref(),
563                        self.table_id.to_string().as_str(),
564                    )
565                });
566            self.local_stats.report(table_metrics);
567            self.local_stats
568                .report_bloom_filter_metrics(&table_metrics.iter_filter_metrics);
569        });
570    }
571}