1use std::cell::RefCell;
16use std::collections::HashMap;
17use std::sync::Arc;
18#[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
19use std::sync::atomic::AtomicBool;
20use std::sync::atomic::{AtomicU64, Ordering};
21
22use prometheus::local::{LocalHistogram, LocalIntCounter};
23use risingwave_common::catalog::TableId;
24use risingwave_common::metrics::LabelGuardedLocalIntCounter;
25use risingwave_hummock_sdk::table_stats::TableStatsMap;
26
27use super::HummockStateStoreMetrics;
28use crate::monitor::CompactorMetrics;
29
30thread_local!(static LOCAL_METRICS: RefCell<HashMap<u32,LocalStoreMetrics>> = RefCell::new(HashMap::default()));
31
/// Statistics gathered locally for a single storage operation.
///
/// An instance is either merged into another one with `add`, or reported into
/// metrics by `report` (thread-local state-store metrics) or `report_compactor`
/// (compactor metrics).
#[derive(Default, Debug)]
pub struct StoreLocalStatistic {
    // Block-request counters. They are reported into
    // `sst_store_block_request_counts` under the `data_miss`, `data_total`,
    // `meta_miss`, `meta_total`, `prefetch_count` and `prefetch_data_count`
    // labels respectively.
    pub cache_data_block_miss: u64,
    pub cache_data_block_total: u64,
    pub cache_meta_block_miss: u64,
    pub cache_meta_block_total: u64,
    pub cache_data_prefetch_count: u64,
    pub cache_data_prefetch_block_count: u64,

    // Key counters. They are reported into `iter_scan_key_counts` under the
    // `total`, `skip_multi_version`, `skip_delete` and `processed` labels.
    pub total_key_count: u64,
    pub skip_multi_version_key_count: u64,
    pub skip_delete_key_count: u64,
    pub processed_key_count: u64,
    /// Number of bloom filter checks counted as true negatives.
    pub bloom_filter_true_negative_counts: u64,
    /// Accumulated remote I/O time, shared through an `Arc` and updated
    /// atomically. It is divided by 1000.0 before being observed into a
    /// histogram.
    // NOTE(review): the unit is not visible in this file — confirm with writers.
    pub remote_io_time: Arc<AtomicU64>,
    /// Number of bloom filter checks performed. When this is zero, no bloom
    /// filter metrics are reported for the operation.
    pub bloom_filter_check_counts: u64,
    pub get_shared_buffer_hit_counts: u64,
    // Iterator counts. Each is observed into `iter_merge_sstable_counts` as a
    // histogram sample under its own `*-iter` label.
    pub staging_imm_iter_count: u64,
    pub staging_sst_iter_count: u64,
    pub overlapping_iter_count: u64,
    pub non_overlapping_iter_count: u64,
    pub sub_iter_count: u64,
    /// Read when reporting bloom filter metrics: a request with at least one
    /// positive bloom filter check and `found_key == false` is counted as
    /// "positive but non-exist".
    pub found_key: bool,

    // Point-get counts. Each is observed into `iter_merge_sstable_counts` as a
    // histogram sample under its own `*-get` label.
    pub staging_imm_get_count: u64,
    pub staging_sst_get_count: u64,
    pub overlapping_get_count: u64,
    pub non_overlapping_get_count: u64,

    // Debug-only bookkeeping (not compiled under madsim/test): set once the
    // statistics have been reported, discarded or ignored.
    #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
    reported: AtomicBool,
    // Debug-only bookkeeping (not compiled under madsim/test): set once the
    // statistics have been merged into another instance through `add`.
    #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
    added: AtomicBool,

