1use std::sync::{Arc, OnceLock};
16
17use prometheus::core::{AtomicU64, Collector, Desc, GenericCounter};
18use prometheus::{
19 Gauge, Histogram, HistogramVec, IntGauge, Opts, Registry, exponential_buckets, histogram_opts,
20 proto, register_histogram_vec_with_registry, register_histogram_with_registry,
21 register_int_counter_vec_with_registry, register_int_gauge_with_registry,
22};
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25 RelabeledCounterVec, RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec,
26 RelabeledHistogramVec, RelabeledMetricVec, UintGauge,
27};
28use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
29use risingwave_common::{
30 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
31};
32use thiserror_ext::AsReport;
33use tracing::warn;
34
/// Metrics exported by the Hummock state store.
///
/// An instance is built by `HummockStateStoreMetrics::new`, which registers every metric below
/// into the given `Registry`. Each field's doc names the Prometheus metric it wraps and the
/// labels it was registered with.
#[derive(Debug, Clone)]
pub struct HummockStateStoreMetrics {
    /// `state_store_bloom_filter_true_negative_counts` (`table_id`, `type`), Debug level.
    pub bloom_filter_true_negative_counts: RelabeledGuardedIntCounterVec<2>,
    /// `state_store_bloom_filter_check_counts` (`table_id`, `type`), Debug level.
    pub bloom_filter_check_counts: RelabeledGuardedIntCounterVec<2>,
    /// Histogram `state_store_iter_merge_sstable_counts` (`table_id`, `type`), Debug level.
    pub iter_merge_sstable_counts: RelabeledHistogramVec,
    /// `state_store_sst_store_block_request_counts` (`table_id`, `type`), Critical level.
    pub sst_store_block_request_counts: RelabeledGuardedIntCounterVec<2>,
    /// `state_store_iter_scan_key_counts` (`table_id`, `type`), Info level.
    pub iter_scan_key_counts: RelabeledGuardedIntCounterVec<2>,
    /// `state_store_get_shared_buffer_hit_counts` (`table_id`), Debug level.
    pub get_shared_buffer_hit_counts: RelabeledCounterVec,
    /// Histogram `state_store_remote_read_time_per_task` (`table_id`), Debug level.
    pub remote_read_time: RelabeledHistogramVec,
    /// Histogram `state_store_iter_fetch_meta_duration` (`table_id`), Info level.
    pub iter_fetch_meta_duration: RelabeledGuardedHistogramVec<1>,
    /// Gauge `state_store_iter_fetch_meta_cache_unhits`.
    pub iter_fetch_meta_cache_unhits: IntGauge,
    /// Gauge `state_store_iter_slow_fetch_meta_cache_unhits`.
    pub iter_slow_fetch_meta_cache_unhits: IntGauge,

    /// `state_store_read_req_bloom_filter_positive_counts` (`table_id`, `type`), Info level.
    pub read_req_bloom_filter_positive_counts: RelabeledGuardedIntCounterVec<2>,
    /// `state_store_read_req_positive_but_non_exist_counts` (`table_id`, `type`), Info level.
    pub read_req_positive_but_non_exist_counts: RelabeledGuardedIntCounterVec<2>,
    /// `state_store_read_req_check_bloom_filter_counts` (`table_id`, `type`), Info level.
    pub read_req_check_bloom_filter_counts: RelabeledGuardedIntCounterVec<2>,

    /// `state_store_write_batch_tuple_counts` (`table_id`), Debug level.
    pub write_batch_tuple_counts: RelabeledCounterVec,
    /// Histogram `state_store_write_batch_duration` (`table_id`), Debug level.
    pub write_batch_duration: RelabeledHistogramVec,
    /// Histogram `state_store_write_batch_size` (`table_id`), Debug level.
    pub write_batch_size: RelabeledHistogramVec,

    /// `state_store_merge_imm_task_counts` (`table_id`), Debug level.
    pub merge_imm_task_counts: RelabeledCounterVec,
    /// `state_store_merge_imm_memory_sz` (`table_id`), Debug level.
    pub merge_imm_batch_memory_sz: RelabeledCounterVec,

