1use std::cell::RefCell;
16use std::collections::HashMap;
17use std::sync::Arc;
18#[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
19use std::sync::atomic::AtomicBool;
20use std::sync::atomic::{AtomicU64, Ordering};
21
22use prometheus::local::{LocalHistogram, LocalIntCounter};
23use risingwave_common::catalog::TableId;
24use risingwave_common::metrics::LabelGuardedLocalIntCounter;
25use risingwave_hummock_sdk::table_stats::TableStatsMap;
26
27use super::HummockStateStoreMetrics;
28use crate::monitor::CompactorMetrics;
29
// Per-thread table of `LocalStoreMetrics`, keyed by the `u32` returned from
// `TableId::as_raw_id()`. Entries are created lazily when a metrics guard is
// dropped on this thread and are pushed out through `LocalStoreMetrics::flush`.
thread_local!(static LOCAL_METRICS: RefCell<HashMap<u32,LocalStoreMetrics>> = RefCell::new(HashMap::default()));
31
/// Test-only helper: unconditionally flushes every per-table metrics entry
/// held in the current thread's `LOCAL_METRICS` map.
#[cfg(test)]
pub(crate) fn flush_local_metrics_for_test() {
    LOCAL_METRICS.with_borrow_mut(|per_table| {
        per_table
            .values_mut()
            .for_each(|table_metrics| table_metrics.flush());
    });
}
40
/// Statistics gathered locally for a storage operation before they are folded
/// into metrics.
///
/// Instances are merged with [`StoreLocalStatistic::add`] and pushed into the
/// thread-local per-table metrics or the compactor metrics by the `report*`
/// methods of this type.
#[derive(Default, Debug)]
pub struct StoreLocalStatistic {
    // Block request counters, exported through `sst_store_block_request_counts`
    // under the labels "data_miss", "data_total", "meta_miss", "meta_total",
    // "prefetch_count" and "prefetch_data_count" respectively.
    pub cache_data_block_miss: u64,
    pub cache_data_block_total: u64,
    pub cache_meta_block_miss: u64,
    pub cache_meta_block_total: u64,
    pub cache_data_prefetch_count: u64,
    pub cache_data_prefetch_block_count: u64,

    // Key counters, exported through `iter_scan_key_counts` under the labels
    // "total", "skip_multi_version", "skip_delete" and "processed".
    pub total_key_count: u64,
    pub skip_multi_version_key_count: u64,
    pub skip_delete_key_count: u64,
    pub processed_key_count: u64,
    /// Reported to the bloom filter metrics together with
    /// `bloom_filter_check_counts`.
    pub bloom_filter_true_negative_counts: u64,
    /// Accumulated remote I/O time. It is divided by 1000 before being observed
    /// in a histogram; the unit is not visible in this file (TODO confirm).
    pub remote_io_time: Arc<AtomicU64>,
    /// Number of bloom filter checks; when it is zero no bloom filter metrics
    /// are reported at all.
    pub bloom_filter_check_counts: u64,
    pub get_shared_buffer_hit_counts: u64,
    // Observed as histogram samples of `iter_merge_sstable_counts` (the "*-iter"
    // and "sub-iter" labels).
    pub staging_imm_iter_count: u64,
    pub staging_sst_iter_count: u64,
    pub overlapping_iter_count: u64,
    pub non_overlapping_iter_count: u64,
    pub sub_iter_count: u64,
    /// Read when reporting bloom filter metrics: a positive filter result with
    /// `found_key == false` is counted as "positive but non-exist".
    pub found_key: bool,

    // Observed as histogram samples of `iter_merge_sstable_counts` (the "*-get"
    // labels).
    pub staging_imm_get_count: u64,
    pub staging_sst_get_count: u64,
    pub overlapping_get_count: u64,
    pub non_overlapping_get_count: u64,

    // Exported through `vnode_pruning_counts` with the labels
    // ("get", "checked") and ("get", "pruned").
    pub vnode_checked_get_count: u64,
    pub vnode_pruned_get_count: u64,

    /// Debug-build bookkeeping: set once the stats were reported, discarded or
    /// ignored. Checked on drop to detect lost statistics.
    #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
    reported: AtomicBool,
    /// Debug-build bookkeeping: set once the stats were merged into another
    /// instance via `add`.
    #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
    added: AtomicBool,

