risingwave_storage/monitor/
local_metrics.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::cell::RefCell;
16use std::collections::HashMap;
17use std::sync::Arc;
18#[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
19use std::sync::atomic::AtomicBool;
20use std::sync::atomic::{AtomicU64, Ordering};
21
22use prometheus::local::{LocalHistogram, LocalIntCounter};
23use risingwave_common::catalog::TableId;
24use risingwave_common::metrics::LabelGuardedLocalIntCounter;
25use risingwave_hummock_sdk::table_stats::TableStatsMap;
26
27use super::HummockStateStoreMetrics;
28use crate::monitor::CompactorMetrics;
29
30thread_local!(static LOCAL_METRICS: RefCell<HashMap<u32,LocalStoreMetrics>> = RefCell::new(HashMap::default()));
31
/// Test helper: unconditionally flushes every per-table local metric cached on this thread,
/// regardless of how many reports each has collected.
#[cfg(test)]
pub(crate) fn flush_local_metrics_for_test() {
    LOCAL_METRICS.with_borrow_mut(|per_table| {
        per_table.values_mut().for_each(LocalStoreMetrics::flush);
    });
}
40
41#[derive(Default, Debug)]
42pub struct StoreLocalStatistic {
43    pub cache_data_block_miss: u64,
44    pub cache_data_block_total: u64,
45    pub cache_meta_block_miss: u64,
46    pub cache_meta_block_total: u64,
47    pub cache_data_prefetch_count: u64,
48    pub cache_data_prefetch_block_count: u64,
49
50    // include multiple versions of one key.
51    pub total_key_count: u64,
52    pub skip_multi_version_key_count: u64,
53    pub skip_delete_key_count: u64,
54    pub processed_key_count: u64,
55    pub bloom_filter_true_negative_counts: u64,
56    pub remote_io_time: Arc<AtomicU64>,
57    pub bloom_filter_check_counts: u64,
58    pub get_shared_buffer_hit_counts: u64,
59    pub staging_imm_iter_count: u64,
60    pub staging_sst_iter_count: u64,
61    pub overlapping_iter_count: u64,
62    pub non_overlapping_iter_count: u64,
63    pub sub_iter_count: u64,
64    pub found_key: bool,
65
66    pub staging_imm_get_count: u64,
67    pub staging_sst_get_count: u64,
68    pub overlapping_get_count: u64,
69    pub non_overlapping_get_count: u64,
70
71    pub vnode_checked_get_count: u64,
72    pub vnode_pruned_get_count: u64,
73
74    #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
75    reported: AtomicBool,
76    #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
77    added: AtomicBool,
78
79    /// The stats of key skipped by watermark for each table.
80    /// Used by `SkipWatermarkIterator`.
81    /// Generalize it in the future if there're other iterators that'll also drop keys.
82    pub skipped_by_watermark_table_stats: TableStatsMap,
83}
84
85impl StoreLocalStatistic {
86    pub fn add(&mut self, other: &StoreLocalStatistic) {
87        self.add_count(other);
88        self.add_histogram(other);
89        self.bloom_filter_true_negative_counts += other.bloom_filter_true_negative_counts;
90        self.remote_io_time.fetch_add(
91            other.remote_io_time.load(Ordering::Relaxed),
92            Ordering::Relaxed,
93        );
94        self.bloom_filter_check_counts += other.bloom_filter_check_counts;
95
96        #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
97        if other.added.fetch_or(true, Ordering::Relaxed) || other.reported.load(Ordering::Relaxed) {
98            tracing::error!("double added\n{:#?}", other);
99        }
100    }
101
102    fn report(&self, metrics: &mut LocalStoreMetrics) {
103        metrics.add_count(self);
104        metrics.add_histogram(self);
105        let t = self.remote_io_time.load(Ordering::Relaxed) as f64;
106        if t > 0.0 {
107            metrics.remote_io_time.observe(t / 1000.0);
108        }
109
110        metrics.collect_count += 1;
111        if metrics.collect_count > FLUSH_LOCAL_METRICS_TIMES {
112            metrics.flush();
113            metrics.collect_count = 0;
114        }
115        #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
116        if self.reported.fetch_or(true, Ordering::Relaxed) || self.added.load(Ordering::Relaxed) {
117            tracing::error!("double reported\n{:#?}", self);
118        }
119    }
120
121    pub fn discard(self) {
122        #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
123        {
124            self.reported.fetch_or(true, Ordering::Relaxed);
125        }
126    }
127
128    pub fn report_compactor(&self, metrics: &CompactorMetrics) {
129        let t = self.remote_io_time.load(Ordering::Relaxed) as f64;
130        if t > 0.0 {
131            metrics.remote_read_time.observe(t / 1000.0);
132        }
133        if self.processed_key_count > 0 {
134            metrics
135                .iter_scan_key_counts
136                .with_label_values(&["processed"])
137                .inc_by(self.processed_key_count);
138        }
139
140        if self.skip_multi_version_key_count > 0 {
141            metrics
142                .iter_scan_key_counts
143                .with_label_values(&["skip_multi_version"])
144                .inc_by(self.skip_multi_version_key_count);
145        }
146
147        if self.skip_delete_key_count > 0 {
148            metrics
149                .iter_scan_key_counts
150                .with_label_values(&["skip_delete"])
151                .inc_by(self.skip_delete_key_count);
152        }
153
154        if self.total_key_count > 0 {
155            metrics
156                .iter_scan_key_counts
157                .with_label_values(&["total"])
158                .inc_by(self.total_key_count);
159        }
160
161        #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
162        if self.reported.fetch_or(true, Ordering::Relaxed) || self.added.load(Ordering::Relaxed) {
163            tracing::error!("double reported\n{:#?}", self);
164        }
165    }
166
167    fn report_bloom_filter_metrics(&self, metrics: &BloomFilterLocalMetrics) {
168        if self.bloom_filter_check_counts == 0 {
169            return;
170        }
171        // checks SST bloom filters
172        metrics
173            .bloom_filter_true_negative_counts
174            .inc_by(self.bloom_filter_true_negative_counts);
175        metrics
176            .bloom_filter_check_counts
177            .inc_by(self.bloom_filter_check_counts);
178        metrics.read_req_check_bloom_filter_counts.inc();
179
180        if self.bloom_filter_check_counts > self.bloom_filter_true_negative_counts {
181            if !self.found_key {
182                // false positive
183                // checks SST bloom filters (at least one bloom filter return true) but returns
184                // nothing
185                metrics.read_req_positive_but_non_exist_counts.inc();
186            }
187            // positive
188            // checks SST bloom filters and at least one bloom filter returns positive
189            metrics.read_req_bloom_filter_positive_counts.inc();
190        }
191    }
192
193    pub fn flush_all() {
194        LOCAL_METRICS.with_borrow_mut(|local_metrics| {
195            for metrics in local_metrics.values_mut() {
196                if metrics.collect_count > 0 {
197                    metrics.flush();
198                    metrics.collect_count = 0;
199                }
200            }
201        });
202    }
203
204    pub fn ignore(&self) {
205        #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
206        self.reported.store(true, Ordering::Relaxed);
207    }
208
209    #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
210    fn need_report(&self) -> bool {
211        self.cache_data_block_miss != 0
212            || self.cache_data_block_total != 0
213            || self.cache_meta_block_miss != 0
214            || self.cache_meta_block_total != 0
215            || self.cache_data_prefetch_count != 0
216            || self.skip_multi_version_key_count != 0
217            || self.skip_delete_key_count != 0
218            || self.processed_key_count != 0
219            || self.bloom_filter_true_negative_counts != 0
220            || self.remote_io_time.load(Ordering::Relaxed) != 0
221            || self.bloom_filter_check_counts != 0
222            || self.vnode_checked_get_count != 0
223            || self.vnode_pruned_get_count != 0
224    }
225}
226
227#[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
228impl Drop for StoreLocalStatistic {
229    fn drop(&mut self) {
230        if !self.reported.load(Ordering::Relaxed)
231            && !self.added.load(Ordering::Relaxed)
232            && self.need_report()
233        {
234            tracing::error!("local stats lost!\n{:#?}", self);
235        }
236    }
237}
238
239struct LocalStoreMetrics {
240    cache_data_block_total: LabelGuardedLocalIntCounter,
241    cache_data_block_miss: LabelGuardedLocalIntCounter,
242    cache_meta_block_total: LabelGuardedLocalIntCounter,
243    cache_meta_block_miss: LabelGuardedLocalIntCounter,
244    cache_data_prefetch_count: LabelGuardedLocalIntCounter,
245    cache_data_prefetch_block_count: LabelGuardedLocalIntCounter,
246    remote_io_time: LocalHistogram,
247    processed_key_count: LabelGuardedLocalIntCounter,
248    skip_multi_version_key_count: LabelGuardedLocalIntCounter,
249    skip_delete_key_count: LabelGuardedLocalIntCounter,
250    total_key_count: LabelGuardedLocalIntCounter,
251    get_shared_buffer_hit_counts: LocalIntCounter,
252    staging_imm_iter_count: LocalHistogram,
253    staging_sst_iter_count: LocalHistogram,
254    overlapping_iter_count: LocalHistogram,
255    non_overlapping_iter_count: LocalHistogram,
256    sub_iter_count: LocalHistogram,
257    iter_filter_metrics: BloomFilterLocalMetrics,
258    get_filter_metrics: BloomFilterLocalMetrics,
259    collect_count: usize,
260
261    staging_imm_get_count: LocalHistogram,
262    staging_sst_get_count: LocalHistogram,
263    overlapping_get_count: LocalHistogram,
264    non_overlapping_get_count: LocalHistogram,
265
266    vnode_checked_get_count: LabelGuardedLocalIntCounter,
267    vnode_pruned_get_count: LabelGuardedLocalIntCounter,
268}
269
/// Report threshold for flushing a table's thread-local metrics: `StoreLocalStatistic::report`
/// flushes once its collect counter exceeds this value, i.e. on the 33rd report since the
/// last flush.
const FLUSH_LOCAL_METRICS_TIMES: usize = 32;
271
272impl LocalStoreMetrics {
273    pub fn new(metrics: &HummockStateStoreMetrics, table_id_label: &str) -> Self {
274        let cache_data_block_total = metrics
275            .sst_store_block_request_counts
276            .with_guarded_label_values(&[table_id_label, "data_total"])
277            .local();
278
279        let cache_data_block_miss = metrics
280            .sst_store_block_request_counts
281            .with_guarded_label_values(&[table_id_label, "data_miss"])
282            .local();
283
284        let cache_meta_block_total = metrics
285            .sst_store_block_request_counts
286            .with_guarded_label_values(&[table_id_label, "meta_total"])
287            .local();
288        let cache_data_prefetch_count = metrics
289            .sst_store_block_request_counts
290            .with_guarded_label_values(&[table_id_label, "prefetch_count"])
291            .local();
292        let cache_data_prefetch_block_count = metrics
293            .sst_store_block_request_counts
294            .with_guarded_label_values(&[table_id_label, "prefetch_data_count"])
295            .local();
296
297        let cache_meta_block_miss = metrics
298            .sst_store_block_request_counts
299            .with_guarded_label_values(&[table_id_label, "meta_miss"])
300            .local();
301
302        let remote_io_time = metrics
303            .remote_read_time
304            .with_label_values(&[table_id_label])
305            .local();
306
307        let processed_key_count = metrics
308            .iter_scan_key_counts
309            .with_guarded_label_values(&[table_id_label, "processed"])
310            .local();
311
312        let skip_multi_version_key_count = metrics
313            .iter_scan_key_counts
314            .with_guarded_label_values(&[table_id_label, "skip_multi_version"])
315            .local();
316
317        let skip_delete_key_count = metrics
318            .iter_scan_key_counts
319            .with_guarded_label_values(&[table_id_label, "skip_delete"])
320            .local();
321
322        let total_key_count = metrics
323            .iter_scan_key_counts
324            .with_guarded_label_values(&[table_id_label, "total"])
325            .local();
326
327        let get_shared_buffer_hit_counts = metrics
328            .get_shared_buffer_hit_counts
329            .with_label_values(&[table_id_label])
330            .local();
331
332        let staging_imm_iter_count = metrics
333            .iter_merge_sstable_counts
334            .with_label_values(&[table_id_label, "staging-imm-iter"])
335            .local();
336        let staging_sst_iter_count = metrics
337            .iter_merge_sstable_counts
338            .with_label_values(&[table_id_label, "staging-sst-iter"])
339            .local();
340        let overlapping_iter_count = metrics
341            .iter_merge_sstable_counts
342            .with_label_values(&[table_id_label, "committed-overlapping-iter"])
343            .local();
344        let non_overlapping_iter_count = metrics
345            .iter_merge_sstable_counts
346            .with_label_values(&[table_id_label, "committed-non-overlapping-iter"])
347            .local();
348        let sub_iter_count = metrics
349            .iter_merge_sstable_counts
350            .with_label_values(&[table_id_label, "sub-iter"])
351            .local();
352        let get_filter_metrics = BloomFilterLocalMetrics::new(metrics, table_id_label, "get");
353        let iter_filter_metrics = BloomFilterLocalMetrics::new(metrics, table_id_label, "iter");
354
355        let staging_imm_get_count = metrics
356            .iter_merge_sstable_counts
357            .with_label_values(&[table_id_label, "staging-imm-get"])
358            .local();
359        let staging_sst_get_count = metrics
360            .iter_merge_sstable_counts
361            .with_label_values(&[table_id_label, "staging-sst-get"])
362            .local();
363        let overlapping_get_count = metrics
364            .iter_merge_sstable_counts
365            .with_label_values(&[table_id_label, "committed-overlapping-get"])
366            .local();
367        let non_overlapping_get_count = metrics
368            .iter_merge_sstable_counts
369            .with_label_values(&[table_id_label, "committed-non-overlapping-get"])
370            .local();
371
372        let vnode_checked_get_count = metrics
373            .vnode_pruning_counts
374            .with_guarded_label_values(&[table_id_label, "get", "checked"])
375            .local();
376        let vnode_pruned_get_count = metrics
377            .vnode_pruning_counts
378            .with_guarded_label_values(&[table_id_label, "get", "pruned"])
379            .local();
380
381        Self {
382            cache_data_block_total,
383            cache_data_block_miss,
384            cache_meta_block_total,
385            cache_meta_block_miss,
386            cache_data_prefetch_count,
387            cache_data_prefetch_block_count,
388            remote_io_time,
389            processed_key_count,
390            skip_multi_version_key_count,
391            skip_delete_key_count,
392            total_key_count,
393            get_shared_buffer_hit_counts,
394            staging_imm_iter_count,
395            staging_sst_iter_count,
396            overlapping_iter_count,
397            sub_iter_count,
398            non_overlapping_iter_count,
399            get_filter_metrics,
400            iter_filter_metrics,
401            collect_count: 0,
402            staging_imm_get_count,
403            staging_sst_get_count,
404            overlapping_get_count,
405            non_overlapping_get_count,
406            vnode_checked_get_count,
407            vnode_pruned_get_count,
408        }
409    }
410
411    pub fn flush(&mut self) {
412        self.remote_io_time.flush();
413        self.iter_filter_metrics.flush();
414        self.get_filter_metrics.flush();
415        self.flush_histogram();
416        self.flush_count();
417    }
418}
419
/// Generates the histogram plumbing shared by `LocalStoreMetrics` and `StoreLocalStatistic`.
///
/// Each identifier must name a local histogram field on `LocalStoreMetrics` (with
/// `observe(f64)` and `flush()`) and a numeric field on `StoreLocalStatistic`. Expands to:
/// - `LocalStoreMetrics::add_histogram`: observes each statistic value as `f64`;
/// - `LocalStoreMetrics::flush_histogram`: flushes each local histogram;
/// - `StoreLocalStatistic::add_histogram`: sums another statistic's fields into `self`.
macro_rules! add_local_metrics_histogram {
    ($($x:ident),*) => {
        impl LocalStoreMetrics {
            fn add_histogram(&self, stats: &StoreLocalStatistic) {
                $( self.$x.observe(stats.$x as f64); )*
            }

            fn flush_histogram(&mut self) {
                $( self.$x.flush(); )*
            }
        }

        impl StoreLocalStatistic {
            fn add_histogram(&mut self, other: &StoreLocalStatistic) {
                $( self.$x += other.$x; )*
            }
        }
    };
}
445
446add_local_metrics_histogram!(
447    staging_imm_iter_count,
448    staging_sst_iter_count,
449    overlapping_iter_count,
450    non_overlapping_iter_count,
451    sub_iter_count,
452    staging_imm_get_count,
453    staging_sst_get_count,
454    overlapping_get_count,
455    non_overlapping_get_count
456);
457
/// Generates the counter plumbing shared by `LocalStoreMetrics` and `StoreLocalStatistic`.
///
/// Each identifier must name a local counter field on `LocalStoreMetrics` (with `inc_by` and
/// `flush()`) and a numeric field on `StoreLocalStatistic`. Expands to:
/// - `LocalStoreMetrics::add_count`: increments each counter by the statistic's value;
/// - `LocalStoreMetrics::flush_count`: flushes each local counter;
/// - `StoreLocalStatistic::add_count`: sums another statistic's fields into `self`.
macro_rules! add_local_metrics_count {
    ($($x:ident),*) => {
        impl LocalStoreMetrics {
            fn add_count(&self, stats: &StoreLocalStatistic) {
                $( self.$x.inc_by(stats.$x); )*
            }

            fn flush_count(&mut self) {
                $( self.$x.flush(); )*
            }
        }

        impl StoreLocalStatistic {
            fn add_count(&mut self, other: &StoreLocalStatistic) {
                $( self.$x += other.$x; )*
            }
        }
    };
}
483
484add_local_metrics_count!(
485    cache_data_block_total,
486    cache_data_block_miss,
487    cache_meta_block_total,
488    cache_meta_block_miss,
489    cache_data_prefetch_count,
490    cache_data_prefetch_block_count,
491    skip_multi_version_key_count,
492    skip_delete_key_count,
493    get_shared_buffer_hit_counts,
494    total_key_count,
495    processed_key_count,
496    vnode_checked_get_count,
497    vnode_pruned_get_count
498);
499
/// Defines `BloomFilterLocalMetrics`, which holds one thread-local label-guarded counter per
/// listed name.
///
/// Each identifier must also be a label-guarded counter vector on `HummockStateStoreMetrics`;
/// `new` binds every counter to the label values `[table_id_label, oper_type]`.
macro_rules! define_bloom_filter_metrics {
    ($($x:ident),*) => {
        struct BloomFilterLocalMetrics {
            $( $x: LabelGuardedLocalIntCounter, )*
        }

        impl BloomFilterLocalMetrics {
            /// Creates local counters for one table and one operation type (for example "get"
            /// or "iter").
            pub fn new(metrics: &HummockStateStoreMetrics, table_id_label: &str, oper_type: &str) -> Self {
                // checks SST bloom filters
                let labels = [table_id_label, oper_type];
                Self {
                    $( $x: metrics.$x.with_guarded_label_values(&labels).local(), )*
                }
            }

            /// Flushes every local counter.
            pub fn flush(&mut self) {
                $( self.$x.flush(); )*
            }
        }
    };
}
522
523define_bloom_filter_metrics!(
524    read_req_check_bloom_filter_counts,
525    bloom_filter_check_counts,
526    bloom_filter_true_negative_counts,
527    read_req_positive_but_non_exist_counts,
528    read_req_bloom_filter_positive_counts
529);
530
531pub struct GetLocalMetricsGuard {
532    metrics: Arc<HummockStateStoreMetrics>,
533    table_id: TableId,
534    pub local_stats: StoreLocalStatistic,
535}
536
537impl GetLocalMetricsGuard {
538    pub fn new(metrics: Arc<HummockStateStoreMetrics>, table_id: TableId) -> Self {
539        Self {
540            metrics,
541            table_id,
542            local_stats: StoreLocalStatistic::default(),
543        }
544    }
545}
546
547impl Drop for GetLocalMetricsGuard {
548    fn drop(&mut self) {
549        LOCAL_METRICS.with_borrow_mut(|local_metrics| {
550            let table_metrics = local_metrics
551                .entry(self.table_id.as_raw_id())
552                .or_insert_with(|| {
553                    LocalStoreMetrics::new(
554                        self.metrics.as_ref(),
555                        self.table_id.to_string().as_str(),
556                    )
557                });
558            self.local_stats.report(table_metrics);
559            self.local_stats
560                .report_bloom_filter_metrics(&table_metrics.get_filter_metrics);
561        });
562    }
563}
564
565pub struct IterLocalMetricsGuard {
566    metrics: Arc<HummockStateStoreMetrics>,
567    table_id: TableId,
568    pub local_stats: StoreLocalStatistic,
569}
570
571impl IterLocalMetricsGuard {
572    pub fn new(
573        metrics: Arc<HummockStateStoreMetrics>,
574        table_id: TableId,
575        local_stats: StoreLocalStatistic,
576    ) -> Self {
577        Self {
578            metrics,
579            table_id,
580            local_stats,
581        }
582    }
583}
584
585impl Drop for IterLocalMetricsGuard {
586    fn drop(&mut self) {
587        LOCAL_METRICS.with_borrow_mut(|local_metrics| {
588            let table_metrics = local_metrics
589                .entry(self.table_id.as_raw_id())
590                .or_insert_with(|| {
591                    LocalStoreMetrics::new(
592                        self.metrics.as_ref(),
593                        self.table_id.to_string().as_str(),
594                    )
595                });
596            self.local_stats.report(table_metrics);
597            self.local_stats
598                .report_bloom_filter_metrics(&table_metrics.iter_filter_metrics);
599        });
600    }
601}