    /// Child `uploader_stage="unsealed"` of `state_store_spill_task_counts`.
    pub spill_task_counts_from_unsealed: GenericCounter<AtomicU64>,
    /// Child `uploader_stage="unsealed"` of `state_store_spill_task_size`.
    pub spill_task_size_from_unsealed: GenericCounter<AtomicU64>,
    /// Child `uploader_stage="sealed"` of `state_store_spill_task_counts`.
    pub spill_task_counts_from_sealed: GenericCounter<AtomicU64>,
    /// Child `uploader_stage="sealed"` of `state_store_spill_task_size`.
    pub spill_task_size_from_sealed: GenericCounter<AtomicU64>,

    /// Gauge `state_store_uploader_uploading_task_size`.
    pub uploader_uploading_task_size: UintGauge,
    /// Gauge `state_store_uploader_uploading_task_count`.
    pub uploader_uploading_task_count: IntGauge,
    /// Gauge `state_store_uploader_imm_size`.
    pub uploader_imm_size: UintGauge,
    /// Histogram `state_store_uploader_upload_task_latency`.
    pub uploader_upload_task_latency: Histogram,
    /// Gauge `state_store_uploader_syncing_epoch_count`.
    pub uploader_syncing_epoch_count: IntGauge,
    /// Histogram `state_store_uploader_wait_poll_latency`.
    pub uploader_wait_poll_latency: Histogram,

    /// `state_store_mem_table_spill_counts` (`table_id`), Info level.
    pub mem_table_spill_counts: RelabeledCounterVec,
    /// Gauge `state_store_old_value_size`.
    pub old_value_size: IntGauge,

    /// Histogram `block_efficiency_histogram` (note: no `state_store_` prefix).
    pub block_efficiency_histogram: Histogram,

    /// Gauge `state_store_event_handler_pending_event`.
    pub event_handler_pending_event: IntGauge,
    /// Histogram vector `state_store_event_handler_latency` (`event_type`).
    pub event_handler_latency: HistogramVec,

