1use std::sync::{Arc, OnceLock};
16
17use prometheus::core::{AtomicU64, Collector, Desc, GenericCounter};
18use prometheus::{
19 Gauge, Histogram, HistogramVec, IntGauge, Opts, Registry, exponential_buckets, histogram_opts,
20 proto, register_histogram_vec_with_registry, register_histogram_with_registry,
21 register_int_counter_vec_with_registry, register_int_gauge_with_registry,
22};
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25 RelabeledCounterVec, RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec,
26 RelabeledGuardedIntGaugeVec, RelabeledHistogramVec, RelabeledMetricVec, UintGauge,
27};
28use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
29use risingwave_common::{
30 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
31 register_guarded_int_gauge_vec_with_registry,
32};
33use thiserror_ext::AsReport;
34use tracing::warn;
35
/// Handles to the Prometheus metrics exported by the Hummock state store.
///
/// Every handle is created and registered by [`HummockStateStoreMetrics::new`]. The
/// per-field descriptions below mirror the metric name and help text used at registration.
#[derive(Debug, Clone)]
pub struct HummockStateStoreMetrics {
    /// `state_store_bloom_filter_true_negative_counts`: total number of sstables that have
    /// been considered true negative by bloom filters. Labels: `table_id`, `type`.
    pub bloom_filter_true_negative_counts: RelabeledGuardedIntCounterVec,
    /// `state_store_bloom_filter_check_counts`: total number of read requests that check
    /// bloom filters. Labels: `table_id`, `type`.
    pub bloom_filter_check_counts: RelabeledGuardedIntCounterVec,
    /// `state_store_iter_merge_sstable_counts`: number of child iterators merged into one
    /// `MergeIterator`. Labels: `table_id`, `type`.
    pub iter_merge_sstable_counts: RelabeledHistogramVec,
    /// `state_store_sst_store_block_request_counts`: total number of sst block requests
    /// issued to the sst store. Labels: `table_id`, `type`.
    pub sst_store_block_request_counts: RelabeledGuardedIntCounterVec,
    /// `state_store_iter_scan_key_counts`: total number of keys read by iterators.
    /// Labels: `table_id`, `type`.
    pub iter_scan_key_counts: RelabeledGuardedIntCounterVec,
    /// `state_store_get_shared_buffer_hit_counts`: total number of get requests fulfilled
    /// by the shared buffer. Label: `table_id`.
    pub get_shared_buffer_hit_counts: RelabeledCounterVec,
    /// `state_store_remote_read_time_per_task`: total time of operations which read from
    /// remote storage when prefetch is enabled. Label: `table_id`.
    pub remote_read_time: RelabeledHistogramVec,
    /// `state_store_iter_fetch_meta_duration`: histogram of iterator SST meta fetch time.
    /// Label: `table_id`.
    pub iter_fetch_meta_duration: RelabeledGuardedHistogramVec,
    /// `state_store_iter_fetch_meta_cache_unhits`: number of SST meta cache misses during
    /// one iterator meta fetch.
    pub iter_fetch_meta_cache_unhits: IntGauge,
    /// `state_store_iter_slow_fetch_meta_cache_unhits`: number of SST meta cache misses
    /// during an iterator meta fetch which is slow (costs >5 seconds).
    pub iter_slow_fetch_meta_cache_unhits: IntGauge,

    /// `state_store_vector_object_request_counts`: counters about issued vector object
    /// requests. Labels: `table_id`, `type`, `mode`.
    pub vector_object_request_counts: RelabeledGuardedIntCounterVec,
    /// `state_store_vector_request_stats`: histograms about vector requests.
    /// Labels: `table_id`, `type`, `mode`, `top_n`, `ef`.
    pub vector_request_stats: RelabeledGuardedHistogramVec,
    /// `state_store_vector_hnsw_graph_level_node_count`: number of nodes in each level of
    /// the hnsw graph. Labels: `table_id`, `level`.
    pub vector_hnsw_graph_level_node_count: RelabeledGuardedIntGaugeVec,
    /// `state_store_vector_index_file_count`: number of vector files. Label: `table_id`.
    pub vector_index_file_count: RelabeledGuardedIntGaugeVec,
    /// `state_store_vector_index_file_size`: total size of vector index files.
    /// Labels: `table_id`, `type`.
    pub vector_index_file_size: RelabeledGuardedIntGaugeVec,