    // Not read or written by any code in this file.
    // NOTE(review): presumably per-table stats of keys skipped by a watermark —
    // confirm against the users of this field.
    pub skipped_by_watermark_table_stats: TableStatsMap,
}
72
73impl StoreLocalStatistic {
74 pub fn add(&mut self, other: &StoreLocalStatistic) {
75 self.add_count(other);
76 self.add_histogram(other);
77 self.bloom_filter_true_negative_counts += other.bloom_filter_true_negative_counts;
78 self.remote_io_time.fetch_add(
79 other.remote_io_time.load(Ordering::Relaxed),
80 Ordering::Relaxed,
81 );
82 self.bloom_filter_check_counts += other.bloom_filter_check_counts;
83
84 #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
85 if other.added.fetch_or(true, Ordering::Relaxed) || other.reported.load(Ordering::Relaxed) {
86 tracing::error!("double added\n{:#?}", other);
87 }
88 }
89
90 fn report(&self, metrics: &mut LocalStoreMetrics) {
91 metrics.add_count(self);
92 metrics.add_histogram(self);
93 let t = self.remote_io_time.load(Ordering::Relaxed) as f64;
94 if t > 0.0 {
95 metrics.remote_io_time.observe(t / 1000.0);
96 }
97
98 metrics.collect_count += 1;
99 if metrics.collect_count > FLUSH_LOCAL_METRICS_TIMES {
100 metrics.flush();
101 metrics.collect_count = 0;
102 }
103 #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
104 if self.reported.fetch_or(true, Ordering::Relaxed) || self.added.load(Ordering::Relaxed) {
105 tracing::error!("double reported\n{:#?}", self);
106 }
107 }
108
109 pub fn discard(self) {
110 #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
111 {
112 self.reported.fetch_or(true, Ordering::Relaxed);
113 }
114 }
115
116 pub fn report_compactor(&self, metrics: &CompactorMetrics) {
117 let t = self.remote_io_time.load(Ordering::Relaxed) as f64;
118 if t > 0.0 {
119 metrics.remote_read_time.observe(t / 1000.0);
120 }
121 if self.processed_key_count > 0 {
122 metrics
123 .iter_scan_key_counts
124 .with_label_values(&["processed"])
125 .inc_by(self.processed_key_count);
126 }
127
128 if self.skip_multi_version_key_count > 0 {
129 metrics
130 .iter_scan_key_counts
131 .with_label_values(&["skip_multi_version"])
132 .inc_by(self.skip_multi_version_key_count);
133 }
134
135 if self.skip_delete_key_count > 0 {
136 metrics
137 .iter_scan_key_counts
138 .with_label_values(&["skip_delete"])
139 .inc_by(self.skip_delete_key_count);
140 }
141
142 if self.total_key_count > 0 {
143 metrics
144 .iter_scan_key_counts
145 .with_label_values(&["total"])
146 .inc_by(self.total_key_count);
147 }
148
149 #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
150 if self.reported.fetch_or(true, Ordering::Relaxed) || self.added.load(Ordering::Relaxed) {
151 tracing::error!("double reported\n{:#?}", self);
152 }
153 }
154
155 fn report_bloom_filter_metrics(&self, metrics: &BloomFilterLocalMetrics) {
156 if self.bloom_filter_check_counts == 0 {
157 return;
158 }
159 metrics
161 .bloom_filter_true_negative_counts
162 .inc_by(self.bloom_filter_true_negative_counts);
163 metrics
164 .bloom_filter_check_counts
165 .inc_by(self.bloom_filter_check_counts);
166 metrics.read_req_check_bloom_filter_counts.inc();
167
168 if self.bloom_filter_check_counts > self.bloom_filter_true_negative_counts {
169 if !self.found_key {
170 metrics.read_req_positive_but_non_exist_counts.inc();
174 }
175 metrics.read_req_bloom_filter_positive_counts.inc();
178 }
179 }
180
181 pub fn flush_all() {
182 LOCAL_METRICS.with_borrow_mut(|local_metrics| {
183 for metrics in local_metrics.values_mut() {
184 if metrics.collect_count > 0 {
185 metrics.flush();
186 metrics.collect_count = 0;
187 }
188 }
189 });
190 }
191
192 pub fn ignore(&self) {
193 #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
194 self.reported.store(true, Ordering::Relaxed);
195 }
196
197 #[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
198 fn need_report(&self) -> bool {
199 self.cache_data_block_miss != 0
200 || self.cache_data_block_total != 0
201 || self.cache_meta_block_miss != 0
202 || self.cache_meta_block_total != 0
203 || self.cache_data_prefetch_count != 0
204 || self.skip_multi_version_key_count != 0
205 || self.skip_delete_key_count != 0
206 || self.processed_key_count != 0
207 || self.bloom_filter_true_negative_counts != 0
208 || self.remote_io_time.load(Ordering::Relaxed) != 0
209 || self.bloom_filter_check_counts != 0
210 }
211}
212
213#[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
214impl Drop for StoreLocalStatistic {
215 fn drop(&mut self) {
216 if !self.reported.load(Ordering::Relaxed)
217 && !self.added.load(Ordering::Relaxed)
218 && self.need_report()
219 {
220 tracing::error!("local stats lost!\n{:#?}", self);
221 }
222 }
223}
224
/// Thread-local metric handles for one table.
///
/// Every field except `collect_count` is a `.local()` handle obtained from
/// `HummockStateStoreMetrics`; values are pushed out by `flush`.
struct LocalStoreMetrics {
    // Handles of `sst_store_block_request_counts`, one per request kind label.
    cache_data_block_total: LabelGuardedLocalIntCounter<2>,
    cache_data_block_miss: LabelGuardedLocalIntCounter<2>,
    cache_meta_block_total: LabelGuardedLocalIntCounter<2>,
    cache_meta_block_miss: LabelGuardedLocalIntCounter<2>,
    cache_data_prefetch_count: LabelGuardedLocalIntCounter<2>,
    cache_data_prefetch_block_count: LabelGuardedLocalIntCounter<2>,
    // Handle of `remote_read_time`.
    remote_io_time: LocalHistogram,
    // Handles of `iter_scan_key_counts`, one per key kind label.
    processed_key_count: LabelGuardedLocalIntCounter<2>,
    skip_multi_version_key_count: LabelGuardedLocalIntCounter<2>,
    skip_delete_key_count: LabelGuardedLocalIntCounter<2>,
    total_key_count: LabelGuardedLocalIntCounter<2>,
    get_shared_buffer_hit_counts: LocalIntCounter,
    // Handles of `iter_merge_sstable_counts` with the `*-iter` labels.
    staging_imm_iter_count: LocalHistogram,
    staging_sst_iter_count: LocalHistogram,
    overlapping_iter_count: LocalHistogram,
    non_overlapping_iter_count: LocalHistogram,
    sub_iter_count: LocalHistogram,
    // Bloom filter counters created with the "iter" / "get" operation type.
    iter_filter_metrics: BloomFilterLocalMetrics,
    get_filter_metrics: BloomFilterLocalMetrics,
    // Number of reports collected since the last flush.
    collect_count: usize,