    /// Counter `state_store_safe_version_hit`.
    pub safe_version_hit: GenericCounter<AtomicU64>,
    /// Counter `state_store_safe_version_miss`.
    pub safe_version_miss: GenericCounter<AtomicU64>,
}
96
/// Process-wide [`HummockStateStoreMetrics`].
///
/// Lazily initialized by `global_hummock_state_store_metrics`, which registers the metrics
/// into `GLOBAL_METRICS_REGISTRY`.
pub static GLOBAL_HUMMOCK_STATE_STORE_METRICS: OnceLock<HummockStateStoreMetrics> = OnceLock::new();
98
99pub fn global_hummock_state_store_metrics(metric_level: MetricLevel) -> HummockStateStoreMetrics {
100 GLOBAL_HUMMOCK_STATE_STORE_METRICS
101 .get_or_init(|| HummockStateStoreMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
102 .clone()
103}
104
105impl HummockStateStoreMetrics {
106 pub fn new(registry: &Registry, metric_level: MetricLevel) -> Self {
107 let time_buckets = exponential_buckets(0.01, 10.0, 7).unwrap();
109
110 let state_store_read_time_buckets = exponential_buckets(0.001, 10.0, 5).unwrap();
112
113 let bloom_filter_true_negative_counts = register_guarded_int_counter_vec_with_registry!(
114 "state_store_bloom_filter_true_negative_counts",
115 "Total number of sstables that have been considered true negative by bloom filters",
116 &["table_id", "type"],
117 registry
118 )
119 .unwrap();
120 let bloom_filter_true_negative_counts = RelabeledMetricVec::with_metric_level(
121 MetricLevel::Debug,
122 bloom_filter_true_negative_counts,
123 metric_level,
124 );
125
126 let bloom_filter_check_counts = register_guarded_int_counter_vec_with_registry!(
127 "state_store_bloom_filter_check_counts",
128 "Total number of read request to check bloom filters",
129 &["table_id", "type"],
130 registry
131 )
132 .unwrap();
133 let bloom_filter_check_counts = RelabeledMetricVec::with_metric_level(
134 MetricLevel::Debug,
135 bloom_filter_check_counts,
136 metric_level,
137 );
138
139 let opts = histogram_opts!(
141 "state_store_iter_merge_sstable_counts",
142 "Number of child iterators merged into one MergeIterator",
143 vec![1.0, 10.0, 100.0, 1000.0, 10000.0]
144 );
145 let iter_merge_sstable_counts =
146 register_histogram_vec_with_registry!(opts, &["table_id", "type"], registry).unwrap();
147 let iter_merge_sstable_counts = RelabeledHistogramVec::with_metric_level(
148 MetricLevel::Debug,
149 iter_merge_sstable_counts,
150 metric_level,
151 );
152
153 let sst_store_block_request_counts = register_guarded_int_counter_vec_with_registry!(
155 "state_store_sst_store_block_request_counts",
156 "Total number of sst block requests that have been issued to sst store",
157 &["table_id", "type"],
158 registry
159 )
160 .unwrap();
161 let sst_store_block_request_counts = RelabeledGuardedIntCounterVec::with_metric_level(
162 MetricLevel::Critical,
163 sst_store_block_request_counts,
164 metric_level,
165 );
166
167 let iter_scan_key_counts = register_guarded_int_counter_vec_with_registry!(
168 "state_store_iter_scan_key_counts",
169 "Total number of keys read by iterator",
170 &["table_id", "type"],
171 registry
172 )
173 .unwrap();
174 let iter_scan_key_counts = RelabeledGuardedIntCounterVec::with_metric_level(
175 MetricLevel::Info,
176 iter_scan_key_counts,
177 metric_level,
178 );
179
180 let get_shared_buffer_hit_counts = register_int_counter_vec_with_registry!(
181 "state_store_get_shared_buffer_hit_counts",
182 "Total number of get requests that have been fulfilled by shared buffer",
183 &["table_id"],
184 registry
185 )
186 .unwrap();
187 let get_shared_buffer_hit_counts = RelabeledCounterVec::with_metric_level(
188 MetricLevel::Debug,
189 get_shared_buffer_hit_counts,
190 metric_level,
191 );
192
193 let opts = histogram_opts!(
194 "state_store_remote_read_time_per_task",
195 "Total time of operations which read from remote storage when enable prefetch",
196 time_buckets.clone(),
197 );
198 let remote_read_time =
199 register_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
200 let remote_read_time = RelabeledHistogramVec::with_metric_level(
201 MetricLevel::Debug,
202 remote_read_time,
203 metric_level,
204 );
205
206 let opts = histogram_opts!(
207 "state_store_iter_fetch_meta_duration",
208 "Histogram of iterator fetch SST meta time that have been issued to state store",
209 state_store_read_time_buckets.clone(),
210 );
211 let iter_fetch_meta_duration =
212 register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
213 let iter_fetch_meta_duration = RelabeledGuardedHistogramVec::with_metric_level(
214 MetricLevel::Info,
215 iter_fetch_meta_duration,
216 metric_level,
217 );
218
219 let iter_fetch_meta_cache_unhits = register_int_gauge_with_registry!(
220 "state_store_iter_fetch_meta_cache_unhits",
221 "Number of SST meta cache unhit during one iterator meta fetch",
222 registry
223 )
224 .unwrap();
225
226 let iter_slow_fetch_meta_cache_unhits = register_int_gauge_with_registry!(
227 "state_store_iter_slow_fetch_meta_cache_unhits",
228 "Number of SST meta cache unhit during a iterator meta fetch which is slow (costs >5 seconds)",
229 registry
230 )
231 .unwrap();
232
233 let write_batch_tuple_counts = register_int_counter_vec_with_registry!(
235 "state_store_write_batch_tuple_counts",
236 "Total number of batched write kv pairs requests that have been issued to state store",
237 &["table_id"],
238 registry
239 )
240 .unwrap();
241 let write_batch_tuple_counts = RelabeledCounterVec::with_metric_level(
242 MetricLevel::Debug,
243 write_batch_tuple_counts,
244 metric_level,
245 );
246
247 let opts = histogram_opts!(
248 "state_store_write_batch_duration",
249 "Total time of batched write that have been issued to state store. With shared buffer on, this is the latency writing to the shared buffer",
250 time_buckets.clone()
251 );
252 let write_batch_duration =
253 register_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
254 let write_batch_duration = RelabeledHistogramVec::with_metric_level(
255 MetricLevel::Debug,
256 write_batch_duration,
257 metric_level,
258 );
259
260 let opts = histogram_opts!(
261 "state_store_write_batch_size",
262 "Total size of batched write that have been issued to state store",
263 exponential_buckets(256.0, 16.0, 7).unwrap() );
265 let write_batch_size =
266 register_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
267 let write_batch_size = RelabeledHistogramVec::with_metric_level(
268 MetricLevel::Debug,
269 write_batch_size,
270 metric_level,
271 );
272
273 let merge_imm_task_counts = register_int_counter_vec_with_registry!(
274 "state_store_merge_imm_task_counts",
275 "Total number of merge imm task that have been finished",
276 &["table_id"],
277 registry
278 )
279 .unwrap();
280 let merge_imm_task_counts = RelabeledCounterVec::with_metric_level(
281 MetricLevel::Debug,
282 merge_imm_task_counts,
283 metric_level,
284 );
285
286 let merge_imm_batch_memory_sz = register_int_counter_vec_with_registry!(
287 "state_store_merge_imm_memory_sz",
288 "Number of imm batches that have been merged by a merge task",
289 &["table_id"],
290 registry
291 )
292 .unwrap();
293 let merge_imm_batch_memory_sz = RelabeledCounterVec::with_metric_level(
294 MetricLevel::Debug,
295 merge_imm_batch_memory_sz,
296 metric_level,
297 );
298
299 let spill_task_counts = register_int_counter_vec_with_registry!(
300 "state_store_spill_task_counts",
301 "Total number of started spill tasks",
302 &["uploader_stage"],
303 registry
304 )
305 .unwrap();
306
307 let spill_task_size = register_int_counter_vec_with_registry!(
308 "state_store_spill_task_size",
309 "Total task of started spill tasks",
310 &["uploader_stage"],
311 registry
312 )
313 .unwrap();
314
315 let uploader_uploading_task_size = UintGauge::new(
316 "state_store_uploader_uploading_task_size",
317 "Total size of uploader uploading tasks",
318 )
319 .unwrap();
320 registry
321 .register(Box::new(uploader_uploading_task_size.clone()))
322 .unwrap();
323
324 let uploader_uploading_task_count = register_int_gauge_with_registry!(
325 "state_store_uploader_uploading_task_count",
326 "Total number of uploader uploading tasks",
327 registry
328 )
329 .unwrap();
330
331 let uploader_imm_size = UintGauge::new(
332 "state_store_uploader_imm_size",
333 "Total size of imms tracked by uploader",
334 )
335 .unwrap();
336 registry
337 .register(Box::new(uploader_imm_size.clone()))
338 .unwrap();
339
340 let opts = histogram_opts!(
341 "state_store_uploader_upload_task_latency",
342 "Latency of uploader uploading tasks",
343 time_buckets
344 );
345
346 let uploader_upload_task_latency =
347 register_histogram_with_registry!(opts, registry).unwrap();
348
349 let opts = histogram_opts!(
350 "state_store_uploader_wait_poll_latency",
351 "Latency of upload uploading task being polled after finish",
352 exponential_buckets(0.001, 5.0, 7).unwrap(), );
354
355 let uploader_wait_poll_latency = register_histogram_with_registry!(opts, registry).unwrap();
356
357 let uploader_syncing_epoch_count = register_int_gauge_with_registry!(
358 "state_store_uploader_syncing_epoch_count",
359 "Total number of syncing epoch",
360 registry
361 )
362 .unwrap();
363
364 let read_req_bloom_filter_positive_counts = register_guarded_int_counter_vec_with_registry!(
365 "state_store_read_req_bloom_filter_positive_counts",
366 "Total number of read request with at least one SST bloom filter check returns positive",
367 &["table_id", "type"],
368 registry
369 )
370 .unwrap();
371 let read_req_bloom_filter_positive_counts =
372 RelabeledGuardedIntCounterVec::with_metric_level_relabel_n(
373 MetricLevel::Info,
374 read_req_bloom_filter_positive_counts,
375 metric_level,
376 1,
377 );
378
379 let read_req_positive_but_non_exist_counts = register_guarded_int_counter_vec_with_registry!(
380 "state_store_read_req_positive_but_non_exist_counts",
381 "Total number of read request on non-existent key/prefix with at least one SST bloom filter check returns positive",
382 &["table_id", "type"],
383 registry
384 )
385 .unwrap();
386 let read_req_positive_but_non_exist_counts =
387 RelabeledGuardedIntCounterVec::with_metric_level(
388 MetricLevel::Info,
389 read_req_positive_but_non_exist_counts,
390 metric_level,
391 );
392
393 let read_req_check_bloom_filter_counts = register_guarded_int_counter_vec_with_registry!(
394 "state_store_read_req_check_bloom_filter_counts",
395 "Total number of read request that checks bloom filter with a prefix hint",
396 &["table_id", "type"],
397 registry
398 )
399 .unwrap();
400
401 let read_req_check_bloom_filter_counts = RelabeledGuardedIntCounterVec::with_metric_level(
402 MetricLevel::Info,
403 read_req_check_bloom_filter_counts,
404 metric_level,
405 );
406
407 let mem_table_spill_counts = register_int_counter_vec_with_registry!(
408 "state_store_mem_table_spill_counts",
409 "Total number of mem table spill occurs for one table",
410 &["table_id"],
411 registry
412 )
413 .unwrap();
414
415 let mem_table_spill_counts = RelabeledCounterVec::with_metric_level(
416 MetricLevel::Info,
417 mem_table_spill_counts,
418 metric_level,
419 );
420
421 let old_value_size = register_int_gauge_with_registry!(
422 "state_store_old_value_size",
423 "The size of old value",
424 registry
425 )
426 .unwrap();
427
428 let opts = histogram_opts!(
429 "block_efficiency_histogram",
430 "Access ratio of in-memory block.",
431 exponential_buckets(0.001, 2.0, 11).unwrap(),
432 );
433 let block_efficiency_histogram = register_histogram_with_registry!(opts, registry).unwrap();
434
435 let event_handler_pending_event = register_int_gauge_with_registry!(
436 "state_store_event_handler_pending_event",
437 "The number of sent but unhandled events",
438 registry,
439 )
440 .unwrap();
441
442 let opts = histogram_opts!(
443 "state_store_event_handler_latency",
444 "Latency to handle event",
445 exponential_buckets(0.001, 5.0, 7).unwrap(), );
447
448 let event_handler_latency =
449 register_histogram_vec_with_registry!(opts, &["event_type"], registry).unwrap();
450
451 let safe_version_hit = GenericCounter::new(
452 "state_store_safe_version_hit",
453 "The total count of a safe version that can be retrieved successfully",
454 )
455 .unwrap();
456 registry
457 .register(Box::new(safe_version_hit.clone()))
458 .unwrap();
459
460 let safe_version_miss = GenericCounter::new(
461 "state_store_safe_version_miss",
462 "The total count of a safe version that cannot be retrieved",
463 )
464 .unwrap();
465 registry
466 .register(Box::new(safe_version_miss.clone()))
467 .unwrap();
468
469 Self {
470 bloom_filter_true_negative_counts,
471 bloom_filter_check_counts,
472 iter_merge_sstable_counts,
473 sst_store_block_request_counts,
474 iter_scan_key_counts,
475 get_shared_buffer_hit_counts,
476 remote_read_time,
477 iter_fetch_meta_duration,
478 iter_fetch_meta_cache_unhits,
479 iter_slow_fetch_meta_cache_unhits,
480 read_req_bloom_filter_positive_counts,
481 read_req_positive_but_non_exist_counts,
482 read_req_check_bloom_filter_counts,
483 write_batch_tuple_counts,
484 write_batch_duration,
485 write_batch_size,
486 merge_imm_task_counts,
487 merge_imm_batch_memory_sz,
488 spill_task_counts_from_sealed: spill_task_counts.with_label_values(&["sealed"]),
489 spill_task_counts_from_unsealed: spill_task_counts.with_label_values(&["unsealed"]),
490 spill_task_size_from_sealed: spill_task_size.with_label_values(&["sealed"]),
491 spill_task_size_from_unsealed: spill_task_size.with_label_values(&["unsealed"]),
492 uploader_uploading_task_size,
493 uploader_uploading_task_count,
494 uploader_imm_size,
495 uploader_upload_task_latency,
496 uploader_syncing_epoch_count,
497 uploader_wait_poll_latency,
498 mem_table_spill_counts,
499 old_value_size,
500
501 block_efficiency_histogram,
502 event_handler_pending_event,
503 event_handler_latency,
504 safe_version_hit,
505 safe_version_miss,
506 }
507 }
508
509 pub fn unused() -> Self {
510 global_hummock_state_store_metrics(MetricLevel::Disabled)
511 }
512}
513
/// Source of state-store memory-usage readings.
///
/// The readings are sampled into gauges by the collector that [`monitor_cache`] registers.
/// Implementations must be `Sync + Send`.
pub trait MemoryCollector: Sync + Send {
    /// Meta cache usage; exported as `state_store_meta_cache_size` (presumably bytes — TODO confirm).
    fn get_meta_memory_usage(&self) -> u64;
    /// Data block cache usage; exported as `state_store_block_cache_size` (presumably bytes).
    fn get_data_memory_usage(&self) -> u64;
    /// Memory held by uploading SSTs; exported as `uploading_memory_size` (presumably bytes).
    fn get_uploading_memory_usage(&self) -> u64;
    /// Prefetch memory usage; sampled into `state_store_prefetch_memory_size`.
    fn get_prefetch_memory_usage(&self) -> usize;
    /// Meta cache usage relative to its pre-allocated memory; sampled into
    /// `state_store_meta_cache_usage_ratio`.
    fn get_meta_cache_memory_usage_ratio(&self) -> f64;
    /// Block cache usage relative to its pre-allocated memory; sampled into
    /// `state_store_block_cache_usage_ratio`.
    fn get_block_cache_memory_usage_ratio(&self) -> f64;
    /// Shared-buffer usage ratio; sampled into `state_store_uploading_memory_usage_ratio`.
    fn get_shared_buffer_usage_ratio(&self) -> f64;
}
523
/// A Prometheus [`Collector`] that refreshes its gauges from a [`MemoryCollector`] each time
/// it is collected.
#[derive(Clone)]
struct StateStoreCollector {
    /// Source of the memory-usage readings.
    memory_collector: Arc<dyn MemoryCollector>,
    /// Descriptors of all gauges below; returned from `Collector::desc`.
    descs: Vec<Desc>,
    /// `state_store_block_cache_size`, set from `get_data_memory_usage`.
    block_cache_size: IntGauge,
    /// `state_store_meta_cache_size`, set from `get_meta_memory_usage`.
    meta_cache_size: IntGauge,
    /// `uploading_memory_size`, set from `get_uploading_memory_usage`.
    uploading_memory_size: IntGauge,
    /// `state_store_prefetch_memory_size`, set from `get_prefetch_memory_usage`.
    prefetch_memory_size: IntGauge,
    /// `state_store_meta_cache_usage_ratio`, set from `get_meta_cache_memory_usage_ratio`.
    meta_cache_usage_ratio: Gauge,
    /// `state_store_block_cache_usage_ratio`, set from `get_block_cache_memory_usage_ratio`.
    block_cache_usage_ratio: Gauge,
    /// `state_store_uploading_memory_usage_ratio`, set from `get_shared_buffer_usage_ratio`.
    uploading_memory_usage_ratio: Gauge,
}
536
537impl StateStoreCollector {
538 pub fn new(memory_collector: Arc<dyn MemoryCollector>) -> Self {
539 let mut descs = Vec::new();
540
541 let block_cache_size = IntGauge::with_opts(Opts::new(
542 "state_store_block_cache_size",
543 "the size of cache for data block cache",
544 ))
545 .unwrap();
546 descs.extend(block_cache_size.desc().into_iter().cloned());
547
548 let block_cache_usage_ratio = Gauge::with_opts(Opts::new(
549 "state_store_block_cache_usage_ratio",
550 "the ratio of block cache to it's pre-allocated memory",
551 ))
552 .unwrap();
553 descs.extend(block_cache_usage_ratio.desc().into_iter().cloned());
554
555 let meta_cache_size = IntGauge::with_opts(Opts::new(
556 "state_store_meta_cache_size",
557 "the size of cache for meta file cache",
558 ))
559 .unwrap();
560 descs.extend(meta_cache_size.desc().into_iter().cloned());
561
562 let meta_cache_usage_ratio = Gauge::with_opts(Opts::new(
563 "state_store_meta_cache_usage_ratio",
564 "the ratio of meta cache to it's pre-allocated memory",
565 ))
566 .unwrap();
567 descs.extend(meta_cache_usage_ratio.desc().into_iter().cloned());
568
569 let uploading_memory_size = IntGauge::with_opts(Opts::new(
570 "uploading_memory_size",
571 "the size of uploading SSTs memory usage",
572 ))
573 .unwrap();
574 descs.extend(uploading_memory_size.desc().into_iter().cloned());
575
576 let uploading_memory_usage_ratio = Gauge::with_opts(Opts::new(
577 "state_store_uploading_memory_usage_ratio",
578 "the ratio of uploading SSTs memory usage to it's pre-allocated memory",
579 ))
580 .unwrap();
581 descs.extend(uploading_memory_usage_ratio.desc().into_iter().cloned());
582
583 let prefetch_memory_size = IntGauge::with_opts(Opts::new(
584 "state_store_prefetch_memory_size",
585 "the size of prefetch memory usage",
586 ))
587 .unwrap();
588 descs.extend(prefetch_memory_size.desc().into_iter().cloned());
589
590 Self {
591 memory_collector,
592 descs,
593 block_cache_size,
594 meta_cache_size,
595 uploading_memory_size,
596 prefetch_memory_size,
597 meta_cache_usage_ratio,
598 block_cache_usage_ratio,
599
600 uploading_memory_usage_ratio,
601 }
602 }
603}
604
605impl Collector for StateStoreCollector {
606 fn desc(&self) -> Vec<&Desc> {
607 self.descs.iter().collect()
608 }
609
610 fn collect(&self) -> Vec<proto::MetricFamily> {
611 self.block_cache_size
612 .set(self.memory_collector.get_data_memory_usage() as i64);
613 self.meta_cache_size
614 .set(self.memory_collector.get_meta_memory_usage() as i64);
615 self.uploading_memory_size
616 .set(self.memory_collector.get_uploading_memory_usage() as i64);
617 self.prefetch_memory_size
618 .set(self.memory_collector.get_prefetch_memory_usage() as i64);
619 self.meta_cache_usage_ratio
620 .set(self.memory_collector.get_meta_cache_memory_usage_ratio());
621 self.block_cache_usage_ratio
622 .set(self.memory_collector.get_block_cache_memory_usage_ratio());
623 self.uploading_memory_usage_ratio
624 .set(self.memory_collector.get_shared_buffer_usage_ratio());
625 let mut mfs = Vec::with_capacity(3);
627 mfs.extend(self.block_cache_size.collect());
628 mfs.extend(self.meta_cache_size.collect());
629 mfs.extend(self.uploading_memory_size.collect());
630 mfs
631 }
632}
633
634pub fn monitor_cache(memory_collector: Arc<dyn MemoryCollector>) {
635 let collector = Box::new(StateStoreCollector::new(memory_collector));
636 if let Err(e) = GLOBAL_METRICS_REGISTRY.register(collector) {
637 warn!(
638 "unable to monitor cache. May have been registered if in all-in-one deployment: {}",
639 e.as_report()
640 );
641 }
642}