    // NOTE(review): not read or merged anywhere in this part of the file;
    // presumably table stats of entries skipped by watermark — confirm against
    // the code that fills and consumes it.
    pub skipped_by_watermark_table_stats: TableStatsMap,
}
84
85impl StoreLocalStatistic {
86 pub fn add(&mut self, other: &StoreLocalStatistic) {
87 self.add_count(other);
88 self.add_histogram(other);
89 self.bloom_filter_true_negative_counts += other.bloom_filter_true_negative_counts;
90 self.remote_io_time.fetch_add(
91 other.remote_io_time.load(Ordering::Relaxed),
92 Ordering::Relaxed,
93 );
94 self.bloom_filter_check_counts += other.bloom_filter_check_counts;
95
96 #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
97 if other.added.fetch_or(true, Ordering::Relaxed) || other.reported.load(Ordering::Relaxed) {
98 tracing::error!("double added\n{:#?}", other);
99 }
100 }
101
102 fn report(&self, metrics: &mut LocalStoreMetrics) {
103 metrics.add_count(self);
104 metrics.add_histogram(self);
105 let t = self.remote_io_time.load(Ordering::Relaxed) as f64;
106 if t > 0.0 {
107 metrics.remote_io_time.observe(t / 1000.0);
108 }
109
110 metrics.collect_count += 1;
111 if metrics.collect_count > FLUSH_LOCAL_METRICS_TIMES {
112 metrics.flush();
113 metrics.collect_count = 0;
114 }
115 #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
116 if self.reported.fetch_or(true, Ordering::Relaxed) || self.added.load(Ordering::Relaxed) {
117 tracing::error!("double reported\n{:#?}", self);
118 }
119 }
120
121 pub fn discard(self) {
122 #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
123 {
124 self.reported.fetch_or(true, Ordering::Relaxed);
125 }
126 }
127
128 pub fn report_compactor(&self, metrics: &CompactorMetrics) {
129 let t = self.remote_io_time.load(Ordering::Relaxed) as f64;
130 if t > 0.0 {
131 metrics.remote_read_time.observe(t / 1000.0);
132 }
133 if self.processed_key_count > 0 {
134 metrics
135 .iter_scan_key_counts
136 .with_label_values(&["processed"])
137 .inc_by(self.processed_key_count);
138 }
139
140 if self.skip_multi_version_key_count > 0 {
141 metrics
142 .iter_scan_key_counts
143 .with_label_values(&["skip_multi_version"])
144 .inc_by(self.skip_multi_version_key_count);
145 }
146
147 if self.skip_delete_key_count > 0 {
148 metrics
149 .iter_scan_key_counts
150 .with_label_values(&["skip_delete"])
151 .inc_by(self.skip_delete_key_count);
152 }
153
154 if self.total_key_count > 0 {
155 metrics
156 .iter_scan_key_counts
157 .with_label_values(&["total"])
158 .inc_by(self.total_key_count);
159 }
160
161 #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
162 if self.reported.fetch_or(true, Ordering::Relaxed) || self.added.load(Ordering::Relaxed) {
163 tracing::error!("double reported\n{:#?}", self);
164 }
165 }
166
167 fn report_bloom_filter_metrics(&self, metrics: &BloomFilterLocalMetrics) {
168 if self.bloom_filter_check_counts == 0 {
169 return;
170 }
171 metrics
173 .bloom_filter_true_negative_counts
174 .inc_by(self.bloom_filter_true_negative_counts);
175 metrics
176 .bloom_filter_check_counts
177 .inc_by(self.bloom_filter_check_counts);
178 metrics.read_req_check_bloom_filter_counts.inc();
179
180 if self.bloom_filter_check_counts > self.bloom_filter_true_negative_counts {
181 if !self.found_key {
182 metrics.read_req_positive_but_non_exist_counts.inc();
186 }
187 metrics.read_req_bloom_filter_positive_counts.inc();
190 }
191 }
192
193 pub fn flush_all() {
194 LOCAL_METRICS.with_borrow_mut(|local_metrics| {
195 for metrics in local_metrics.values_mut() {
196 if metrics.collect_count > 0 {
197 metrics.flush();
198 metrics.collect_count = 0;
199 }
200 }
201 });
202 }
203
204 pub fn ignore(&self) {
205 #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
206 self.reported.store(true, Ordering::Relaxed);
207 }
208
209 #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
210 fn need_report(&self) -> bool {
211 self.cache_data_block_miss != 0
212 || self.cache_data_block_total != 0
213 || self.cache_meta_block_miss != 0
214 || self.cache_meta_block_total != 0
215 || self.cache_data_prefetch_count != 0
216 || self.skip_multi_version_key_count != 0
217 || self.skip_delete_key_count != 0
218 || self.processed_key_count != 0
219 || self.bloom_filter_true_negative_counts != 0
220 || self.remote_io_time.load(Ordering::Relaxed) != 0
221 || self.bloom_filter_check_counts != 0
222 || self.vnode_checked_get_count != 0
223 || self.vnode_pruned_get_count != 0
224 }
225}
226
227#[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
228impl Drop for StoreLocalStatistic {
229 fn drop(&mut self) {
230 if !self.reported.load(Ordering::Relaxed)
231 && !self.added.load(Ordering::Relaxed)
232 && self.need_report()
233 {
234 tracing::error!("local stats lost!\n{:#?}", self);
235 }
236 }
237}
238
/// Local metric handles for a single table label, obtained via `.local()` from
/// the shared `HummockStateStoreMetrics` in `LocalStoreMetrics::new`.
///
/// `StoreLocalStatistic::report` records into these handles and
/// `LocalStoreMetrics::flush` calls `flush` on every one of them.
struct LocalStoreMetrics {
    // Handles of `sst_store_block_request_counts`.
    cache_data_block_total: LabelGuardedLocalIntCounter,
    cache_data_block_miss: LabelGuardedLocalIntCounter,
    cache_meta_block_total: LabelGuardedLocalIntCounter,
    cache_meta_block_miss: LabelGuardedLocalIntCounter,
    cache_data_prefetch_count: LabelGuardedLocalIntCounter,
    cache_data_prefetch_block_count: LabelGuardedLocalIntCounter,
    // Handle of `remote_read_time`.
    remote_io_time: LocalHistogram,
    // Handles of `iter_scan_key_counts`.
    processed_key_count: LabelGuardedLocalIntCounter,
    skip_multi_version_key_count: LabelGuardedLocalIntCounter,
    skip_delete_key_count: LabelGuardedLocalIntCounter,
    total_key_count: LabelGuardedLocalIntCounter,
    get_shared_buffer_hit_counts: LocalIntCounter,
    // Handles of `iter_merge_sstable_counts` for the "*-iter" / "sub-iter" labels.
    staging_imm_iter_count: LocalHistogram,
    staging_sst_iter_count: LocalHistogram,
    overlapping_iter_count: LocalHistogram,
    non_overlapping_iter_count: LocalHistogram,
    sub_iter_count: LocalHistogram,
    // Bloom filter counters created with the operation labels "iter" and "get".
    iter_filter_metrics: BloomFilterLocalMetrics,
    get_filter_metrics: BloomFilterLocalMetrics,
    /// Number of reports recorded since the last flush; compared against
    /// `FLUSH_LOCAL_METRICS_TIMES` to decide when to flush.
    collect_count: usize,