    // Handles of `iter_merge_sstable_counts` with the `*-get` labels.
    staging_imm_get_count: LocalHistogram,
    staging_sst_get_count: LocalHistogram,
    overlapping_get_count: LocalHistogram,
    non_overlapping_get_count: LocalHistogram,
}
252
/// Report-count threshold for flushing: `StoreLocalStatistic::report` flushes a
/// `LocalStoreMetrics` once its `collect_count` exceeds this value.
const FLUSH_LOCAL_METRICS_TIMES: usize = 32;
254
255impl LocalStoreMetrics {
256 pub fn new(metrics: &HummockStateStoreMetrics, table_id_label: &str) -> Self {
257 let cache_data_block_total = metrics
258 .sst_store_block_request_counts
259 .with_guarded_label_values(&[table_id_label, "data_total"])
260 .local();
261
262 let cache_data_block_miss = metrics
263 .sst_store_block_request_counts
264 .with_guarded_label_values(&[table_id_label, "data_miss"])
265 .local();
266
267 let cache_meta_block_total = metrics
268 .sst_store_block_request_counts
269 .with_guarded_label_values(&[table_id_label, "meta_total"])
270 .local();
271 let cache_data_prefetch_count = metrics
272 .sst_store_block_request_counts
273 .with_guarded_label_values(&[table_id_label, "prefetch_count"])
274 .local();
275 let cache_data_prefetch_block_count = metrics
276 .sst_store_block_request_counts
277 .with_guarded_label_values(&[table_id_label, "prefetch_data_count"])
278 .local();
279
280 let cache_meta_block_miss = metrics
281 .sst_store_block_request_counts
282 .with_guarded_label_values(&[table_id_label, "meta_miss"])
283 .local();
284
285 let remote_io_time = metrics
286 .remote_read_time
287 .with_label_values(&[table_id_label])
288 .local();
289
290 let processed_key_count = metrics
291 .iter_scan_key_counts
292 .with_guarded_label_values(&[table_id_label, "processed"])
293 .local();
294
295 let skip_multi_version_key_count = metrics
296 .iter_scan_key_counts
297 .with_guarded_label_values(&[table_id_label, "skip_multi_version"])
298 .local();
299
300 let skip_delete_key_count = metrics
301 .iter_scan_key_counts
302 .with_guarded_label_values(&[table_id_label, "skip_delete"])
303 .local();
304
305 let total_key_count = metrics
306 .iter_scan_key_counts
307 .with_guarded_label_values(&[table_id_label, "total"])
308 .local();
309
310 let get_shared_buffer_hit_counts = metrics
311 .get_shared_buffer_hit_counts
312 .with_label_values(&[table_id_label])
313 .local();
314
315 let staging_imm_iter_count = metrics
316 .iter_merge_sstable_counts
317 .with_label_values(&[table_id_label, "staging-imm-iter"])
318 .local();
319 let staging_sst_iter_count = metrics
320 .iter_merge_sstable_counts
321 .with_label_values(&[table_id_label, "staging-sst-iter"])
322 .local();
323 let overlapping_iter_count = metrics
324 .iter_merge_sstable_counts
325 .with_label_values(&[table_id_label, "committed-overlapping-iter"])
326 .local();
327 let non_overlapping_iter_count = metrics
328 .iter_merge_sstable_counts
329 .with_label_values(&[table_id_label, "committed-non-overlapping-iter"])
330 .local();
331 let sub_iter_count = metrics
332 .iter_merge_sstable_counts
333 .with_label_values(&[table_id_label, "sub-iter"])
334 .local();
335 let get_filter_metrics = BloomFilterLocalMetrics::new(metrics, table_id_label, "get");
336 let iter_filter_metrics = BloomFilterLocalMetrics::new(metrics, table_id_label, "iter");
337
338 let staging_imm_get_count = metrics
339 .iter_merge_sstable_counts
340 .with_label_values(&[table_id_label, "staging-imm-get"])
341 .local();
342 let staging_sst_get_count = metrics
343 .iter_merge_sstable_counts
344 .with_label_values(&[table_id_label, "staging-sst-get"])
345 .local();
346 let overlapping_get_count = metrics
347 .iter_merge_sstable_counts
348 .with_label_values(&[table_id_label, "committed-overlapping-get"])
349 .local();
350 let non_overlapping_get_count = metrics
351 .iter_merge_sstable_counts
352 .with_label_values(&[table_id_label, "committed-non-overlapping-get"])
353 .local();
354
355 Self {
356 cache_data_block_total,
357 cache_data_block_miss,
358 cache_meta_block_total,
359 cache_meta_block_miss,
360 cache_data_prefetch_count,
361 cache_data_prefetch_block_count,
362 remote_io_time,
363 processed_key_count,
364 skip_multi_version_key_count,
365 skip_delete_key_count,
366 total_key_count,
367 get_shared_buffer_hit_counts,
368 staging_imm_iter_count,
369 staging_sst_iter_count,
370 overlapping_iter_count,
371 sub_iter_count,
372 non_overlapping_iter_count,
373 get_filter_metrics,
374 iter_filter_metrics,
375 collect_count: 0,
376 staging_imm_get_count,
377 staging_sst_get_count,
378 overlapping_get_count,
379 non_overlapping_get_count,
380 }
381 }
382
383 pub fn flush(&mut self) {
384 self.remote_io_time.flush();
385 self.iter_filter_metrics.flush();
386 self.get_filter_metrics.flush();
387 self.flush_histogram();
388 self.flush_count();
389 }
390}
391
392macro_rules! add_local_metrics_histogram {
393 ($($x:ident),*) => (
394 impl LocalStoreMetrics {
395 fn add_histogram(&self, stats: &StoreLocalStatistic) {
396 $(
397 self.$x.observe(stats.$x as f64);
398 )*
399 }
400
401 fn flush_histogram(&mut self) {
402 $(
403 self.$x.flush();
404 )*
405 }
406 }
407
408 impl StoreLocalStatistic {
409 fn add_histogram(&mut self, other: &StoreLocalStatistic) {
410 $(
411 self.$x += other.$x;
412 )*
413 }
414 }
415 )
416}
417
418add_local_metrics_histogram!(
419 staging_imm_iter_count,
420 staging_sst_iter_count,
421 overlapping_iter_count,
422 non_overlapping_iter_count,
423 sub_iter_count,
424 staging_imm_get_count,
425 staging_sst_get_count,
426 overlapping_get_count,
427 non_overlapping_get_count
428);
429
430macro_rules! add_local_metrics_count {
431 ($($x:ident),*) => (
432 impl LocalStoreMetrics {
433 fn add_count(&self, stats: &StoreLocalStatistic) {
434 $(
435 self.$x.inc_by(stats.$x);
436 )*
437 }
438
439 fn flush_count(&mut self) {
440 $(
441 self.$x.flush();
442 )*
443 }
444 }
445
446 impl StoreLocalStatistic {
447 fn add_count(&mut self, other: &StoreLocalStatistic) {
448 $(
449 self.$x += other.$x;
450 )*
451 }
452 }
453 )
454}
455
456add_local_metrics_count!(
457 cache_data_block_total,
458 cache_data_block_miss,
459 cache_meta_block_total,
460 cache_meta_block_miss,
461 cache_data_prefetch_count,
462 cache_data_prefetch_block_count,
463 skip_multi_version_key_count,
464 skip_delete_key_count,
465 get_shared_buffer_hit_counts,
466 total_key_count,
467 processed_key_count
468);
469
470macro_rules! define_bloom_filter_metrics {
471 ($($x:ident),*) => (
472 struct BloomFilterLocalMetrics {
473 $($x: LabelGuardedLocalIntCounter<2>,)*
474 }
475
476 impl BloomFilterLocalMetrics {
477 pub fn new(metrics: &HummockStateStoreMetrics, table_id_label: &str, oper_type: &str) -> Self {
478 Self {
480 $($x: metrics.$x.with_guarded_label_values(&[table_id_label, oper_type]).local(),)*
481 }
482 }
483
484 pub fn flush(&mut self) {
485 $(
486 self.$x.flush();
487 )*
488 }
489 }
490 )
491}
492
493define_bloom_filter_metrics!(
494 read_req_check_bloom_filter_counts,
495 bloom_filter_check_counts,
496 bloom_filter_true_negative_counts,
497 read_req_positive_but_non_exist_counts,
498 read_req_bloom_filter_positive_counts
499);
500
501pub struct GetLocalMetricsGuard {
502 metrics: Arc<HummockStateStoreMetrics>,
503 table_id: TableId,
504 pub local_stats: StoreLocalStatistic,
505}
506
507impl GetLocalMetricsGuard {
508 pub fn new(metrics: Arc<HummockStateStoreMetrics>, table_id: TableId) -> Self {
509 Self {
510 metrics,
511 table_id,
512 local_stats: StoreLocalStatistic::default(),
513 }
514 }
515}
516
517impl Drop for GetLocalMetricsGuard {
518 fn drop(&mut self) {
519 LOCAL_METRICS.with_borrow_mut(|local_metrics| {
520 let table_metrics = local_metrics
521 .entry(self.table_id.table_id)
522 .or_insert_with(|| {
523 LocalStoreMetrics::new(
524 self.metrics.as_ref(),
525 self.table_id.to_string().as_str(),
526 )
527 });
528 self.local_stats.report(table_metrics);
529 self.local_stats
530 .report_bloom_filter_metrics(&table_metrics.get_filter_metrics);
531 });
532 }
533}
534
535pub struct IterLocalMetricsGuard {
536 metrics: Arc<HummockStateStoreMetrics>,
537 table_id: TableId,
538 pub local_stats: StoreLocalStatistic,
539}
540
541impl IterLocalMetricsGuard {
542 pub fn new(
543 metrics: Arc<HummockStateStoreMetrics>,
544 table_id: TableId,
545 local_stats: StoreLocalStatistic,
546 ) -> Self {
547 Self {
548 metrics,
549 table_id,
550 local_stats,
551 }
552 }
553}
554
555impl Drop for IterLocalMetricsGuard {
556 fn drop(&mut self) {
557 LOCAL_METRICS.with_borrow_mut(|local_metrics| {
558 let table_metrics = local_metrics
559 .entry(self.table_id.table_id)
560 .or_insert_with(|| {
561 LocalStoreMetrics::new(
562 self.metrics.as_ref(),
563 self.table_id.to_string().as_str(),
564 )
565 });
566 self.local_stats.report(table_metrics);
567 self.local_stats
568 .report_bloom_filter_metrics(&table_metrics.iter_filter_metrics);
569 });
570 }
571}