    /// `state_store_read_req_bloom_filter_positive_counts`: total number of read requests
    /// with at least one SST bloom filter check returning positive.
    /// Labels: `table_id`, `type`.
    pub read_req_bloom_filter_positive_counts: RelabeledGuardedIntCounterVec,
    /// `state_store_read_req_positive_but_non_exist_counts`: total number of read requests
    /// on a non-existent key/prefix with at least one SST bloom filter check returning
    /// positive. Labels: `table_id`, `type`.
    pub read_req_positive_but_non_exist_counts: RelabeledGuardedIntCounterVec,
    /// `state_store_read_req_check_bloom_filter_counts`: total number of read requests that
    /// check the bloom filter with a prefix hint. Labels: `table_id`, `type`.
    pub read_req_check_bloom_filter_counts: RelabeledGuardedIntCounterVec,

    /// `state_store_write_batch_tuple_counts`: total number of batched write kv pairs
    /// issued to the state store. Label: `table_id`.
    pub write_batch_tuple_counts: RelabeledCounterVec,
    /// `state_store_write_batch_duration`: total time of batched writes issued to the state
    /// store. Label: `table_id`.
    pub write_batch_duration: RelabeledHistogramVec,
    /// `state_store_write_batch_size`: total size of batched writes issued to the state
    /// store. Label: `table_id`.
    pub write_batch_size: RelabeledHistogramVec,

    /// `state_store_merge_imm_task_counts`: total number of finished merge imm tasks.
    /// Label: `table_id`.
    pub merge_imm_task_counts: RelabeledCounterVec,
    /// `state_store_merge_imm_memory_sz`. Label: `table_id`.
    // NOTE(review): the registered help text talks about a number of merged imm batches,
    // while the name suggests a memory size — confirm which one is intended.
    pub merge_imm_batch_memory_sz: RelabeledCounterVec,

    /// `state_store_spill_task_counts` with `uploader_stage="unsealed"`.
    pub spill_task_counts_from_unsealed: GenericCounter<AtomicU64>,
    /// `state_store_spill_task_size` with `uploader_stage="unsealed"`.
    pub spill_task_size_from_unsealed: GenericCounter<AtomicU64>,
    /// `state_store_spill_task_counts` with `uploader_stage="sealed"`.
    pub spill_task_counts_from_sealed: GenericCounter<AtomicU64>,
    /// `state_store_spill_task_size` with `uploader_stage="sealed"`.
    pub spill_task_size_from_sealed: GenericCounter<AtomicU64>,

    /// `state_store_uploader_uploading_task_size`: total size of uploader uploading tasks.
    pub uploader_uploading_task_size: UintGauge,
    /// `state_store_uploader_uploading_task_count`: total number of uploader uploading
    /// tasks.
    pub uploader_uploading_task_count: IntGauge,
    /// `state_store_uploader_imm_size`: total size of imms tracked by the uploader.
    pub uploader_imm_size: UintGauge,
    /// `state_store_uploader_upload_task_latency`: latency of uploader uploading tasks.
    pub uploader_upload_task_latency: Histogram,
    /// `state_store_uploader_syncing_epoch_count`: total number of syncing epochs.
    pub uploader_syncing_epoch_count: IntGauge,
    /// `state_store_uploader_wait_poll_latency`: latency of an uploading task being polled
    /// after it finishes.
    pub uploader_wait_poll_latency: Histogram,

    /// `state_store_mem_table_spill_counts`: total number of mem table spills for one
    /// table. Label: `table_id`.
    pub mem_table_spill_counts: RelabeledCounterVec,
    /// `state_store_old_value_size`: the size of old value.
    pub old_value_size: IntGauge,

    /// `block_efficiency_histogram`: access ratio of in-memory blocks.
    pub block_efficiency_histogram: Histogram,

    /// `state_store_event_handler_pending_event`: the number of sent but unhandled events.
    pub event_handler_pending_event: IntGauge,
    /// `state_store_event_handler_latency`: latency to handle an event.
    /// Label: `event_type`.
    pub event_handler_latency: HistogramVec,