    // Handles of `iter_merge_sstable_counts` for the "*-get" labels.
    staging_imm_get_count: LocalHistogram,
    staging_sst_get_count: LocalHistogram,
    overlapping_get_count: LocalHistogram,
    non_overlapping_get_count: LocalHistogram,

    // Handles of `vnode_pruning_counts` with the labels ("get", "checked") and
    // ("get", "pruned").
    vnode_checked_get_count: LabelGuardedLocalIntCounter,
    vnode_pruned_get_count: LabelGuardedLocalIntCounter,
}
269
/// Flush threshold for per-table local metrics: `StoreLocalStatistic::report`
/// flushes once the number of reports since the last flush exceeds this value.
const FLUSH_LOCAL_METRICS_TIMES: usize = 32;
271
272impl LocalStoreMetrics {
273 pub fn new(metrics: &HummockStateStoreMetrics, table_id_label: &str) -> Self {
274 let cache_data_block_total = metrics
275 .sst_store_block_request_counts
276 .with_guarded_label_values(&[table_id_label, "data_total"])
277 .local();
278
279 let cache_data_block_miss = metrics
280 .sst_store_block_request_counts
281 .with_guarded_label_values(&[table_id_label, "data_miss"])
282 .local();
283
284 let cache_meta_block_total = metrics
285 .sst_store_block_request_counts
286 .with_guarded_label_values(&[table_id_label, "meta_total"])
287 .local();
288 let cache_data_prefetch_count = metrics
289 .sst_store_block_request_counts
290 .with_guarded_label_values(&[table_id_label, "prefetch_count"])
291 .local();
292 let cache_data_prefetch_block_count = metrics
293 .sst_store_block_request_counts
294 .with_guarded_label_values(&[table_id_label, "prefetch_data_count"])
295 .local();
296
297 let cache_meta_block_miss = metrics
298 .sst_store_block_request_counts
299 .with_guarded_label_values(&[table_id_label, "meta_miss"])
300 .local();
301
302 let remote_io_time = metrics
303 .remote_read_time
304 .with_label_values(&[table_id_label])
305 .local();
306
307 let processed_key_count = metrics
308 .iter_scan_key_counts
309 .with_guarded_label_values(&[table_id_label, "processed"])
310 .local();
311
312 let skip_multi_version_key_count = metrics
313 .iter_scan_key_counts
314 .with_guarded_label_values(&[table_id_label, "skip_multi_version"])
315 .local();
316
317 let skip_delete_key_count = metrics
318 .iter_scan_key_counts
319 .with_guarded_label_values(&[table_id_label, "skip_delete"])
320 .local();
321
322 let total_key_count = metrics
323 .iter_scan_key_counts
324 .with_guarded_label_values(&[table_id_label, "total"])
325 .local();
326
327 let get_shared_buffer_hit_counts = metrics
328 .get_shared_buffer_hit_counts
329 .with_label_values(&[table_id_label])
330 .local();
331
332 let staging_imm_iter_count = metrics
333 .iter_merge_sstable_counts
334 .with_label_values(&[table_id_label, "staging-imm-iter"])
335 .local();
336 let staging_sst_iter_count = metrics
337 .iter_merge_sstable_counts
338 .with_label_values(&[table_id_label, "staging-sst-iter"])
339 .local();
340 let overlapping_iter_count = metrics
341 .iter_merge_sstable_counts
342 .with_label_values(&[table_id_label, "committed-overlapping-iter"])
343 .local();
344 let non_overlapping_iter_count = metrics
345 .iter_merge_sstable_counts
346 .with_label_values(&[table_id_label, "committed-non-overlapping-iter"])
347 .local();
348 let sub_iter_count = metrics
349 .iter_merge_sstable_counts
350 .with_label_values(&[table_id_label, "sub-iter"])
351 .local();
352 let get_filter_metrics = BloomFilterLocalMetrics::new(metrics, table_id_label, "get");
353 let iter_filter_metrics = BloomFilterLocalMetrics::new(metrics, table_id_label, "iter");
354
355 let staging_imm_get_count = metrics
356 .iter_merge_sstable_counts
357 .with_label_values(&[table_id_label, "staging-imm-get"])
358 .local();
359 let staging_sst_get_count = metrics
360 .iter_merge_sstable_counts
361 .with_label_values(&[table_id_label, "staging-sst-get"])
362 .local();
363 let overlapping_get_count = metrics
364 .iter_merge_sstable_counts
365 .with_label_values(&[table_id_label, "committed-overlapping-get"])
366 .local();
367 let non_overlapping_get_count = metrics
368 .iter_merge_sstable_counts
369 .with_label_values(&[table_id_label, "committed-non-overlapping-get"])
370 .local();
371
372 let vnode_checked_get_count = metrics
373 .vnode_pruning_counts
374 .with_guarded_label_values(&[table_id_label, "get", "checked"])
375 .local();
376 let vnode_pruned_get_count = metrics
377 .vnode_pruning_counts
378 .with_guarded_label_values(&[table_id_label, "get", "pruned"])
379 .local();
380
381 Self {
382 cache_data_block_total,
383 cache_data_block_miss,
384 cache_meta_block_total,
385 cache_meta_block_miss,
386 cache_data_prefetch_count,
387 cache_data_prefetch_block_count,
388 remote_io_time,
389 processed_key_count,
390 skip_multi_version_key_count,
391 skip_delete_key_count,
392 total_key_count,
393 get_shared_buffer_hit_counts,
394 staging_imm_iter_count,
395 staging_sst_iter_count,
396 overlapping_iter_count,
397 sub_iter_count,
398 non_overlapping_iter_count,
399 get_filter_metrics,
400 iter_filter_metrics,
401 collect_count: 0,
402 staging_imm_get_count,
403 staging_sst_get_count,
404 overlapping_get_count,
405 non_overlapping_get_count,
406 vnode_checked_get_count,
407 vnode_pruned_get_count,
408 }
409 }
410
411 pub fn flush(&mut self) {
412 self.remote_io_time.flush();
413 self.iter_filter_metrics.flush();
414 self.get_filter_metrics.flush();
415 self.flush_histogram();
416 self.flush_count();
417 }
418}
419
/// Generates the histogram plumbing for the listed fields.
///
/// Each listed identifier must name a field on both `LocalStoreMetrics` (a
/// value offering `observe(f64)` and `flush()`) and `StoreLocalStatistic` (a
/// numeric value that can be cast to `f64` and summed with `+=`).
macro_rules! add_local_metrics_histogram {
    ($($field:ident),*) => {
        impl LocalStoreMetrics {
            /// Observes each listed statistic as one sample of its histogram.
            fn add_histogram(&self, stats: &StoreLocalStatistic) {
                $(
                    self.$field.observe(stats.$field as f64);
                )*
            }

            /// Flushes each listed histogram.
            fn flush_histogram(&mut self) {
                $(
                    self.$field.flush();
                )*
            }
        }

        impl StoreLocalStatistic {
            /// Adds each listed statistic of `other` onto `self`.
            fn add_histogram(&mut self, other: &StoreLocalStatistic) {
                $(
                    self.$field += other.$field;
                )*
            }
        }
    };
}
445
// Statistics that are observed as histogram samples: this generates
// `LocalStoreMetrics::{add_histogram, flush_histogram}` and
// `StoreLocalStatistic::add_histogram` over exactly these fields.
add_local_metrics_histogram!(
    staging_imm_iter_count,
    staging_sst_iter_count,
    overlapping_iter_count,
    non_overlapping_iter_count,
    sub_iter_count,
    staging_imm_get_count,
    staging_sst_get_count,
    overlapping_get_count,
    non_overlapping_get_count
);
457
/// Generates the counter plumbing for the listed fields.
///
/// Each listed identifier must name a field on both `LocalStoreMetrics` (a
/// value offering `inc_by(..)` and `flush()`) and `StoreLocalStatistic` (a
/// numeric value accepted by `inc_by` and summable with `+=`).
macro_rules! add_local_metrics_count {
    ($($field:ident),*) => {
        impl LocalStoreMetrics {
            /// Increments each listed counter by the matching statistic.
            fn add_count(&self, stats: &StoreLocalStatistic) {
                $(
                    self.$field.inc_by(stats.$field);
                )*
            }

            /// Flushes each listed counter.
            fn flush_count(&mut self) {
                $(
                    self.$field.flush();
                )*
            }
        }

        impl StoreLocalStatistic {
            /// Adds each listed statistic of `other` onto `self`.
            fn add_count(&mut self, other: &StoreLocalStatistic) {
                $(
                    self.$field += other.$field;
                )*
            }
        }
    };
}
483
// Statistics that are exported as plain counters: this generates
// `LocalStoreMetrics::{add_count, flush_count}` and
// `StoreLocalStatistic::add_count` over exactly these fields.
add_local_metrics_count!(
    cache_data_block_total,
    cache_data_block_miss,
    cache_meta_block_total,
    cache_meta_block_miss,
    cache_data_prefetch_count,
    cache_data_prefetch_block_count,
    skip_multi_version_key_count,
    skip_delete_key_count,
    get_shared_buffer_hit_counts,
    total_key_count,
    processed_key_count,
    vnode_checked_get_count,
    vnode_pruned_get_count
);
499
/// Defines `BloomFilterLocalMetrics`, a struct holding one local counter per
/// listed identifier.
///
/// Each identifier must also name a field of `HummockStateStoreMetrics` that
/// can be resolved with a `[table_id_label, oper_type]` label pair.
macro_rules! define_bloom_filter_metrics {
    ($($counter:ident),*) => {
        struct BloomFilterLocalMetrics {
            $($counter: LabelGuardedLocalIntCounter,)*
        }

        impl BloomFilterLocalMetrics {
            /// Resolves every listed counter for the given table label and
            /// operation type and takes its local handle.
            pub fn new(metrics: &HummockStateStoreMetrics, table_id_label: &str, oper_type: &str) -> Self {
                Self {
                    $($counter: metrics.$counter.with_guarded_label_values(&[table_id_label, oper_type]).local(),)*
                }
            }

            /// Flushes every listed counter.
            pub fn flush(&mut self) {
                $(
                    self.$counter.flush();
                )*
            }
        }
    };
}
522
// Defines `BloomFilterLocalMetrics` with one local counter per name below;
// each name is also looked up as a field of `HummockStateStoreMetrics`.
define_bloom_filter_metrics!(
    read_req_check_bloom_filter_counts,
    bloom_filter_check_counts,
    bloom_filter_true_negative_counts,
    read_req_positive_but_non_exist_counts,
    read_req_bloom_filter_positive_counts
);
530
/// Guard that carries the statistics of one operation on `table_id`.
///
/// When dropped, `local_stats` is reported into the current thread's per-table
/// metrics, with the bloom filter counters going to the "get" filter metrics.
pub struct GetLocalMetricsGuard {
    /// Shared metrics used to create the per-table local handles on first use.
    metrics: Arc<HummockStateStoreMetrics>,
    /// Table whose per-table metrics entry receives the report.
    table_id: TableId,
    /// Statistics to be filled in by the owner before the guard is dropped.
    pub local_stats: StoreLocalStatistic,
}
536
537impl GetLocalMetricsGuard {
538 pub fn new(metrics: Arc<HummockStateStoreMetrics>, table_id: TableId) -> Self {
539 Self {
540 metrics,
541 table_id,
542 local_stats: StoreLocalStatistic::default(),
543 }
544 }
545}
546
547impl Drop for GetLocalMetricsGuard {
548 fn drop(&mut self) {
549 LOCAL_METRICS.with_borrow_mut(|local_metrics| {
550 let table_metrics = local_metrics
551 .entry(self.table_id.as_raw_id())
552 .or_insert_with(|| {
553 LocalStoreMetrics::new(
554 self.metrics.as_ref(),
555 self.table_id.to_string().as_str(),
556 )
557 });
558 self.local_stats.report(table_metrics);
559 self.local_stats
560 .report_bloom_filter_metrics(&table_metrics.get_filter_metrics);
561 });
562 }
563}
564
/// Guard that carries the statistics of one operation on `table_id`.
///
/// When dropped, `local_stats` is reported into the current thread's per-table
/// metrics, with the bloom filter counters going to the "iter" filter metrics.
pub struct IterLocalMetricsGuard {
    /// Shared metrics used to create the per-table local handles on first use.
    metrics: Arc<HummockStateStoreMetrics>,
    /// Table whose per-table metrics entry receives the report.
    table_id: TableId,
    /// Statistics supplied at construction; may be updated until the guard is dropped.
    pub local_stats: StoreLocalStatistic,
}
570
571impl IterLocalMetricsGuard {
572 pub fn new(
573 metrics: Arc<HummockStateStoreMetrics>,
574 table_id: TableId,
575 local_stats: StoreLocalStatistic,
576 ) -> Self {
577 Self {
578 metrics,
579 table_id,
580 local_stats,
581 }
582 }
583}
584
585impl Drop for IterLocalMetricsGuard {
586 fn drop(&mut self) {
587 LOCAL_METRICS.with_borrow_mut(|local_metrics| {
588 let table_metrics = local_metrics
589 .entry(self.table_id.as_raw_id())
590 .or_insert_with(|| {
591 LocalStoreMetrics::new(
592 self.metrics.as_ref(),
593 self.table_id.to_string().as_str(),
594 )
595 });
596 self.local_stats.report(table_metrics);
597 self.local_stats
598 .report_bloom_filter_metrics(&table_metrics.iter_filter_metrics);
599 });
600 }
601}