1use std::sync::{Arc, OnceLock};
16
17use prometheus::core::{AtomicU64, Collector, Desc, GenericCounter};
18use prometheus::{
19 Gauge, Histogram, HistogramVec, IntGauge, IntGaugeVec, Opts, Registry, exponential_buckets,
20 histogram_opts, proto, register_histogram_vec_with_registry, register_histogram_with_registry,
21 register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
22 register_int_gauge_with_registry,
23};
24use risingwave_common::config::MetricLevel;
25use risingwave_common::metrics::{
26 RelabeledCounterVec, RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec,
27 RelabeledGuardedIntGaugeVec, RelabeledHistogramVec, RelabeledMetricVec, UintGauge,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::{
31 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
32 register_guarded_int_gauge_vec_with_registry,
33};
34use thiserror_ext::AsReport;
35use tracing::warn;
36
37#[derive(Debug, Clone)]
43pub struct HummockStateStoreMetrics {
44 pub bloom_filter_true_negative_counts: RelabeledGuardedIntCounterVec,
45 pub bloom_filter_check_counts: RelabeledGuardedIntCounterVec,
46 pub iter_merge_sstable_counts: RelabeledHistogramVec,
47 pub sst_store_block_request_counts: RelabeledGuardedIntCounterVec,
48 pub iter_scan_key_counts: RelabeledGuardedIntCounterVec,
49 pub get_shared_buffer_hit_counts: RelabeledCounterVec,
50 pub remote_read_time: RelabeledHistogramVec,
51 pub iter_fetch_meta_duration: RelabeledGuardedHistogramVec,
52 pub iter_fetch_meta_cache_unhits: IntGauge,
53 pub iter_slow_fetch_meta_cache_unhits: IntGauge,
54
55 pub vector_object_request_counts: RelabeledGuardedIntCounterVec,
56 pub vector_request_stats: RelabeledGuardedHistogramVec,
57 pub vector_hnsw_graph_level_node_count: RelabeledGuardedIntGaugeVec,
58 pub vector_index_file_count: RelabeledGuardedIntGaugeVec,
59 pub vector_index_file_size: RelabeledGuardedIntGaugeVec,
60
61 pub read_req_bloom_filter_positive_counts: RelabeledGuardedIntCounterVec,
62 pub read_req_positive_but_non_exist_counts: RelabeledGuardedIntCounterVec,
63 pub read_req_check_bloom_filter_counts: RelabeledGuardedIntCounterVec,
64
65 pub write_batch_tuple_counts: RelabeledGuardedIntCounterVec,
66 pub write_batch_duration: RelabeledGuardedHistogramVec,
67 pub write_batch_size: RelabeledGuardedHistogramVec,
68
69 pub spill_task_counts_from_unsealed: GenericCounter<AtomicU64>,
71 pub spill_task_size_from_unsealed: GenericCounter<AtomicU64>,
73
74 pub uploader_uploading_task_size: UintGauge,
76 pub uploader_uploading_task_count: IntGauge,
77 pub uploader_imm_size: UintGauge,
78 pub uploader_upload_task_latency: Histogram,
79 pub uploader_syncing_epoch_count: IntGauge,
80 pub uploader_wait_poll_latency: Histogram,
81 pub uploader_per_table_imm_size: RelabeledGuardedIntGaugeVec,
82 pub uploader_per_table_imm_count: RelabeledGuardedIntGaugeVec,
83
84 pub per_table_imm_size: RelabeledGuardedIntGaugeVec,
86 pub per_table_imm_count: RelabeledGuardedIntGaugeVec,
87 pub mem_table_spill_counts: RelabeledGuardedIntCounterVec,
88 pub old_value_size: RelabeledGuardedIntGaugeVec,
89
90 pub block_efficiency_histogram: Histogram,
92
93 pub event_handler_pending_event: IntGaugeVec,
94 pub event_handler_latency: HistogramVec,
95
96 pub safe_version_hit: GenericCounter<AtomicU64>,
97 pub safe_version_miss: GenericCounter<AtomicU64>,
98}
99
100pub static GLOBAL_HUMMOCK_STATE_STORE_METRICS: OnceLock<HummockStateStoreMetrics> = OnceLock::new();
101
102pub fn global_hummock_state_store_metrics(metric_level: MetricLevel) -> HummockStateStoreMetrics {
103 GLOBAL_HUMMOCK_STATE_STORE_METRICS
104 .get_or_init(|| HummockStateStoreMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
105 .clone()
106}
107
108impl HummockStateStoreMetrics {
109 pub fn new(registry: &Registry, metric_level: MetricLevel) -> Self {
110 let time_buckets = exponential_buckets(0.01, 10.0, 7).unwrap();
112
113 let state_store_read_time_buckets = exponential_buckets(0.001, 10.0, 5).unwrap();
115
116 let bloom_filter_true_negative_counts = register_guarded_int_counter_vec_with_registry!(
117 "state_store_bloom_filter_true_negative_counts",
118 "Total number of sstables that have been considered true negative by bloom filters",
119 &["table_id", "type"],
120 registry
121 )
122 .unwrap();
123 let bloom_filter_true_negative_counts = RelabeledMetricVec::with_metric_level(
124 MetricLevel::Debug,
125 bloom_filter_true_negative_counts,
126 metric_level,
127 );
128
129 let bloom_filter_check_counts = register_guarded_int_counter_vec_with_registry!(
130 "state_store_bloom_filter_check_counts",
131 "Total number of read request to check bloom filters",
132 &["table_id", "type"],
133 registry
134 )
135 .unwrap();
136 let bloom_filter_check_counts = RelabeledMetricVec::with_metric_level(
137 MetricLevel::Debug,
138 bloom_filter_check_counts,
139 metric_level,
140 );
141
142 let opts = histogram_opts!(
144 "state_store_iter_merge_sstable_counts",
145 "Number of child iterators merged into one MergeIterator",
146 vec![1.0, 10.0, 100.0, 1000.0, 10000.0]
147 );
148 let iter_merge_sstable_counts =
149 register_histogram_vec_with_registry!(opts, &["table_id", "type"], registry).unwrap();
150 let iter_merge_sstable_counts = RelabeledHistogramVec::with_metric_level(
151 MetricLevel::Debug,
152 iter_merge_sstable_counts,
153 metric_level,
154 );
155
156 let sst_store_block_request_counts = register_guarded_int_counter_vec_with_registry!(
158 "state_store_sst_store_block_request_counts",
159 "Total number of sst block requests that have been issued to sst store",
160 &["table_id", "type"],
161 registry
162 )
163 .unwrap();
164 let sst_store_block_request_counts = RelabeledGuardedIntCounterVec::with_metric_level(
165 MetricLevel::Info,
166 sst_store_block_request_counts,
167 metric_level,
168 );
169
170 let iter_scan_key_counts = register_guarded_int_counter_vec_with_registry!(
171 "state_store_iter_scan_key_counts",
172 "Total number of keys read by iterator",
173 &["table_id", "type"],
174 registry
175 )
176 .unwrap();
177 let iter_scan_key_counts = RelabeledGuardedIntCounterVec::with_metric_level(
178 MetricLevel::Info,
179 iter_scan_key_counts,
180 metric_level,
181 );
182
183 let get_shared_buffer_hit_counts = register_int_counter_vec_with_registry!(
184 "state_store_get_shared_buffer_hit_counts",
185 "Total number of get requests that have been fulfilled by shared buffer",
186 &["table_id"],
187 registry
188 )
189 .unwrap();
190 let get_shared_buffer_hit_counts = RelabeledCounterVec::with_metric_level(
191 MetricLevel::Debug,
192 get_shared_buffer_hit_counts,
193 metric_level,
194 );
195
196 let opts = histogram_opts!(
197 "state_store_remote_read_time_per_task",
198 "Total time of operations which read from remote storage when enable prefetch",
199 time_buckets.clone(),
200 );
201 let remote_read_time =
202 register_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
203 let remote_read_time = RelabeledHistogramVec::with_metric_level(
204 MetricLevel::Debug,
205 remote_read_time,
206 metric_level,
207 );
208
209 let opts = histogram_opts!(
210 "state_store_iter_fetch_meta_duration",
211 "Histogram of iterator fetch SST meta time that have been issued to state store",
212 state_store_read_time_buckets,
213 );
214 let iter_fetch_meta_duration =
215 register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
216 let iter_fetch_meta_duration = RelabeledGuardedHistogramVec::with_metric_level(
217 MetricLevel::Info,
218 iter_fetch_meta_duration,
219 metric_level,
220 );
221
222 let iter_fetch_meta_cache_unhits = register_int_gauge_with_registry!(
223 "state_store_iter_fetch_meta_cache_unhits",
224 "Number of SST meta cache unhit during one iterator meta fetch",
225 registry
226 )
227 .unwrap();
228
229 let iter_slow_fetch_meta_cache_unhits = register_int_gauge_with_registry!(
230 "state_store_iter_slow_fetch_meta_cache_unhits",
231 "Number of SST meta cache unhit during a iterator meta fetch which is slow (costs >5 seconds)",
232 registry
233 )
234 .unwrap();
235
236 let vector_object_request_counts = register_guarded_int_counter_vec_with_registry!(
238 "state_store_vector_object_request_counts",
239 "Metrics about vector object requests that have been issued",
240 &["table_id", "type", "mode"],
241 registry
242 )
243 .unwrap();
244 let vector_object_request_counts = RelabeledGuardedIntCounterVec::with_metric_level(
245 MetricLevel::Critical,
246 vector_object_request_counts,
247 metric_level,
248 );
249
250 let opts = histogram_opts!(
251 "state_store_vector_request_stats",
252 "Metrics about vector requests",
253 exponential_buckets(100.0, 10.0, 5).unwrap(),
254 );
255
256 let vector_request_stats = register_guarded_histogram_vec_with_registry!(
257 opts,
258 &["table_id", "type", "mode", "top_n", "ef"],
259 registry
260 )
261 .unwrap();
262 let vector_request_stats = RelabeledGuardedHistogramVec::with_metric_level(
263 MetricLevel::Critical,
264 vector_request_stats,
265 metric_level,
266 );
267
268 let vector_hnsw_graph_level_node_count = register_guarded_int_gauge_vec_with_registry!(
269 "state_store_vector_hnsw_graph_level_node_count",
270 "Number of nodes in each level of hnsw graph",
271 &["table_id", "level"],
272 registry
273 )
274 .unwrap();
275 let vector_hnsw_graph_level_node_count = RelabeledGuardedIntGaugeVec::with_metric_level(
276 MetricLevel::Critical,
277 vector_hnsw_graph_level_node_count,
278 metric_level,
279 );
280
281 let vector_index_file_count = register_guarded_int_gauge_vec_with_registry!(
282 "state_store_vector_index_file_count",
283 "Number of vector file",
284 &["table_id"],
285 registry
286 )
287 .unwrap();
288 let vector_index_file_count = RelabeledGuardedIntGaugeVec::with_metric_level(
289 MetricLevel::Critical,
290 vector_index_file_count,
291 metric_level,
292 );
293
294 let vector_index_file_size = register_guarded_int_gauge_vec_with_registry!(
295 "state_store_vector_index_file_size",
296 "total size of vector index file",
297 &["table_id", "type"],
298 registry
299 )
300 .unwrap();
301 let vector_index_file_size = RelabeledGuardedIntGaugeVec::with_metric_level(
302 MetricLevel::Critical,
303 vector_index_file_size,
304 metric_level,
305 );
306
307 let write_batch_tuple_counts = register_guarded_int_counter_vec_with_registry!(
309 "state_store_write_batch_tuple_counts",
310 "Total number of batched write kv pairs requests that have been issued to state store",
311 &["table_id"],
312 registry
313 )
314 .unwrap();
315 let write_batch_tuple_counts = RelabeledGuardedIntCounterVec::with_metric_level(
316 MetricLevel::Debug,
317 write_batch_tuple_counts,
318 metric_level,
319 );
320
321 let opts = histogram_opts!(
322 "state_store_write_batch_duration",
323 "Total time of batched write that have been issued to state store. With shared buffer on, this is the latency writing to the shared buffer",
324 time_buckets.clone()
325 );
326 let write_batch_duration =
327 register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
328 let write_batch_duration = RelabeledGuardedHistogramVec::with_metric_level(
329 MetricLevel::Debug,
330 write_batch_duration,
331 metric_level,
332 );
333
334 let opts = histogram_opts!(
335 "state_store_write_batch_size",
336 "Total size of batched write that have been issued to state store",
337 exponential_buckets(256.0, 16.0, 7).unwrap() );
339 let write_batch_size =
340 register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
341 let write_batch_size = RelabeledGuardedHistogramVec::with_metric_level(
342 MetricLevel::Debug,
343 write_batch_size,
344 metric_level,
345 );
346
347 let spill_task_counts = register_int_counter_vec_with_registry!(
348 "state_store_spill_task_counts",
349 "Total number of started spill tasks",
350 &["uploader_stage"],
351 registry
352 )
353 .unwrap();
354
355 let spill_task_size = register_int_counter_vec_with_registry!(
356 "state_store_spill_task_size",
357 "Total task of started spill tasks",
358 &["uploader_stage"],
359 registry
360 )
361 .unwrap();
362
363 let uploader_uploading_task_size = UintGauge::new(
364 "state_store_uploader_uploading_task_size",
365 "Total size of uploader uploading tasks",
366 )
367 .unwrap();
368 registry
369 .register(Box::new(uploader_uploading_task_size.clone()))
370 .unwrap();
371
372 let uploader_uploading_task_count = register_int_gauge_with_registry!(
373 "state_store_uploader_uploading_task_count",
374 "Total number of uploader uploading tasks",
375 registry
376 )
377 .unwrap();
378
379 let uploader_imm_size = UintGauge::new(
380 "state_store_uploader_imm_size",
381 "Total size of imms tracked by uploader",
382 )
383 .unwrap();
384 registry
385 .register(Box::new(uploader_imm_size.clone()))
386 .unwrap();
387
388 let opts = histogram_opts!(
389 "state_store_uploader_upload_task_latency",
390 "Latency of uploader uploading tasks",
391 time_buckets
392 );
393
394 let uploader_upload_task_latency =
395 register_histogram_with_registry!(opts, registry).unwrap();
396
397 let opts = histogram_opts!(
398 "state_store_uploader_wait_poll_latency",
399 "Latency of upload uploading task being polled after finish",
400 exponential_buckets(0.001, 5.0, 7).unwrap(), );
402
403 let uploader_wait_poll_latency = register_histogram_with_registry!(opts, registry).unwrap();
404
405 let uploader_syncing_epoch_count = register_int_gauge_with_registry!(
406 "state_store_uploader_syncing_epoch_count",
407 "Total number of syncing epoch",
408 registry
409 )
410 .unwrap();
411
412 let uploader_per_table_imm_size = register_guarded_int_gauge_vec_with_registry!(
413 "state_store_uploader_per_table_imm_size",
414 "Total uploader-tracked imm size per table",
415 &["table_id"],
416 registry
417 )
418 .unwrap();
419
420 let uploader_per_table_imm_size = RelabeledGuardedIntGaugeVec::with_metric_level(
421 MetricLevel::Debug,
422 uploader_per_table_imm_size,
423 metric_level,
424 );
425
426 let uploader_per_table_imm_count = register_guarded_int_gauge_vec_with_registry!(
427 "state_store_uploader_per_table_imm_count",
428 "Total uploader-tracked imm count per table",
429 &["table_id"],
430 registry
431 )
432 .unwrap();
433
434 let uploader_per_table_imm_count = RelabeledGuardedIntGaugeVec::with_metric_level(
435 MetricLevel::Debug,
436 uploader_per_table_imm_count,
437 metric_level,
438 );
439
440 let per_table_imm_size = register_guarded_int_gauge_vec_with_registry!(
441 "state_store_per_table_imm_size",
442 "Total imm size per table",
443 &["table_id", "fragment_id"],
444 registry
445 )
446 .unwrap();
447
448 let per_table_imm_size = RelabeledGuardedIntGaugeVec::with_metric_level_relabel_n(
449 MetricLevel::Debug,
450 per_table_imm_size,
451 metric_level,
452 1,
453 );
454
455 let per_table_imm_count = register_guarded_int_gauge_vec_with_registry!(
456 "state_store_per_table_imm_count",
457 "Total imm count per table",
458 &["table_id"],
459 registry
460 )
461 .unwrap();
462
463 let per_table_imm_count = RelabeledGuardedIntGaugeVec::with_metric_level(
464 MetricLevel::Debug,
465 per_table_imm_count,
466 metric_level,
467 );
468
469 let read_req_bloom_filter_positive_counts = register_guarded_int_counter_vec_with_registry!(
470 "state_store_read_req_bloom_filter_positive_counts",
471 "Total number of read request with at least one SST bloom filter check returns positive",
472 &["table_id", "type"],
473 registry
474 )
475 .unwrap();
476 let read_req_bloom_filter_positive_counts =
477 RelabeledGuardedIntCounterVec::with_metric_level_relabel_n(
478 MetricLevel::Info,
479 read_req_bloom_filter_positive_counts,
480 metric_level,
481 1,
482 );
483
484 let read_req_positive_but_non_exist_counts = register_guarded_int_counter_vec_with_registry!(
485 "state_store_read_req_positive_but_non_exist_counts",
486 "Total number of read request on non-existent key/prefix with at least one SST bloom filter check returns positive",
487 &["table_id", "type"],
488 registry
489 )
490 .unwrap();
491 let read_req_positive_but_non_exist_counts =
492 RelabeledGuardedIntCounterVec::with_metric_level(
493 MetricLevel::Info,
494 read_req_positive_but_non_exist_counts,
495 metric_level,
496 );
497
498 let read_req_check_bloom_filter_counts = register_guarded_int_counter_vec_with_registry!(
499 "state_store_read_req_check_bloom_filter_counts",
500 "Total number of read request that checks bloom filter with a prefix hint",
501 &["table_id", "type"],
502 registry
503 )
504 .unwrap();
505
506 let read_req_check_bloom_filter_counts = RelabeledGuardedIntCounterVec::with_metric_level(
507 MetricLevel::Info,
508 read_req_check_bloom_filter_counts,
509 metric_level,
510 );
511
512 let mem_table_spill_counts = register_guarded_int_counter_vec_with_registry!(
513 "state_store_mem_table_spill_counts",
514 "Total number of mem table spill occurs for one table",
515 &["table_id"],
516 registry
517 )
518 .unwrap();
519
520 let mem_table_spill_counts = RelabeledGuardedIntCounterVec::with_metric_level(
521 MetricLevel::Info,
522 mem_table_spill_counts,
523 metric_level,
524 );
525
526 let old_value_size = register_guarded_int_gauge_vec_with_registry!(
527 "state_store_old_value_size",
528 "The size of old value",
529 &["table_id"],
530 registry
531 )
532 .unwrap();
533
534 let old_value_size = RelabeledGuardedIntGaugeVec::with_metric_level(
535 MetricLevel::Info,
536 old_value_size,
537 metric_level,
538 );
539
540 let opts = histogram_opts!(
541 "block_efficiency_histogram",
542 "Access ratio of in-memory block.",
543 exponential_buckets(0.001, 2.0, 11).unwrap(),
544 );
545 let block_efficiency_histogram = register_histogram_with_registry!(opts, registry).unwrap();
546
547 let event_handler_pending_event = register_int_gauge_vec_with_registry!(
548 "state_store_event_handler_pending_event",
549 "The number of sent but unhandled events",
550 &["event_type"],
551 registry,
552 )
553 .unwrap();
554
555 let opts = histogram_opts!(
556 "state_store_event_handler_latency",
557 "Latency to handle event",
558 exponential_buckets(0.001, 5.0, 7).unwrap(), );
560
561 let event_handler_latency =
562 register_histogram_vec_with_registry!(opts, &["event_type"], registry).unwrap();
563
564 let safe_version_hit = GenericCounter::new(
565 "state_store_safe_version_hit",
566 "The total count of a safe version that can be retrieved successfully",
567 )
568 .unwrap();
569 registry
570 .register(Box::new(safe_version_hit.clone()))
571 .unwrap();
572
573 let safe_version_miss = GenericCounter::new(
574 "state_store_safe_version_miss",
575 "The total count of a safe version that cannot be retrieved",
576 )
577 .unwrap();
578 registry
579 .register(Box::new(safe_version_miss.clone()))
580 .unwrap();
581
582 Self {
583 bloom_filter_true_negative_counts,
584 bloom_filter_check_counts,
585 iter_merge_sstable_counts,
586 sst_store_block_request_counts,
587 iter_scan_key_counts,
588 get_shared_buffer_hit_counts,
589 remote_read_time,
590 iter_fetch_meta_duration,
591 iter_fetch_meta_cache_unhits,
592 iter_slow_fetch_meta_cache_unhits,
593 vector_object_request_counts,
594 vector_request_stats,
595 vector_hnsw_graph_level_node_count,
596 vector_index_file_count,
597 vector_index_file_size,
598 read_req_bloom_filter_positive_counts,
599 read_req_positive_but_non_exist_counts,
600 read_req_check_bloom_filter_counts,
601 write_batch_tuple_counts,
602 write_batch_duration,
603 write_batch_size,
604 spill_task_counts_from_unsealed: spill_task_counts.with_label_values(&["unsealed"]),
605 spill_task_size_from_unsealed: spill_task_size.with_label_values(&["unsealed"]),
606 uploader_uploading_task_size,
607 uploader_uploading_task_count,
608 uploader_imm_size,
609 uploader_upload_task_latency,
610 uploader_syncing_epoch_count,
611 uploader_wait_poll_latency,
612 uploader_per_table_imm_size,
613 uploader_per_table_imm_count,
614 per_table_imm_size,
615 per_table_imm_count,
616 mem_table_spill_counts,
617 old_value_size,
618
619 block_efficiency_histogram,
620 event_handler_pending_event,
621 event_handler_latency,
622 safe_version_hit,
623 safe_version_miss,
624 }
625 }
626
627 pub fn unused() -> Self {
628 global_hummock_state_store_metrics(MetricLevel::Disabled)
629 }
630}
631
/// Source of the memory figures exported by the state store cache collector.
///
/// `StateStoreCollector` calls every method once per `collect()` and stores the result in
/// the gauge named in each method's documentation.
pub trait MemoryCollector: Send + Sync {
    /// Value for the `state_store_meta_cache_size` gauge.
    fn get_meta_memory_usage(&self) -> u64;

    /// Value for the `state_store_block_cache_size` gauge.
    fn get_data_memory_usage(&self) -> u64;

    /// Value for the `state_store_vector_meta_cache_size` gauge.
    fn get_vector_meta_memory_usage(&self) -> u64;

    /// Value for the `state_store_vector_data_cache_size` gauge.
    fn get_vector_data_memory_usage(&self) -> u64;

    /// Value for the `uploading_memory_size` gauge.
    fn get_uploading_memory_usage(&self) -> u64;

    /// Value for the `state_store_prefetch_memory_size` gauge.
    fn get_prefetch_memory_usage(&self) -> usize;

    /// Value for the `state_store_meta_cache_usage_ratio` gauge.
    fn get_meta_cache_memory_usage_ratio(&self) -> f64;

    /// Value for the `state_store_block_cache_usage_ratio` gauge.
    fn get_block_cache_memory_usage_ratio(&self) -> f64;

    /// Value for the `state_store_vector_meta_cache_usage_ratio` gauge.
    fn get_vector_meta_cache_memory_usage_ratio(&self) -> f64;

    /// Value for the `state_store_vector_data_cache_usage_ratio` gauge.
    fn get_vector_data_cache_memory_usage_ratio(&self) -> f64;

    /// Value for the `state_store_uploading_memory_usage_ratio` gauge.
    fn get_shared_buffer_usage_ratio(&self) -> f64;
}
645
646#[derive(Clone)]
647struct StateStoreCollector {
648 memory_collector: Arc<dyn MemoryCollector>,
649 collectors: Vec<Arc<dyn Collector>>,
650 block_cache_size: IntGauge,
651 meta_cache_size: IntGauge,
652 vector_data_cache_size: IntGauge,
653 vector_meta_cache_size: IntGauge,
654 uploading_memory_size: IntGauge,
655 prefetch_memory_size: IntGauge,
656 meta_cache_usage_ratio: Gauge,
657 block_cache_usage_ratio: Gauge,
658 vector_data_cache_usage_ratio: Gauge,
659 vector_meta_cache_usage_ratio: Gauge,
660 uploading_memory_usage_ratio: Gauge,
661}
662
663impl StateStoreCollector {
664 pub fn new(memory_collector: Arc<dyn MemoryCollector>) -> Self {
665 let mut collectors = Vec::new();
666
667 let block_cache_size = IntGauge::with_opts(Opts::new(
668 "state_store_block_cache_size",
669 "the size of cache for data block cache",
670 ))
671 .unwrap();
672 collectors.push(Arc::new(block_cache_size.clone()) as _);
673
674 let block_cache_usage_ratio = Gauge::with_opts(Opts::new(
675 "state_store_block_cache_usage_ratio",
676 "the ratio of block cache to it's pre-allocated memory",
677 ))
678 .unwrap();
679 collectors.push(Arc::new(block_cache_usage_ratio.clone()) as _);
680
681 let meta_cache_size = IntGauge::with_opts(Opts::new(
682 "state_store_meta_cache_size",
683 "the size of cache for meta file cache",
684 ))
685 .unwrap();
686 collectors.push(Arc::new(meta_cache_size.clone()) as _);
687
688 let meta_cache_usage_ratio = Gauge::with_opts(Opts::new(
689 "state_store_meta_cache_usage_ratio",
690 "the ratio of meta cache to it's pre-allocated memory",
691 ))
692 .unwrap();
693 collectors.push(Arc::new(meta_cache_usage_ratio.clone()) as _);
694
695 let vector_data_cache_size = IntGauge::with_opts(Opts::new(
696 "state_store_vector_data_cache_size",
697 "the size of cache for vector data file cache",
698 ))
699 .unwrap();
700 collectors.push(Arc::new(vector_data_cache_size.clone()) as _);
701
702 let vector_data_cache_usage_ratio = Gauge::with_opts(Opts::new(
703 "state_store_vector_data_cache_usage_ratio",
704 "the ratio of vector data cache to it's pre-allocated memory",
705 ))
706 .unwrap();
707 collectors.push(Arc::new(vector_data_cache_usage_ratio.clone()) as _);
708
709 let vector_meta_cache_size = IntGauge::with_opts(Opts::new(
710 "state_store_vector_meta_cache_size",
711 "the size of cache for vector meta file cache",
712 ))
713 .unwrap();
714 collectors.push(Arc::new(vector_meta_cache_size.clone()) as _);
715
716 let vector_meta_cache_usage_ratio = Gauge::with_opts(Opts::new(
717 "state_store_vector_meta_cache_usage_ratio",
718 "the ratio of vector meta cache to it's pre-allocated memory",
719 ))
720 .unwrap();
721 collectors.push(Arc::new(vector_meta_cache_usage_ratio.clone()) as _);
722
723 let uploading_memory_size = IntGauge::with_opts(Opts::new(
724 "uploading_memory_size",
725 "the size of uploading SSTs memory usage",
726 ))
727 .unwrap();
728 collectors.push(Arc::new(uploading_memory_size.clone()) as _);
729
730 let uploading_memory_usage_ratio = Gauge::with_opts(Opts::new(
731 "state_store_uploading_memory_usage_ratio",
732 "the ratio of uploading SSTs memory usage to it's pre-allocated memory",
733 ))
734 .unwrap();
735 collectors.push(Arc::new(uploading_memory_usage_ratio.clone()) as _);
736
737 let prefetch_memory_size = IntGauge::with_opts(Opts::new(
738 "state_store_prefetch_memory_size",
739 "the size of prefetch memory usage",
740 ))
741 .unwrap();
742 collectors.push(Arc::new(prefetch_memory_size.clone()) as _);
743
744 Self {
745 memory_collector,
746 collectors,
747 block_cache_size,
748 meta_cache_size,
749 vector_data_cache_size,
750 vector_meta_cache_size,
751 uploading_memory_size,
752 prefetch_memory_size,
753 meta_cache_usage_ratio,
754 block_cache_usage_ratio,
755
756 vector_data_cache_usage_ratio,
757 vector_meta_cache_usage_ratio,
758 uploading_memory_usage_ratio,
759 }
760 }
761}
762
763impl Collector for StateStoreCollector {
764 fn desc(&self) -> Vec<&Desc> {
765 self.collectors.iter().flat_map(|c| c.desc()).collect()
766 }
767
768 fn collect(&self) -> Vec<proto::MetricFamily> {
769 self.block_cache_size
770 .set(self.memory_collector.get_data_memory_usage() as i64);
771 self.meta_cache_size
772 .set(self.memory_collector.get_meta_memory_usage() as i64);
773 self.vector_data_cache_size
774 .set(self.memory_collector.get_vector_data_memory_usage() as _);
775 self.vector_meta_cache_size
776 .set(self.memory_collector.get_vector_meta_memory_usage() as _);
777 self.uploading_memory_size
778 .set(self.memory_collector.get_uploading_memory_usage() as i64);
779 self.prefetch_memory_size
780 .set(self.memory_collector.get_prefetch_memory_usage() as i64);
781 self.meta_cache_usage_ratio
782 .set(self.memory_collector.get_meta_cache_memory_usage_ratio());
783 self.block_cache_usage_ratio
784 .set(self.memory_collector.get_block_cache_memory_usage_ratio());
785 self.vector_meta_cache_usage_ratio.set(
786 self.memory_collector
787 .get_vector_meta_cache_memory_usage_ratio(),
788 );
789 self.vector_data_cache_usage_ratio.set(
790 self.memory_collector
791 .get_vector_data_cache_memory_usage_ratio(),
792 );
793 self.uploading_memory_usage_ratio
794 .set(self.memory_collector.get_shared_buffer_usage_ratio());
795 self.collectors.iter().flat_map(|c| c.collect()).collect()
797 }
798}
799
800pub fn monitor_cache(memory_collector: Arc<dyn MemoryCollector>) {
801 let collector = Box::new(StateStoreCollector::new(memory_collector));
802 if let Err(e) = GLOBAL_METRICS_REGISTRY.register(collector) {
803 warn!(
804 "unable to monitor cache. May have been registered if in all-in-one deployment: {}",
805 e.as_report()
806 );
807 }
808}