1use std::sync::{Arc, OnceLock};
16
17use prometheus::core::{AtomicU64, Collector, Desc, GenericCounter};
18use prometheus::{
19 Gauge, Histogram, HistogramVec, IntGauge, IntGaugeVec, Opts, Registry, exponential_buckets,
20 histogram_opts, proto, register_histogram_vec_with_registry, register_histogram_with_registry,
21 register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
22 register_int_gauge_with_registry,
23};
24use risingwave_common::config::MetricLevel;
25use risingwave_common::metrics::{
26 RelabeledCounterVec, RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec,
27 RelabeledGuardedIntGaugeVec, RelabeledHistogramVec, RelabeledMetricVec, UintGauge,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::{
31 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
32 register_guarded_int_gauge_vec_with_registry,
33};
34use thiserror_ext::AsReport;
35use tracing::warn;
36
37#[derive(Debug, Clone)]
43pub struct HummockStateStoreMetrics {
44 pub bloom_filter_true_negative_counts: RelabeledGuardedIntCounterVec,
45 pub bloom_filter_check_counts: RelabeledGuardedIntCounterVec,
46 pub iter_merge_sstable_counts: RelabeledHistogramVec,
47 pub vnode_pruning_counts: RelabeledGuardedIntCounterVec,
48 pub sst_store_block_request_counts: RelabeledGuardedIntCounterVec,
49 pub iter_scan_key_counts: RelabeledGuardedIntCounterVec,
50 pub get_shared_buffer_hit_counts: RelabeledCounterVec,
51 pub remote_read_time: RelabeledHistogramVec,
52 pub iter_fetch_meta_duration: RelabeledGuardedHistogramVec,
53 pub iter_fetch_meta_cache_unhits: IntGauge,
54 pub iter_slow_fetch_meta_cache_unhits: IntGauge,
55
56 pub vector_object_request_counts: RelabeledGuardedIntCounterVec,
57 pub vector_request_stats: RelabeledGuardedHistogramVec,
58 pub vector_hnsw_graph_level_node_count: RelabeledGuardedIntGaugeVec,
59 pub vector_index_file_count: RelabeledGuardedIntGaugeVec,
60 pub vector_index_file_size: RelabeledGuardedIntGaugeVec,
61
62 pub read_req_bloom_filter_positive_counts: RelabeledGuardedIntCounterVec,
63 pub read_req_positive_but_non_exist_counts: RelabeledGuardedIntCounterVec,
64 pub read_req_check_bloom_filter_counts: RelabeledGuardedIntCounterVec,
65
66 pub write_batch_tuple_counts: RelabeledGuardedIntCounterVec,
67 pub write_batch_duration: RelabeledGuardedHistogramVec,
68 pub write_batch_size: RelabeledGuardedHistogramVec,
69
70 pub spill_task_counts_from_unsealed: GenericCounter<AtomicU64>,
72 pub spill_task_size_from_unsealed: GenericCounter<AtomicU64>,
74
75 pub uploader_uploading_task_size: UintGauge,
77 pub uploader_uploading_task_count: IntGauge,
78 pub uploader_imm_size: UintGauge,
79 pub uploader_upload_task_latency: Histogram,
80 pub uploader_syncing_epoch_count: IntGauge,
81 pub uploader_wait_poll_latency: Histogram,
82 pub uploader_per_table_imm_size: RelabeledGuardedIntGaugeVec,
83 pub uploader_per_table_imm_count: RelabeledGuardedIntGaugeVec,
84
85 pub per_table_imm_size: RelabeledGuardedIntGaugeVec,
87 pub per_table_imm_count: RelabeledGuardedIntGaugeVec,
88 pub mem_table_spill_counts: RelabeledGuardedIntCounterVec,
89 pub old_value_size: RelabeledGuardedIntGaugeVec,
90
91 pub block_efficiency_histogram: Histogram,
93
94 pub event_handler_pending_event: IntGaugeVec,
95 pub event_handler_latency: HistogramVec,
96
97 pub safe_version_hit: GenericCounter<AtomicU64>,
98 pub safe_version_miss: GenericCounter<AtomicU64>,
99}
100
101pub static GLOBAL_HUMMOCK_STATE_STORE_METRICS: OnceLock<HummockStateStoreMetrics> = OnceLock::new();
102
103pub fn global_hummock_state_store_metrics(metric_level: MetricLevel) -> HummockStateStoreMetrics {
104 GLOBAL_HUMMOCK_STATE_STORE_METRICS
105 .get_or_init(|| HummockStateStoreMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
106 .clone()
107}
108
109impl HummockStateStoreMetrics {
110 pub fn new(registry: &Registry, metric_level: MetricLevel) -> Self {
111 let time_buckets = exponential_buckets(0.01, 10.0, 7).unwrap();
113
114 let state_store_read_time_buckets = exponential_buckets(0.001, 10.0, 5).unwrap();
116
117 let bloom_filter_true_negative_counts = register_guarded_int_counter_vec_with_registry!(
118 "state_store_bloom_filter_true_negative_counts",
119 "Total number of sstables that have been considered true negative by bloom filters",
120 &["table_id", "type"],
121 registry
122 )
123 .unwrap();
124 let bloom_filter_true_negative_counts = RelabeledMetricVec::with_metric_level(
125 MetricLevel::Debug,
126 bloom_filter_true_negative_counts,
127 metric_level,
128 );
129
130 let bloom_filter_check_counts = register_guarded_int_counter_vec_with_registry!(
131 "state_store_bloom_filter_check_counts",
132 "Total number of read request to check bloom filters",
133 &["table_id", "type"],
134 registry
135 )
136 .unwrap();
137 let bloom_filter_check_counts = RelabeledMetricVec::with_metric_level(
138 MetricLevel::Debug,
139 bloom_filter_check_counts,
140 metric_level,
141 );
142
143 let opts = histogram_opts!(
145 "state_store_iter_merge_sstable_counts",
146 "Number of child iterators merged into one MergeIterator",
147 vec![1.0, 10.0, 100.0, 1000.0, 10000.0]
148 );
149 let iter_merge_sstable_counts =
150 register_histogram_vec_with_registry!(opts, &["table_id", "type"], registry).unwrap();
151 let iter_merge_sstable_counts = RelabeledHistogramVec::with_metric_level(
152 MetricLevel::Debug,
153 iter_merge_sstable_counts,
154 metric_level,
155 );
156
157 let vnode_pruning_counts = register_guarded_int_counter_vec_with_registry!(
158 "state_store_vnode_pruning_counts",
159 "Total number of SST pruning operations by vnode key range hints",
160 &["table_id", "operation", "result"],
161 registry
162 )
163 .unwrap();
164
165 let vnode_pruning_counts = RelabeledGuardedIntCounterVec::with_metric_level(
166 MetricLevel::Debug,
167 vnode_pruning_counts,
168 metric_level,
169 );
170
171 let sst_store_block_request_counts = register_guarded_int_counter_vec_with_registry!(
173 "state_store_sst_store_block_request_counts",
174 "Total number of sst block requests that have been issued to sst store",
175 &["table_id", "type"],
176 registry
177 )
178 .unwrap();
179 let sst_store_block_request_counts = RelabeledGuardedIntCounterVec::with_metric_level(
180 MetricLevel::Info,
181 sst_store_block_request_counts,
182 metric_level,
183 );
184
185 let iter_scan_key_counts = register_guarded_int_counter_vec_with_registry!(
186 "state_store_iter_scan_key_counts",
187 "Total number of keys read by iterator",
188 &["table_id", "type"],
189 registry
190 )
191 .unwrap();
192 let iter_scan_key_counts = RelabeledGuardedIntCounterVec::with_metric_level(
193 MetricLevel::Info,
194 iter_scan_key_counts,
195 metric_level,
196 );
197
198 let get_shared_buffer_hit_counts = register_int_counter_vec_with_registry!(
199 "state_store_get_shared_buffer_hit_counts",
200 "Total number of get requests that have been fulfilled by shared buffer",
201 &["table_id"],
202 registry
203 )
204 .unwrap();
205 let get_shared_buffer_hit_counts = RelabeledCounterVec::with_metric_level(
206 MetricLevel::Debug,
207 get_shared_buffer_hit_counts,
208 metric_level,
209 );
210
211 let opts = histogram_opts!(
212 "state_store_remote_read_time_per_task",
213 "Total time of operations which read from remote storage when enable prefetch",
214 time_buckets.clone(),
215 );
216 let remote_read_time =
217 register_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
218 let remote_read_time = RelabeledHistogramVec::with_metric_level(
219 MetricLevel::Debug,
220 remote_read_time,
221 metric_level,
222 );
223
224 let opts = histogram_opts!(
225 "state_store_iter_fetch_meta_duration",
226 "Histogram of iterator fetch SST meta time that have been issued to state store",
227 state_store_read_time_buckets,
228 );
229 let iter_fetch_meta_duration =
230 register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
231 let iter_fetch_meta_duration = RelabeledGuardedHistogramVec::with_metric_level(
232 MetricLevel::Info,
233 iter_fetch_meta_duration,
234 metric_level,
235 );
236
237 let iter_fetch_meta_cache_unhits = register_int_gauge_with_registry!(
238 "state_store_iter_fetch_meta_cache_unhits",
239 "Number of SST meta cache unhit during one iterator meta fetch",
240 registry
241 )
242 .unwrap();
243
244 let iter_slow_fetch_meta_cache_unhits = register_int_gauge_with_registry!(
245 "state_store_iter_slow_fetch_meta_cache_unhits",
246 "Number of SST meta cache unhit during a iterator meta fetch which is slow (costs >5 seconds)",
247 registry
248 )
249 .unwrap();
250
251 let vector_object_request_counts = register_guarded_int_counter_vec_with_registry!(
253 "state_store_vector_object_request_counts",
254 "Metrics about vector object requests that have been issued",
255 &["table_id", "type", "mode"],
256 registry
257 )
258 .unwrap();
259 let vector_object_request_counts = RelabeledGuardedIntCounterVec::with_metric_level(
260 MetricLevel::Critical,
261 vector_object_request_counts,
262 metric_level,
263 );
264
265 let opts = histogram_opts!(
266 "state_store_vector_request_stats",
267 "Metrics about vector requests",
268 exponential_buckets(100.0, 10.0, 5).unwrap(),
269 );
270
271 let vector_request_stats = register_guarded_histogram_vec_with_registry!(
272 opts,
273 &["table_id", "type", "mode", "top_n", "ef"],
274 registry
275 )
276 .unwrap();
277 let vector_request_stats = RelabeledGuardedHistogramVec::with_metric_level(
278 MetricLevel::Critical,
279 vector_request_stats,
280 metric_level,
281 );
282
283 let vector_hnsw_graph_level_node_count = register_guarded_int_gauge_vec_with_registry!(
284 "state_store_vector_hnsw_graph_level_node_count",
285 "Number of nodes in each level of hnsw graph",
286 &["table_id", "level"],
287 registry
288 )
289 .unwrap();
290 let vector_hnsw_graph_level_node_count = RelabeledGuardedIntGaugeVec::with_metric_level(
291 MetricLevel::Critical,
292 vector_hnsw_graph_level_node_count,
293 metric_level,
294 );
295
296 let vector_index_file_count = register_guarded_int_gauge_vec_with_registry!(
297 "state_store_vector_index_file_count",
298 "Number of vector file",
299 &["table_id"],
300 registry
301 )
302 .unwrap();
303 let vector_index_file_count = RelabeledGuardedIntGaugeVec::with_metric_level(
304 MetricLevel::Critical,
305 vector_index_file_count,
306 metric_level,
307 );
308
309 let vector_index_file_size = register_guarded_int_gauge_vec_with_registry!(
310 "state_store_vector_index_file_size",
311 "total size of vector index file",
312 &["table_id", "type"],
313 registry
314 )
315 .unwrap();
316 let vector_index_file_size = RelabeledGuardedIntGaugeVec::with_metric_level(
317 MetricLevel::Critical,
318 vector_index_file_size,
319 metric_level,
320 );
321
322 let write_batch_tuple_counts = register_guarded_int_counter_vec_with_registry!(
324 "state_store_write_batch_tuple_counts",
325 "Total number of batched write kv pairs requests that have been issued to state store",
326 &["table_id"],
327 registry
328 )
329 .unwrap();
330 let write_batch_tuple_counts = RelabeledGuardedIntCounterVec::with_metric_level(
331 MetricLevel::Debug,
332 write_batch_tuple_counts,
333 metric_level,
334 );
335
336 let opts = histogram_opts!(
337 "state_store_write_batch_duration",
338 "Total time of batched write that have been issued to state store. With shared buffer on, this is the latency writing to the shared buffer",
339 time_buckets.clone()
340 );
341 let write_batch_duration =
342 register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
343 let write_batch_duration = RelabeledGuardedHistogramVec::with_metric_level(
344 MetricLevel::Debug,
345 write_batch_duration,
346 metric_level,
347 );
348
349 let opts = histogram_opts!(
350 "state_store_write_batch_size",
351 "Total size of batched write that have been issued to state store",
352 exponential_buckets(256.0, 16.0, 7).unwrap() );
354 let write_batch_size =
355 register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
356 let write_batch_size = RelabeledGuardedHistogramVec::with_metric_level(
357 MetricLevel::Debug,
358 write_batch_size,
359 metric_level,
360 );
361
362 let spill_task_counts = register_int_counter_vec_with_registry!(
363 "state_store_spill_task_counts",
364 "Total number of started spill tasks",
365 &["uploader_stage"],
366 registry
367 )
368 .unwrap();
369
370 let spill_task_size = register_int_counter_vec_with_registry!(
371 "state_store_spill_task_size",
372 "Total task of started spill tasks",
373 &["uploader_stage"],
374 registry
375 )
376 .unwrap();
377
378 let uploader_uploading_task_size = UintGauge::new(
379 "state_store_uploader_uploading_task_size",
380 "Total size of uploader uploading tasks",
381 )
382 .unwrap();
383 registry
384 .register(Box::new(uploader_uploading_task_size.clone()))
385 .unwrap();
386
387 let uploader_uploading_task_count = register_int_gauge_with_registry!(
388 "state_store_uploader_uploading_task_count",
389 "Total number of uploader uploading tasks",
390 registry
391 )
392 .unwrap();
393
394 let uploader_imm_size = UintGauge::new(
395 "state_store_uploader_imm_size",
396 "Total size of imms tracked by uploader",
397 )
398 .unwrap();
399 registry
400 .register(Box::new(uploader_imm_size.clone()))
401 .unwrap();
402
403 let opts = histogram_opts!(
404 "state_store_uploader_upload_task_latency",
405 "Latency of uploader uploading tasks",
406 time_buckets
407 );
408
409 let uploader_upload_task_latency =
410 register_histogram_with_registry!(opts, registry).unwrap();
411
412 let opts = histogram_opts!(
413 "state_store_uploader_wait_poll_latency",
414 "Latency of upload uploading task being polled after finish",
415 exponential_buckets(0.001, 5.0, 7).unwrap(), );
417
418 let uploader_wait_poll_latency = register_histogram_with_registry!(opts, registry).unwrap();
419
420 let uploader_syncing_epoch_count = register_int_gauge_with_registry!(
421 "state_store_uploader_syncing_epoch_count",
422 "Total number of syncing epoch",
423 registry
424 )
425 .unwrap();
426
427 let uploader_per_table_imm_size = register_guarded_int_gauge_vec_with_registry!(
428 "state_store_uploader_per_table_imm_size",
429 "Total uploader-tracked imm size per table",
430 &["table_id"],
431 registry
432 )
433 .unwrap();
434
435 let uploader_per_table_imm_size = RelabeledGuardedIntGaugeVec::with_metric_level(
436 MetricLevel::Debug,
437 uploader_per_table_imm_size,
438 metric_level,
439 );
440
441 let uploader_per_table_imm_count = register_guarded_int_gauge_vec_with_registry!(
442 "state_store_uploader_per_table_imm_count",
443 "Total uploader-tracked imm count per table",
444 &["table_id"],
445 registry
446 )
447 .unwrap();
448
449 let uploader_per_table_imm_count = RelabeledGuardedIntGaugeVec::with_metric_level(
450 MetricLevel::Debug,
451 uploader_per_table_imm_count,
452 metric_level,
453 );
454
455 let per_table_imm_size = register_guarded_int_gauge_vec_with_registry!(
456 "state_store_per_table_imm_size",
457 "Total imm size per table",
458 &["table_id", "fragment_id"],
459 registry
460 )
461 .unwrap();
462
463 let per_table_imm_size = RelabeledGuardedIntGaugeVec::with_metric_level_relabel_n(
464 MetricLevel::Debug,
465 per_table_imm_size,
466 metric_level,
467 1,
468 );
469
470 let per_table_imm_count = register_guarded_int_gauge_vec_with_registry!(
471 "state_store_per_table_imm_count",
472 "Total imm count per table",
473 &["table_id"],
474 registry
475 )
476 .unwrap();
477
478 let per_table_imm_count = RelabeledGuardedIntGaugeVec::with_metric_level(
479 MetricLevel::Debug,
480 per_table_imm_count,
481 metric_level,
482 );
483
484 let read_req_bloom_filter_positive_counts = register_guarded_int_counter_vec_with_registry!(
485 "state_store_read_req_bloom_filter_positive_counts",
486 "Total number of read request with at least one SST bloom filter check returns positive",
487 &["table_id", "type"],
488 registry
489 )
490 .unwrap();
491 let read_req_bloom_filter_positive_counts =
492 RelabeledGuardedIntCounterVec::with_metric_level_relabel_n(
493 MetricLevel::Info,
494 read_req_bloom_filter_positive_counts,
495 metric_level,
496 1,
497 );
498
499 let read_req_positive_but_non_exist_counts = register_guarded_int_counter_vec_with_registry!(
500 "state_store_read_req_positive_but_non_exist_counts",
501 "Total number of read request on non-existent key/prefix with at least one SST bloom filter check returns positive",
502 &["table_id", "type"],
503 registry
504 )
505 .unwrap();
506 let read_req_positive_but_non_exist_counts =
507 RelabeledGuardedIntCounterVec::with_metric_level(
508 MetricLevel::Info,
509 read_req_positive_but_non_exist_counts,
510 metric_level,
511 );
512
513 let read_req_check_bloom_filter_counts = register_guarded_int_counter_vec_with_registry!(
514 "state_store_read_req_check_bloom_filter_counts",
515 "Total number of read request that checks bloom filter with a prefix hint",
516 &["table_id", "type"],
517 registry
518 )
519 .unwrap();
520
521 let read_req_check_bloom_filter_counts = RelabeledGuardedIntCounterVec::with_metric_level(
522 MetricLevel::Info,
523 read_req_check_bloom_filter_counts,
524 metric_level,
525 );
526
527 let mem_table_spill_counts = register_guarded_int_counter_vec_with_registry!(
528 "state_store_mem_table_spill_counts",
529 "Total number of mem table spill occurs for one table",
530 &["table_id"],
531 registry
532 )
533 .unwrap();
534
535 let mem_table_spill_counts = RelabeledGuardedIntCounterVec::with_metric_level(
536 MetricLevel::Info,
537 mem_table_spill_counts,
538 metric_level,
539 );
540
541 let old_value_size = register_guarded_int_gauge_vec_with_registry!(
542 "state_store_old_value_size",
543 "The size of old value",
544 &["table_id"],
545 registry
546 )
547 .unwrap();
548
549 let old_value_size = RelabeledGuardedIntGaugeVec::with_metric_level(
550 MetricLevel::Info,
551 old_value_size,
552 metric_level,
553 );
554
555 let opts = histogram_opts!(
556 "block_efficiency_histogram",
557 "Access ratio of in-memory block.",
558 exponential_buckets(0.001, 2.0, 11).unwrap(),
559 );
560 let block_efficiency_histogram = register_histogram_with_registry!(opts, registry).unwrap();
561
562 let event_handler_pending_event = register_int_gauge_vec_with_registry!(
563 "state_store_event_handler_pending_event",
564 "The number of sent but unhandled events",
565 &["event_type"],
566 registry,
567 )
568 .unwrap();
569
570 let opts = histogram_opts!(
571 "state_store_event_handler_latency",
572 "Latency to handle event",
573 exponential_buckets(0.001, 5.0, 7).unwrap(), );
575
576 let event_handler_latency =
577 register_histogram_vec_with_registry!(opts, &["event_type"], registry).unwrap();
578
579 let safe_version_hit = GenericCounter::new(
580 "state_store_safe_version_hit",
581 "The total count of a safe version that can be retrieved successfully",
582 )
583 .unwrap();
584 registry
585 .register(Box::new(safe_version_hit.clone()))
586 .unwrap();
587
588 let safe_version_miss = GenericCounter::new(
589 "state_store_safe_version_miss",
590 "The total count of a safe version that cannot be retrieved",
591 )
592 .unwrap();
593 registry
594 .register(Box::new(safe_version_miss.clone()))
595 .unwrap();
596
597 Self {
598 bloom_filter_true_negative_counts,
599 bloom_filter_check_counts,
600 iter_merge_sstable_counts,
601 vnode_pruning_counts,
602 sst_store_block_request_counts,
603 iter_scan_key_counts,
604 get_shared_buffer_hit_counts,
605 remote_read_time,
606 iter_fetch_meta_duration,
607 iter_fetch_meta_cache_unhits,
608 iter_slow_fetch_meta_cache_unhits,
609 vector_object_request_counts,
610 vector_request_stats,
611 vector_hnsw_graph_level_node_count,
612 vector_index_file_count,
613 vector_index_file_size,
614 read_req_bloom_filter_positive_counts,
615 read_req_positive_but_non_exist_counts,
616 read_req_check_bloom_filter_counts,
617 write_batch_tuple_counts,
618 write_batch_duration,
619 write_batch_size,
620 spill_task_counts_from_unsealed: spill_task_counts.with_label_values(&["unsealed"]),
621 spill_task_size_from_unsealed: spill_task_size.with_label_values(&["unsealed"]),
622 uploader_uploading_task_size,
623 uploader_uploading_task_count,
624 uploader_imm_size,
625 uploader_upload_task_latency,
626 uploader_syncing_epoch_count,
627 uploader_wait_poll_latency,
628 uploader_per_table_imm_size,
629 uploader_per_table_imm_count,
630 per_table_imm_size,
631 per_table_imm_count,
632 mem_table_spill_counts,
633 old_value_size,
634
635 block_efficiency_histogram,
636 event_handler_pending_event,
637 event_handler_latency,
638 safe_version_hit,
639 safe_version_miss,
640 }
641 }
642
643 pub fn unused() -> Self {
644 global_hummock_state_store_metrics(MetricLevel::Disabled)
645 }
646}
647
/// Source of the memory figures that are exported as state-store cache gauges.
///
/// Implementations are shared as `Arc<dyn MemoryCollector>` and must therefore be
/// `Send + Sync`. The getters are read each time the registered collector is scraped.
/// NOTE(review): sizes are presumably in bytes and ratios in `0.0..=1.0` — confirm with the
/// implementors.
pub trait MemoryCollector: Sync + Send {
    // ---- absolute sizes ----

    /// Memory used by the meta cache; exported as `state_store_meta_cache_size`.
    fn get_meta_memory_usage(&self) -> u64;

    /// Memory used by the data block cache; exported as `state_store_block_cache_size`.
    fn get_data_memory_usage(&self) -> u64;

    /// Memory used by the vector meta cache; exported as `state_store_vector_meta_cache_size`.
    fn get_vector_meta_memory_usage(&self) -> u64;

    /// Memory used by the vector data cache; exported as `state_store_vector_data_cache_size`.
    fn get_vector_data_memory_usage(&self) -> u64;

    /// Memory used by uploading SSTs; exported as `uploading_memory_size`.
    fn get_uploading_memory_usage(&self) -> u64;

    /// Memory used by prefetching; exported as `state_store_prefetch_memory_size`.
    fn get_prefetch_memory_usage(&self) -> usize;

    // ---- usage ratios ----

    /// Meta cache usage ratio; exported as `state_store_meta_cache_usage_ratio`.
    fn get_meta_cache_memory_usage_ratio(&self) -> f64;

    /// Block cache usage ratio; exported as `state_store_block_cache_usage_ratio`.
    fn get_block_cache_memory_usage_ratio(&self) -> f64;

    /// Vector meta cache usage ratio; exported as `state_store_vector_meta_cache_usage_ratio`.
    fn get_vector_meta_cache_memory_usage_ratio(&self) -> f64;

    /// Vector data cache usage ratio; exported as `state_store_vector_data_cache_usage_ratio`.
    fn get_vector_data_cache_memory_usage_ratio(&self) -> f64;

    /// Shared buffer usage ratio; exported as `state_store_uploading_memory_usage_ratio`.
    fn get_shared_buffer_usage_ratio(&self) -> f64;
}
661
662#[derive(Clone)]
663struct StateStoreCollector {
664 memory_collector: Arc<dyn MemoryCollector>,
665 collectors: Vec<Arc<dyn Collector>>,
666 block_cache_size: IntGauge,
667 meta_cache_size: IntGauge,
668 vector_data_cache_size: IntGauge,
669 vector_meta_cache_size: IntGauge,
670 uploading_memory_size: IntGauge,
671 prefetch_memory_size: IntGauge,
672 meta_cache_usage_ratio: Gauge,
673 block_cache_usage_ratio: Gauge,
674 vector_data_cache_usage_ratio: Gauge,
675 vector_meta_cache_usage_ratio: Gauge,
676 uploading_memory_usage_ratio: Gauge,
677}
678
679impl StateStoreCollector {
680 pub fn new(memory_collector: Arc<dyn MemoryCollector>) -> Self {
681 let mut collectors = Vec::new();
682
683 let block_cache_size = IntGauge::with_opts(Opts::new(
684 "state_store_block_cache_size",
685 "the size of cache for data block cache",
686 ))
687 .unwrap();
688 collectors.push(Arc::new(block_cache_size.clone()) as _);
689
690 let block_cache_usage_ratio = Gauge::with_opts(Opts::new(
691 "state_store_block_cache_usage_ratio",
692 "the ratio of block cache to it's pre-allocated memory",
693 ))
694 .unwrap();
695 collectors.push(Arc::new(block_cache_usage_ratio.clone()) as _);
696
697 let meta_cache_size = IntGauge::with_opts(Opts::new(
698 "state_store_meta_cache_size",
699 "the size of cache for meta file cache",
700 ))
701 .unwrap();
702 collectors.push(Arc::new(meta_cache_size.clone()) as _);
703
704 let meta_cache_usage_ratio = Gauge::with_opts(Opts::new(
705 "state_store_meta_cache_usage_ratio",
706 "the ratio of meta cache to it's pre-allocated memory",
707 ))
708 .unwrap();
709 collectors.push(Arc::new(meta_cache_usage_ratio.clone()) as _);
710
711 let vector_data_cache_size = IntGauge::with_opts(Opts::new(
712 "state_store_vector_data_cache_size",
713 "the size of cache for vector data file cache",
714 ))
715 .unwrap();
716 collectors.push(Arc::new(vector_data_cache_size.clone()) as _);
717
718 let vector_data_cache_usage_ratio = Gauge::with_opts(Opts::new(
719 "state_store_vector_data_cache_usage_ratio",
720 "the ratio of vector data cache to it's pre-allocated memory",
721 ))
722 .unwrap();
723 collectors.push(Arc::new(vector_data_cache_usage_ratio.clone()) as _);
724
725 let vector_meta_cache_size = IntGauge::with_opts(Opts::new(
726 "state_store_vector_meta_cache_size",
727 "the size of cache for vector meta file cache",
728 ))
729 .unwrap();
730 collectors.push(Arc::new(vector_meta_cache_size.clone()) as _);
731
732 let vector_meta_cache_usage_ratio = Gauge::with_opts(Opts::new(
733 "state_store_vector_meta_cache_usage_ratio",
734 "the ratio of vector meta cache to it's pre-allocated memory",
735 ))
736 .unwrap();
737 collectors.push(Arc::new(vector_meta_cache_usage_ratio.clone()) as _);
738
739 let uploading_memory_size = IntGauge::with_opts(Opts::new(
740 "uploading_memory_size",
741 "the size of uploading SSTs memory usage",
742 ))
743 .unwrap();
744 collectors.push(Arc::new(uploading_memory_size.clone()) as _);
745
746 let uploading_memory_usage_ratio = Gauge::with_opts(Opts::new(
747 "state_store_uploading_memory_usage_ratio",
748 "the ratio of uploading SSTs memory usage to it's pre-allocated memory",
749 ))
750 .unwrap();
751 collectors.push(Arc::new(uploading_memory_usage_ratio.clone()) as _);
752
753 let prefetch_memory_size = IntGauge::with_opts(Opts::new(
754 "state_store_prefetch_memory_size",
755 "the size of prefetch memory usage",
756 ))
757 .unwrap();
758 collectors.push(Arc::new(prefetch_memory_size.clone()) as _);
759
760 Self {
761 memory_collector,
762 collectors,
763 block_cache_size,
764 meta_cache_size,
765 vector_data_cache_size,
766 vector_meta_cache_size,
767 uploading_memory_size,
768 prefetch_memory_size,
769 meta_cache_usage_ratio,
770 block_cache_usage_ratio,
771
772 vector_data_cache_usage_ratio,
773 vector_meta_cache_usage_ratio,
774 uploading_memory_usage_ratio,
775 }
776 }
777}
778
779impl Collector for StateStoreCollector {
780 fn desc(&self) -> Vec<&Desc> {
781 self.collectors.iter().flat_map(|c| c.desc()).collect()
782 }
783
784 fn collect(&self) -> Vec<proto::MetricFamily> {
785 self.block_cache_size
786 .set(self.memory_collector.get_data_memory_usage() as i64);
787 self.meta_cache_size
788 .set(self.memory_collector.get_meta_memory_usage() as i64);
789 self.vector_data_cache_size
790 .set(self.memory_collector.get_vector_data_memory_usage() as _);
791 self.vector_meta_cache_size
792 .set(self.memory_collector.get_vector_meta_memory_usage() as _);
793 self.uploading_memory_size
794 .set(self.memory_collector.get_uploading_memory_usage() as i64);
795 self.prefetch_memory_size
796 .set(self.memory_collector.get_prefetch_memory_usage() as i64);
797 self.meta_cache_usage_ratio
798 .set(self.memory_collector.get_meta_cache_memory_usage_ratio());
799 self.block_cache_usage_ratio
800 .set(self.memory_collector.get_block_cache_memory_usage_ratio());
801 self.vector_meta_cache_usage_ratio.set(
802 self.memory_collector
803 .get_vector_meta_cache_memory_usage_ratio(),
804 );
805 self.vector_data_cache_usage_ratio.set(
806 self.memory_collector
807 .get_vector_data_cache_memory_usage_ratio(),
808 );
809 self.uploading_memory_usage_ratio
810 .set(self.memory_collector.get_shared_buffer_usage_ratio());
811 self.collectors.iter().flat_map(|c| c.collect()).collect()
813 }
814}
815
816pub fn monitor_cache(memory_collector: Arc<dyn MemoryCollector>) {
817 let collector = Box::new(StateStoreCollector::new(memory_collector));
818 if let Err(e) = GLOBAL_METRICS_REGISTRY.register(collector) {
819 warn!(
820 "unable to monitor cache. May have been registered if in all-in-one deployment: {}",
821 e.as_report()
822 );
823 }
824}