    /// `state_store_safe_version_hit`: total count of safe versions that could be
    /// retrieved successfully.
    pub safe_version_hit: GenericCounter<AtomicU64>,
    /// `state_store_safe_version_miss`: total count of safe versions that could not be
    /// retrieved.
    pub safe_version_miss: GenericCounter<AtomicU64>,
}
103
/// Process-wide, lazily initialized [`HummockStateStoreMetrics`] instance.
///
/// Starts out empty; it is populated on first use through `OnceLock::get_or_init`.
pub static GLOBAL_HUMMOCK_STATE_STORE_METRICS: OnceLock<HummockStateStoreMetrics> = OnceLock::new();
105
106pub fn global_hummock_state_store_metrics(metric_level: MetricLevel) -> HummockStateStoreMetrics {
107 GLOBAL_HUMMOCK_STATE_STORE_METRICS
108 .get_or_init(|| HummockStateStoreMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
109 .clone()
110}
111
112impl HummockStateStoreMetrics {
113 pub fn new(registry: &Registry, metric_level: MetricLevel) -> Self {
114 let time_buckets = exponential_buckets(0.01, 10.0, 7).unwrap();
116
117 let state_store_read_time_buckets = exponential_buckets(0.001, 10.0, 5).unwrap();
119
120 let bloom_filter_true_negative_counts = register_guarded_int_counter_vec_with_registry!(
121 "state_store_bloom_filter_true_negative_counts",
122 "Total number of sstables that have been considered true negative by bloom filters",
123 &["table_id", "type"],
124 registry
125 )
126 .unwrap();
127 let bloom_filter_true_negative_counts = RelabeledMetricVec::with_metric_level(
128 MetricLevel::Debug,
129 bloom_filter_true_negative_counts,
130 metric_level,
131 );
132
133 let bloom_filter_check_counts = register_guarded_int_counter_vec_with_registry!(
134 "state_store_bloom_filter_check_counts",
135 "Total number of read request to check bloom filters",
136 &["table_id", "type"],
137 registry
138 )
139 .unwrap();
140 let bloom_filter_check_counts = RelabeledMetricVec::with_metric_level(
141 MetricLevel::Debug,
142 bloom_filter_check_counts,
143 metric_level,
144 );
145
146 let opts = histogram_opts!(
148 "state_store_iter_merge_sstable_counts",
149 "Number of child iterators merged into one MergeIterator",
150 vec![1.0, 10.0, 100.0, 1000.0, 10000.0]
151 );
152 let iter_merge_sstable_counts =
153 register_histogram_vec_with_registry!(opts, &["table_id", "type"], registry).unwrap();
154 let iter_merge_sstable_counts = RelabeledHistogramVec::with_metric_level(
155 MetricLevel::Debug,
156 iter_merge_sstable_counts,
157 metric_level,
158 );
159
160 let sst_store_block_request_counts = register_guarded_int_counter_vec_with_registry!(
162 "state_store_sst_store_block_request_counts",
163 "Total number of sst block requests that have been issued to sst store",
164 &["table_id", "type"],
165 registry
166 )
167 .unwrap();
168 let sst_store_block_request_counts = RelabeledGuardedIntCounterVec::with_metric_level(
169 MetricLevel::Critical,
170 sst_store_block_request_counts,
171 metric_level,
172 );
173
174 let iter_scan_key_counts = register_guarded_int_counter_vec_with_registry!(
175 "state_store_iter_scan_key_counts",
176 "Total number of keys read by iterator",
177 &["table_id", "type"],
178 registry
179 )
180 .unwrap();
181 let iter_scan_key_counts = RelabeledGuardedIntCounterVec::with_metric_level(
182 MetricLevel::Info,
183 iter_scan_key_counts,
184 metric_level,
185 );
186
187 let get_shared_buffer_hit_counts = register_int_counter_vec_with_registry!(
188 "state_store_get_shared_buffer_hit_counts",
189 "Total number of get requests that have been fulfilled by shared buffer",
190 &["table_id"],
191 registry
192 )
193 .unwrap();
194 let get_shared_buffer_hit_counts = RelabeledCounterVec::with_metric_level(
195 MetricLevel::Debug,
196 get_shared_buffer_hit_counts,
197 metric_level,
198 );
199
200 let opts = histogram_opts!(
201 "state_store_remote_read_time_per_task",
202 "Total time of operations which read from remote storage when enable prefetch",
203 time_buckets.clone(),
204 );
205 let remote_read_time =
206 register_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
207 let remote_read_time = RelabeledHistogramVec::with_metric_level(
208 MetricLevel::Debug,
209 remote_read_time,
210 metric_level,
211 );
212
213 let opts = histogram_opts!(
214 "state_store_iter_fetch_meta_duration",
215 "Histogram of iterator fetch SST meta time that have been issued to state store",
216 state_store_read_time_buckets,
217 );
218 let iter_fetch_meta_duration =
219 register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
220 let iter_fetch_meta_duration = RelabeledGuardedHistogramVec::with_metric_level(
221 MetricLevel::Info,
222 iter_fetch_meta_duration,
223 metric_level,
224 );
225
226 let iter_fetch_meta_cache_unhits = register_int_gauge_with_registry!(
227 "state_store_iter_fetch_meta_cache_unhits",
228 "Number of SST meta cache unhit during one iterator meta fetch",
229 registry
230 )
231 .unwrap();
232
233 let iter_slow_fetch_meta_cache_unhits = register_int_gauge_with_registry!(
234 "state_store_iter_slow_fetch_meta_cache_unhits",
235 "Number of SST meta cache unhit during a iterator meta fetch which is slow (costs >5 seconds)",
236 registry
237 )
238 .unwrap();
239
240 let vector_object_request_counts = register_guarded_int_counter_vec_with_registry!(
242 "state_store_vector_object_request_counts",
243 "Metrics about vector object requests that have been issued",
244 &["table_id", "type", "mode"],
245 registry
246 )
247 .unwrap();
248 let vector_object_request_counts = RelabeledGuardedIntCounterVec::with_metric_level(
249 MetricLevel::Critical,
250 vector_object_request_counts,
251 metric_level,
252 );
253
254 let opts = histogram_opts!(
255 "state_store_vector_request_stats",
256 "Metrics about vector requests",
257 exponential_buckets(100.0, 10.0, 5).unwrap(),
258 );
259
260 let vector_request_stats = register_guarded_histogram_vec_with_registry!(
261 opts,
262 &["table_id", "type", "mode", "top_n", "ef"],
263 registry
264 )
265 .unwrap();
266 let vector_request_stats = RelabeledGuardedHistogramVec::with_metric_level(
267 MetricLevel::Critical,
268 vector_request_stats,
269 metric_level,
270 );
271
272 let vector_hnsw_graph_level_node_count = register_guarded_int_gauge_vec_with_registry!(
273 "state_store_vector_hnsw_graph_level_node_count",
274 "Number of nodes in each level of hnsw graph",
275 &["table_id", "level"],
276 registry
277 )
278 .unwrap();
279 let vector_hnsw_graph_level_node_count = RelabeledGuardedIntGaugeVec::with_metric_level(
280 MetricLevel::Critical,
281 vector_hnsw_graph_level_node_count,
282 metric_level,
283 );
284
285 let vector_index_file_count = register_guarded_int_gauge_vec_with_registry!(
286 "state_store_vector_index_file_count",
287 "Number of vector file",
288 &["table_id"],
289 registry
290 )
291 .unwrap();
292 let vector_index_file_count = RelabeledGuardedIntGaugeVec::with_metric_level(
293 MetricLevel::Critical,
294 vector_index_file_count,
295 metric_level,
296 );
297
298 let vector_index_file_size = register_guarded_int_gauge_vec_with_registry!(
299 "state_store_vector_index_file_size",
300 "total size of vector index file",
301 &["table_id", "type"],
302 registry
303 )
304 .unwrap();
305 let vector_index_file_size = RelabeledGuardedIntGaugeVec::with_metric_level(
306 MetricLevel::Critical,
307 vector_index_file_size,
308 metric_level,
309 );
310
311 let write_batch_tuple_counts = register_int_counter_vec_with_registry!(
313 "state_store_write_batch_tuple_counts",
314 "Total number of batched write kv pairs requests that have been issued to state store",
315 &["table_id"],
316 registry
317 )
318 .unwrap();
319 let write_batch_tuple_counts = RelabeledCounterVec::with_metric_level(
320 MetricLevel::Debug,
321 write_batch_tuple_counts,
322 metric_level,
323 );
324
325 let opts = histogram_opts!(
326 "state_store_write_batch_duration",
327 "Total time of batched write that have been issued to state store. With shared buffer on, this is the latency writing to the shared buffer",
328 time_buckets.clone()
329 );
330 let write_batch_duration =
331 register_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
332 let write_batch_duration = RelabeledHistogramVec::with_metric_level(
333 MetricLevel::Debug,
334 write_batch_duration,
335 metric_level,
336 );
337
338 let opts = histogram_opts!(
339 "state_store_write_batch_size",
340 "Total size of batched write that have been issued to state store",
341 exponential_buckets(256.0, 16.0, 7).unwrap() );
343 let write_batch_size =
344 register_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
345 let write_batch_size = RelabeledHistogramVec::with_metric_level(
346 MetricLevel::Debug,
347 write_batch_size,
348 metric_level,
349 );
350
351 let merge_imm_task_counts = register_int_counter_vec_with_registry!(
352 "state_store_merge_imm_task_counts",
353 "Total number of merge imm task that have been finished",
354 &["table_id"],
355 registry
356 )
357 .unwrap();
358 let merge_imm_task_counts = RelabeledCounterVec::with_metric_level(
359 MetricLevel::Debug,
360 merge_imm_task_counts,
361 metric_level,
362 );
363
364 let merge_imm_batch_memory_sz = register_int_counter_vec_with_registry!(
365 "state_store_merge_imm_memory_sz",
366 "Number of imm batches that have been merged by a merge task",
367 &["table_id"],
368 registry
369 )
370 .unwrap();
371 let merge_imm_batch_memory_sz = RelabeledCounterVec::with_metric_level(
372 MetricLevel::Debug,
373 merge_imm_batch_memory_sz,
374 metric_level,
375 );
376
377 let spill_task_counts = register_int_counter_vec_with_registry!(
378 "state_store_spill_task_counts",
379 "Total number of started spill tasks",
380 &["uploader_stage"],
381 registry
382 )
383 .unwrap();
384
385 let spill_task_size = register_int_counter_vec_with_registry!(
386 "state_store_spill_task_size",
387 "Total task of started spill tasks",
388 &["uploader_stage"],
389 registry
390 )
391 .unwrap();
392
393 let uploader_uploading_task_size = UintGauge::new(
394 "state_store_uploader_uploading_task_size",
395 "Total size of uploader uploading tasks",
396 )
397 .unwrap();
398 registry
399 .register(Box::new(uploader_uploading_task_size.clone()))
400 .unwrap();
401
402 let uploader_uploading_task_count = register_int_gauge_with_registry!(
403 "state_store_uploader_uploading_task_count",
404 "Total number of uploader uploading tasks",
405 registry
406 )
407 .unwrap();
408
409 let uploader_imm_size = UintGauge::new(
410 "state_store_uploader_imm_size",
411 "Total size of imms tracked by uploader",
412 )
413 .unwrap();
414 registry
415 .register(Box::new(uploader_imm_size.clone()))
416 .unwrap();
417
418 let opts = histogram_opts!(
419 "state_store_uploader_upload_task_latency",
420 "Latency of uploader uploading tasks",
421 time_buckets
422 );
423
424 let uploader_upload_task_latency =
425 register_histogram_with_registry!(opts, registry).unwrap();
426
427 let opts = histogram_opts!(
428 "state_store_uploader_wait_poll_latency",
429 "Latency of upload uploading task being polled after finish",
430 exponential_buckets(0.001, 5.0, 7).unwrap(), );
432
433 let uploader_wait_poll_latency = register_histogram_with_registry!(opts, registry).unwrap();
434
435 let uploader_syncing_epoch_count = register_int_gauge_with_registry!(
436 "state_store_uploader_syncing_epoch_count",
437 "Total number of syncing epoch",
438 registry
439 )
440 .unwrap();
441
442 let read_req_bloom_filter_positive_counts = register_guarded_int_counter_vec_with_registry!(
443 "state_store_read_req_bloom_filter_positive_counts",
444 "Total number of read request with at least one SST bloom filter check returns positive",
445 &["table_id", "type"],
446 registry
447 )
448 .unwrap();
449 let read_req_bloom_filter_positive_counts =
450 RelabeledGuardedIntCounterVec::with_metric_level_relabel_n(
451 MetricLevel::Info,
452 read_req_bloom_filter_positive_counts,
453 metric_level,
454 1,
455 );
456
457 let read_req_positive_but_non_exist_counts = register_guarded_int_counter_vec_with_registry!(
458 "state_store_read_req_positive_but_non_exist_counts",
459 "Total number of read request on non-existent key/prefix with at least one SST bloom filter check returns positive",
460 &["table_id", "type"],
461 registry
462 )
463 .unwrap();
464 let read_req_positive_but_non_exist_counts =
465 RelabeledGuardedIntCounterVec::with_metric_level(
466 MetricLevel::Info,
467 read_req_positive_but_non_exist_counts,
468 metric_level,
469 );
470
471 let read_req_check_bloom_filter_counts = register_guarded_int_counter_vec_with_registry!(
472 "state_store_read_req_check_bloom_filter_counts",
473 "Total number of read request that checks bloom filter with a prefix hint",
474 &["table_id", "type"],
475 registry
476 )
477 .unwrap();
478
479 let read_req_check_bloom_filter_counts = RelabeledGuardedIntCounterVec::with_metric_level(
480 MetricLevel::Info,
481 read_req_check_bloom_filter_counts,
482 metric_level,
483 );
484
485 let mem_table_spill_counts = register_int_counter_vec_with_registry!(
486 "state_store_mem_table_spill_counts",
487 "Total number of mem table spill occurs for one table",
488 &["table_id"],
489 registry
490 )
491 .unwrap();
492
493 let mem_table_spill_counts = RelabeledCounterVec::with_metric_level(
494 MetricLevel::Info,
495 mem_table_spill_counts,
496 metric_level,
497 );
498
499 let old_value_size = register_int_gauge_with_registry!(
500 "state_store_old_value_size",
501 "The size of old value",
502 registry
503 )
504 .unwrap();
505
506 let opts = histogram_opts!(
507 "block_efficiency_histogram",
508 "Access ratio of in-memory block.",
509 exponential_buckets(0.001, 2.0, 11).unwrap(),
510 );
511 let block_efficiency_histogram = register_histogram_with_registry!(opts, registry).unwrap();
512
513 let event_handler_pending_event = register_int_gauge_with_registry!(
514 "state_store_event_handler_pending_event",
515 "The number of sent but unhandled events",
516 registry,
517 )
518 .unwrap();
519
520 let opts = histogram_opts!(
521 "state_store_event_handler_latency",
522 "Latency to handle event",
523 exponential_buckets(0.001, 5.0, 7).unwrap(), );
525
526 let event_handler_latency =
527 register_histogram_vec_with_registry!(opts, &["event_type"], registry).unwrap();
528
529 let safe_version_hit = GenericCounter::new(
530 "state_store_safe_version_hit",
531 "The total count of a safe version that can be retrieved successfully",
532 )
533 .unwrap();
534 registry
535 .register(Box::new(safe_version_hit.clone()))
536 .unwrap();
537
538 let safe_version_miss = GenericCounter::new(
539 "state_store_safe_version_miss",
540 "The total count of a safe version that cannot be retrieved",
541 )
542 .unwrap();
543 registry
544 .register(Box::new(safe_version_miss.clone()))
545 .unwrap();
546
547 Self {
548 bloom_filter_true_negative_counts,
549 bloom_filter_check_counts,
550 iter_merge_sstable_counts,
551 sst_store_block_request_counts,
552 iter_scan_key_counts,
553 get_shared_buffer_hit_counts,
554 remote_read_time,
555 iter_fetch_meta_duration,
556 iter_fetch_meta_cache_unhits,
557 iter_slow_fetch_meta_cache_unhits,
558 vector_object_request_counts,
559 vector_request_stats,
560 vector_hnsw_graph_level_node_count,
561 vector_index_file_count,
562 vector_index_file_size,
563 read_req_bloom_filter_positive_counts,
564 read_req_positive_but_non_exist_counts,
565 read_req_check_bloom_filter_counts,
566 write_batch_tuple_counts,
567 write_batch_duration,
568 write_batch_size,
569 merge_imm_task_counts,
570 merge_imm_batch_memory_sz,
571 spill_task_counts_from_sealed: spill_task_counts.with_label_values(&["sealed"]),
572 spill_task_counts_from_unsealed: spill_task_counts.with_label_values(&["unsealed"]),
573 spill_task_size_from_sealed: spill_task_size.with_label_values(&["sealed"]),
574 spill_task_size_from_unsealed: spill_task_size.with_label_values(&["unsealed"]),
575 uploader_uploading_task_size,
576 uploader_uploading_task_count,
577 uploader_imm_size,
578 uploader_upload_task_latency,
579 uploader_syncing_epoch_count,
580 uploader_wait_poll_latency,
581 mem_table_spill_counts,
582 old_value_size,
583
584 block_efficiency_histogram,
585 event_handler_pending_event,
586 event_handler_latency,
587 safe_version_hit,
588 safe_version_miss,
589 }
590 }
591
592 pub fn unused() -> Self {
593 global_hummock_state_store_metrics(MetricLevel::Disabled)
594 }
595}
596
/// Source of the memory statistics that `StateStoreCollector` exports as gauges.
///
/// Each getter is read on every `Collector::collect` call of `StateStoreCollector`; the
/// metric each value ends up in is listed per method. Units are not fixed by this trait
/// (presumably bytes for the `*_usage` getters — confirm with the implementors).
pub trait MemoryCollector: Sync + Send {
    /// Value exported as `state_store_meta_cache_size`.
    fn get_meta_memory_usage(&self) -> u64;
    /// Value exported as `state_store_block_cache_size`.
    fn get_data_memory_usage(&self) -> u64;
    /// Value exported as `state_store_vector_meta_cache_size`.
    fn get_vector_meta_memory_usage(&self) -> u64;
    /// Value exported as `state_store_vector_data_cache_size`.
    fn get_vector_data_memory_usage(&self) -> u64;
    /// Value exported as `uploading_memory_size`.
    fn get_uploading_memory_usage(&self) -> u64;
    /// Value exported as `state_store_prefetch_memory_size`.
    ///
    /// Note that this getter returns `usize` while the other usage getters return `u64`.
    fn get_prefetch_memory_usage(&self) -> usize;
    /// Value exported as `state_store_meta_cache_usage_ratio`.
    fn get_meta_cache_memory_usage_ratio(&self) -> f64;
    /// Value exported as `state_store_block_cache_usage_ratio`.
    fn get_block_cache_memory_usage_ratio(&self) -> f64;
    /// Value exported as `state_store_vector_meta_cache_usage_ratio`.
    fn get_vector_meta_cache_memory_usage_ratio(&self) -> f64;
    /// Value exported as `state_store_vector_data_cache_usage_ratio`.
    fn get_vector_data_cache_memory_usage_ratio(&self) -> f64;
    /// Value exported as `state_store_uploading_memory_usage_ratio`.
    fn get_shared_buffer_usage_ratio(&self) -> f64;
}
610
/// Prometheus collector that refreshes cache/memory gauges from a [`MemoryCollector`]
/// each time it is collected.
#[derive(Clone)]
struct StateStoreCollector {
    /// Provider of the raw values written into the gauges below.
    memory_collector: Arc<dyn MemoryCollector>,
    /// Type-erased handles to the gauges below; used to implement `desc` and `collect`.
    collectors: Vec<Arc<dyn Collector>>,
    /// `state_store_block_cache_size`.
    block_cache_size: IntGauge,
    /// `state_store_meta_cache_size`.
    meta_cache_size: IntGauge,
    /// `state_store_vector_data_cache_size`.
    vector_data_cache_size: IntGauge,
    /// `state_store_vector_meta_cache_size`.
    vector_meta_cache_size: IntGauge,
    /// `uploading_memory_size`.
    uploading_memory_size: IntGauge,
    /// `state_store_prefetch_memory_size`.
    prefetch_memory_size: IntGauge,
    /// `state_store_meta_cache_usage_ratio`.
    meta_cache_usage_ratio: Gauge,
    /// `state_store_block_cache_usage_ratio`.
    block_cache_usage_ratio: Gauge,
    /// `state_store_vector_data_cache_usage_ratio`.
    vector_data_cache_usage_ratio: Gauge,
    /// `state_store_vector_meta_cache_usage_ratio`.
    vector_meta_cache_usage_ratio: Gauge,
    /// `state_store_uploading_memory_usage_ratio`.
    uploading_memory_usage_ratio: Gauge,
}
627
628impl StateStoreCollector {
629 pub fn new(memory_collector: Arc<dyn MemoryCollector>) -> Self {
630 let mut collectors = Vec::new();
631
632 let block_cache_size = IntGauge::with_opts(Opts::new(
633 "state_store_block_cache_size",
634 "the size of cache for data block cache",
635 ))
636 .unwrap();
637 collectors.push(Arc::new(block_cache_size.clone()) as _);
638
639 let block_cache_usage_ratio = Gauge::with_opts(Opts::new(
640 "state_store_block_cache_usage_ratio",
641 "the ratio of block cache to it's pre-allocated memory",
642 ))
643 .unwrap();
644 collectors.push(Arc::new(block_cache_usage_ratio.clone()) as _);
645
646 let meta_cache_size = IntGauge::with_opts(Opts::new(
647 "state_store_meta_cache_size",
648 "the size of cache for meta file cache",
649 ))
650 .unwrap();
651 collectors.push(Arc::new(meta_cache_size.clone()) as _);
652
653 let meta_cache_usage_ratio = Gauge::with_opts(Opts::new(
654 "state_store_meta_cache_usage_ratio",
655 "the ratio of meta cache to it's pre-allocated memory",
656 ))
657 .unwrap();
658 collectors.push(Arc::new(meta_cache_usage_ratio.clone()) as _);
659
660 let vector_data_cache_size = IntGauge::with_opts(Opts::new(
661 "state_store_vector_data_cache_size",
662 "the size of cache for vector data file cache",
663 ))
664 .unwrap();
665 collectors.push(Arc::new(vector_data_cache_size.clone()) as _);
666
667 let vector_data_cache_usage_ratio = Gauge::with_opts(Opts::new(
668 "state_store_vector_data_cache_usage_ratio",
669 "the ratio of vector data cache to it's pre-allocated memory",
670 ))
671 .unwrap();
672 collectors.push(Arc::new(vector_data_cache_usage_ratio.clone()) as _);
673
674 let vector_meta_cache_size = IntGauge::with_opts(Opts::new(
675 "state_store_vector_meta_cache_size",
676 "the size of cache for vector meta file cache",
677 ))
678 .unwrap();
679 collectors.push(Arc::new(vector_meta_cache_size.clone()) as _);
680
681 let vector_meta_cache_usage_ratio = Gauge::with_opts(Opts::new(
682 "state_store_vector_meta_cache_usage_ratio",
683 "the ratio of vector meta cache to it's pre-allocated memory",
684 ))
685 .unwrap();
686 collectors.push(Arc::new(vector_meta_cache_usage_ratio.clone()) as _);
687
688 let uploading_memory_size = IntGauge::with_opts(Opts::new(
689 "uploading_memory_size",
690 "the size of uploading SSTs memory usage",
691 ))
692 .unwrap();
693 collectors.push(Arc::new(uploading_memory_size.clone()) as _);
694
695 let uploading_memory_usage_ratio = Gauge::with_opts(Opts::new(
696 "state_store_uploading_memory_usage_ratio",
697 "the ratio of uploading SSTs memory usage to it's pre-allocated memory",
698 ))
699 .unwrap();
700 collectors.push(Arc::new(uploading_memory_usage_ratio.clone()) as _);
701
702 let prefetch_memory_size = IntGauge::with_opts(Opts::new(
703 "state_store_prefetch_memory_size",
704 "the size of prefetch memory usage",
705 ))
706 .unwrap();
707 collectors.push(Arc::new(prefetch_memory_size.clone()) as _);
708
709 Self {
710 memory_collector,
711 collectors,
712 block_cache_size,
713 meta_cache_size,
714 vector_data_cache_size,
715 vector_meta_cache_size,
716 uploading_memory_size,
717 prefetch_memory_size,
718 meta_cache_usage_ratio,
719 block_cache_usage_ratio,
720
721 vector_data_cache_usage_ratio,
722 vector_meta_cache_usage_ratio,
723 uploading_memory_usage_ratio,
724 }
725 }
726}
727
728impl Collector for StateStoreCollector {
729 fn desc(&self) -> Vec<&Desc> {
730 self.collectors.iter().flat_map(|c| c.desc()).collect()
731 }
732
733 fn collect(&self) -> Vec<proto::MetricFamily> {
734 self.block_cache_size
735 .set(self.memory_collector.get_data_memory_usage() as i64);
736 self.meta_cache_size
737 .set(self.memory_collector.get_meta_memory_usage() as i64);
738 self.vector_data_cache_size
739 .set(self.memory_collector.get_vector_data_memory_usage() as _);
740 self.vector_meta_cache_size
741 .set(self.memory_collector.get_vector_meta_memory_usage() as _);
742 self.uploading_memory_size
743 .set(self.memory_collector.get_uploading_memory_usage() as i64);
744 self.prefetch_memory_size
745 .set(self.memory_collector.get_prefetch_memory_usage() as i64);
746 self.meta_cache_usage_ratio
747 .set(self.memory_collector.get_meta_cache_memory_usage_ratio());
748 self.block_cache_usage_ratio
749 .set(self.memory_collector.get_block_cache_memory_usage_ratio());
750 self.vector_meta_cache_usage_ratio.set(
751 self.memory_collector
752 .get_vector_meta_cache_memory_usage_ratio(),
753 );
754 self.vector_data_cache_usage_ratio.set(
755 self.memory_collector
756 .get_vector_data_cache_memory_usage_ratio(),
757 );
758 self.uploading_memory_usage_ratio
759 .set(self.memory_collector.get_shared_buffer_usage_ratio());
760 self.collectors.iter().flat_map(|c| c.collect()).collect()
762 }
763}
764
765pub fn monitor_cache(memory_collector: Arc<dyn MemoryCollector>) {
766 let collector = Box::new(StateStoreCollector::new(memory_collector));
767 if let Err(e) = GLOBAL_METRICS_REGISTRY.register(collector) {
768 warn!(
769 "unable to monitor cache. May have been registered if in all-in-one deployment: {}",
770 e.as_report()
771 );
772 }
773}