risingwave_storage/monitor/
hummock_state_store_metrics.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, OnceLock};
16
17use prometheus::core::{AtomicU64, Collector, Desc, GenericCounter};
18use prometheus::{
19    Gauge, Histogram, HistogramVec, IntGauge, Opts, Registry, exponential_buckets, histogram_opts,
20    proto, register_histogram_vec_with_registry, register_histogram_with_registry,
21    register_int_counter_vec_with_registry, register_int_gauge_with_registry,
22};
23use risingwave_common::config::MetricLevel;
24use risingwave_common::metrics::{
25    RelabeledCounterVec, RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec,
26    RelabeledGuardedIntGaugeVec, RelabeledHistogramVec, RelabeledMetricVec, UintGauge,
27};
28use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
29use risingwave_common::{
30    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
31    register_guarded_int_gauge_vec_with_registry,
32};
33use thiserror_ext::AsReport;
34use tracing::warn;
35
/// [`HummockStateStoreMetrics`] stores the performance and IO metrics of `XXXStore` such as
/// `RocksDBStateStore` and `TikvStateStore`.
/// In practice, keep in mind that this represents the whole Hummock utilization of
/// a `RisingWave` instance. More granular utilization per materialized view
/// job or per executor should be collected by views like `StateStats` and `JobStats`.
///
/// Every field is a handle to a Prometheus metric that is created and registered in
/// [`HummockStateStoreMetrics::new`]. The struct derives `Clone`, and
/// [`global_hummock_state_store_metrics`] hands out clones of one shared instance.
#[derive(Debug, Clone)]
pub struct HummockStateStoreMetrics {
    /// Total number of sstables considered true negative by bloom filters
    /// (labels: `table_id`, `type`).
    pub bloom_filter_true_negative_counts: RelabeledGuardedIntCounterVec,
    /// Total number of read requests that check bloom filters (labels: `table_id`, `type`).
    pub bloom_filter_check_counts: RelabeledGuardedIntCounterVec,
    /// Number of child iterators merged into one `MergeIterator` (labels: `table_id`, `type`).
    pub iter_merge_sstable_counts: RelabeledHistogramVec,
    /// Total number of sst block requests issued to the sst store (labels: `table_id`, `type`).
    pub sst_store_block_request_counts: RelabeledGuardedIntCounterVec,
    /// Total number of keys read by iterators (labels: `table_id`, `type`).
    pub iter_scan_key_counts: RelabeledGuardedIntCounterVec,
    /// Total number of get requests fulfilled by the shared buffer (label: `table_id`).
    pub get_shared_buffer_hit_counts: RelabeledCounterVec,
    /// Time of operations that read from remote storage when prefetch is enabled
    /// (label: `table_id`).
    pub remote_read_time: RelabeledHistogramVec,
    /// Time spent by iterators fetching SST meta (label: `table_id`).
    pub iter_fetch_meta_duration: RelabeledGuardedHistogramVec,
    /// Number of SST meta cache misses during one iterator meta fetch.
    pub iter_fetch_meta_cache_unhits: IntGauge,
    /// Number of SST meta cache misses during a slow iterator meta fetch (the metric help text
    /// defines slow as costing more than 5 seconds).
    pub iter_slow_fetch_meta_cache_unhits: IntGauge,

    /// Vector object requests that have been issued (labels: `table_id`, `type`, `mode`).
    pub vector_object_request_counts: RelabeledGuardedIntCounterVec,
    /// Statistics about vector requests (labels: `table_id`, `type`, `mode`, `top_n`, `ef`).
    pub vector_request_stats: RelabeledGuardedHistogramVec,
    /// Number of nodes in each level of the hnsw graph (labels: `table_id`, `level`).
    pub vector_hnsw_graph_level_node_count: RelabeledGuardedIntGaugeVec,
    /// Number of vector files (label: `table_id`).
    pub vector_index_file_count: RelabeledGuardedIntGaugeVec,
    /// Total size of vector index files (labels: `table_id`, `type`).
    pub vector_index_file_size: RelabeledGuardedIntGaugeVec,

    /// Total number of read requests for which at least one SST bloom filter check returned
    /// positive (labels: `table_id`, `type`).
    pub read_req_bloom_filter_positive_counts: RelabeledGuardedIntCounterVec,
    /// Total number of read requests on a non-existent key/prefix for which at least one SST
    /// bloom filter check returned positive (labels: `table_id`, `type`).
    pub read_req_positive_but_non_exist_counts: RelabeledGuardedIntCounterVec,
    /// Total number of read requests that check the bloom filter with a prefix hint
    /// (labels: `table_id`, `type`).
    pub read_req_check_bloom_filter_counts: RelabeledGuardedIntCounterVec,

    /// Total number of batched write kv pairs issued to the state store (label: `table_id`).
    pub write_batch_tuple_counts: RelabeledCounterVec,
    /// Time of batched writes issued to the state store (label: `table_id`).
    pub write_batch_duration: RelabeledHistogramVec,
    /// Size of batched writes issued to the state store (label: `table_id`).
    pub write_batch_size: RelabeledHistogramVec,

    /// Total number of finished merge imm tasks (label: `table_id`).
    pub merge_imm_task_counts: RelabeledCounterVec,
    /// Merge imm counter registered as `state_store_merge_imm_memory_sz` (label: `table_id`).
    /// NOTE(review): the field and metric names suggest a memory size, while the registered
    /// help text describes a number of merged imm batches; confirm which one is intended.
    pub merge_imm_batch_memory_sz: RelabeledCounterVec,

    /// Started spill tasks, child of `state_store_spill_task_counts` with
    /// `uploader_stage = "unsealed"`.
    pub spill_task_counts_from_unsealed: GenericCounter<AtomicU64>,
    /// Spill task size, child of `state_store_spill_task_size` with
    /// `uploader_stage = "unsealed"`.
    pub spill_task_size_from_unsealed: GenericCounter<AtomicU64>,
    /// Started spill tasks, child of `state_store_spill_task_counts` with
    /// `uploader_stage = "sealed"`.
    pub spill_task_counts_from_sealed: GenericCounter<AtomicU64>,
    /// Spill task size, child of `state_store_spill_task_size` with
    /// `uploader_stage = "sealed"`.
    pub spill_task_size_from_sealed: GenericCounter<AtomicU64>,

    /// Total size of the tasks the uploader is currently uploading.
    pub uploader_uploading_task_size: UintGauge,
    /// Total number of tasks the uploader is currently uploading.
    pub uploader_uploading_task_count: IntGauge,
    /// Total size of imms tracked by the uploader.
    pub uploader_imm_size: UintGauge,
    /// Latency of uploader uploading tasks.
    pub uploader_upload_task_latency: Histogram,
    /// Total number of syncing epochs.
    pub uploader_syncing_epoch_count: IntGauge,
    /// Latency between an uploading task finishing and it being polled.
    pub uploader_wait_poll_latency: Histogram,

    /// Total number of mem table spills that occurred for one table (label: `table_id`).
    pub mem_table_spill_counts: RelabeledCounterVec,
    /// The size of old value.
    pub old_value_size: IntGauge,

    /// Access ratio of in-memory blocks.
    pub block_efficiency_histogram: Histogram,

    /// Number of events that were sent but not handled yet.
    pub event_handler_pending_event: IntGauge,
    /// Latency to handle an event (label: `event_type`).
    pub event_handler_latency: HistogramVec,

    /// Count of safe versions that could be retrieved successfully.
    pub safe_version_hit: GenericCounter<AtomicU64>,
    /// Count of safe versions that could not be retrieved.
    pub safe_version_miss: GenericCounter<AtomicU64>,
}
103
/// Process-wide [`HummockStateStoreMetrics`] instance. It starts empty and is filled on first
/// use by [`global_hummock_state_store_metrics`], which registers the metrics in
/// `GLOBAL_METRICS_REGISTRY`.
pub static GLOBAL_HUMMOCK_STATE_STORE_METRICS: OnceLock<HummockStateStoreMetrics> = OnceLock::new();
105
106pub fn global_hummock_state_store_metrics(metric_level: MetricLevel) -> HummockStateStoreMetrics {
107    GLOBAL_HUMMOCK_STATE_STORE_METRICS
108        .get_or_init(|| HummockStateStoreMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
109        .clone()
110}
111
112impl HummockStateStoreMetrics {
113    pub fn new(registry: &Registry, metric_level: MetricLevel) -> Self {
114        // 10ms ~ max 2.7h
115        let time_buckets = exponential_buckets(0.01, 10.0, 7).unwrap();
116
117        // 1ms - 100s
118        let state_store_read_time_buckets = exponential_buckets(0.001, 10.0, 5).unwrap();
119
120        let bloom_filter_true_negative_counts = register_guarded_int_counter_vec_with_registry!(
121            "state_store_bloom_filter_true_negative_counts",
122            "Total number of sstables that have been considered true negative by bloom filters",
123            &["table_id", "type"],
124            registry
125        )
126        .unwrap();
127        let bloom_filter_true_negative_counts = RelabeledMetricVec::with_metric_level(
128            MetricLevel::Debug,
129            bloom_filter_true_negative_counts,
130            metric_level,
131        );
132
133        let bloom_filter_check_counts = register_guarded_int_counter_vec_with_registry!(
134            "state_store_bloom_filter_check_counts",
135            "Total number of read request to check bloom filters",
136            &["table_id", "type"],
137            registry
138        )
139        .unwrap();
140        let bloom_filter_check_counts = RelabeledMetricVec::with_metric_level(
141            MetricLevel::Debug,
142            bloom_filter_check_counts,
143            metric_level,
144        );
145
146        // ----- iter -----
147        let opts = histogram_opts!(
148            "state_store_iter_merge_sstable_counts",
149            "Number of child iterators merged into one MergeIterator",
150            vec![1.0, 10.0, 100.0, 1000.0, 10000.0]
151        );
152        let iter_merge_sstable_counts =
153            register_histogram_vec_with_registry!(opts, &["table_id", "type"], registry).unwrap();
154        let iter_merge_sstable_counts = RelabeledHistogramVec::with_metric_level(
155            MetricLevel::Debug,
156            iter_merge_sstable_counts,
157            metric_level,
158        );
159
160        // ----- sst store -----
161        let sst_store_block_request_counts = register_guarded_int_counter_vec_with_registry!(
162            "state_store_sst_store_block_request_counts",
163            "Total number of sst block requests that have been issued to sst store",
164            &["table_id", "type"],
165            registry
166        )
167        .unwrap();
168        let sst_store_block_request_counts = RelabeledGuardedIntCounterVec::with_metric_level(
169            MetricLevel::Critical,
170            sst_store_block_request_counts,
171            metric_level,
172        );
173
174        let iter_scan_key_counts = register_guarded_int_counter_vec_with_registry!(
175            "state_store_iter_scan_key_counts",
176            "Total number of keys read by iterator",
177            &["table_id", "type"],
178            registry
179        )
180        .unwrap();
181        let iter_scan_key_counts = RelabeledGuardedIntCounterVec::with_metric_level(
182            MetricLevel::Info,
183            iter_scan_key_counts,
184            metric_level,
185        );
186
187        let get_shared_buffer_hit_counts = register_int_counter_vec_with_registry!(
188            "state_store_get_shared_buffer_hit_counts",
189            "Total number of get requests that have been fulfilled by shared buffer",
190            &["table_id"],
191            registry
192        )
193        .unwrap();
194        let get_shared_buffer_hit_counts = RelabeledCounterVec::with_metric_level(
195            MetricLevel::Debug,
196            get_shared_buffer_hit_counts,
197            metric_level,
198        );
199
200        let opts = histogram_opts!(
201            "state_store_remote_read_time_per_task",
202            "Total time of operations which read from remote storage when enable prefetch",
203            time_buckets.clone(),
204        );
205        let remote_read_time =
206            register_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
207        let remote_read_time = RelabeledHistogramVec::with_metric_level(
208            MetricLevel::Debug,
209            remote_read_time,
210            metric_level,
211        );
212
213        let opts = histogram_opts!(
214            "state_store_iter_fetch_meta_duration",
215            "Histogram of iterator fetch SST meta time that have been issued to state store",
216            state_store_read_time_buckets,
217        );
218        let iter_fetch_meta_duration =
219            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
220        let iter_fetch_meta_duration = RelabeledGuardedHistogramVec::with_metric_level(
221            MetricLevel::Info,
222            iter_fetch_meta_duration,
223            metric_level,
224        );
225
226        let iter_fetch_meta_cache_unhits = register_int_gauge_with_registry!(
227            "state_store_iter_fetch_meta_cache_unhits",
228            "Number of SST meta cache unhit during one iterator meta fetch",
229            registry
230        )
231        .unwrap();
232
233        let iter_slow_fetch_meta_cache_unhits = register_int_gauge_with_registry!(
234            "state_store_iter_slow_fetch_meta_cache_unhits",
235            "Number of SST meta cache unhit during a iterator meta fetch which is slow (costs >5 seconds)",
236            registry
237        )
238        .unwrap();
239
240        // ----- vector -----
241        let vector_object_request_counts = register_guarded_int_counter_vec_with_registry!(
242            "state_store_vector_object_request_counts",
243            "Metrics about vector object requests that have been issued",
244            &["table_id", "type", "mode"],
245            registry
246        )
247        .unwrap();
248        let vector_object_request_counts = RelabeledGuardedIntCounterVec::with_metric_level(
249            MetricLevel::Critical,
250            vector_object_request_counts,
251            metric_level,
252        );
253
254        let opts = histogram_opts!(
255            "state_store_vector_request_stats",
256            "Metrics about vector requests",
257            exponential_buckets(100.0, 10.0, 5).unwrap(),
258        );
259
260        let vector_request_stats = register_guarded_histogram_vec_with_registry!(
261            opts,
262            &["table_id", "type", "mode", "top_n", "ef"],
263            registry
264        )
265        .unwrap();
266        let vector_request_stats = RelabeledGuardedHistogramVec::with_metric_level(
267            MetricLevel::Critical,
268            vector_request_stats,
269            metric_level,
270        );
271
272        let vector_hnsw_graph_level_node_count = register_guarded_int_gauge_vec_with_registry!(
273            "state_store_vector_hnsw_graph_level_node_count",
274            "Number of nodes in each level of hnsw graph",
275            &["table_id", "level"],
276            registry
277        )
278        .unwrap();
279        let vector_hnsw_graph_level_node_count = RelabeledGuardedIntGaugeVec::with_metric_level(
280            MetricLevel::Critical,
281            vector_hnsw_graph_level_node_count,
282            metric_level,
283        );
284
285        let vector_index_file_count = register_guarded_int_gauge_vec_with_registry!(
286            "state_store_vector_index_file_count",
287            "Number of vector file",
288            &["table_id"],
289            registry
290        )
291        .unwrap();
292        let vector_index_file_count = RelabeledGuardedIntGaugeVec::with_metric_level(
293            MetricLevel::Critical,
294            vector_index_file_count,
295            metric_level,
296        );
297
298        let vector_index_file_size = register_guarded_int_gauge_vec_with_registry!(
299            "state_store_vector_index_file_size",
300            "total size of vector index file",
301            &["table_id", "type"],
302            registry
303        )
304        .unwrap();
305        let vector_index_file_size = RelabeledGuardedIntGaugeVec::with_metric_level(
306            MetricLevel::Critical,
307            vector_index_file_size,
308            metric_level,
309        );
310
311        // ----- write_batch -----
312        let write_batch_tuple_counts = register_int_counter_vec_with_registry!(
313            "state_store_write_batch_tuple_counts",
314            "Total number of batched write kv pairs requests that have been issued to state store",
315            &["table_id"],
316            registry
317        )
318        .unwrap();
319        let write_batch_tuple_counts = RelabeledCounterVec::with_metric_level(
320            MetricLevel::Debug,
321            write_batch_tuple_counts,
322            metric_level,
323        );
324
325        let opts = histogram_opts!(
326            "state_store_write_batch_duration",
327            "Total time of batched write that have been issued to state store. With shared buffer on, this is the latency writing to the shared buffer",
328            time_buckets.clone()
329        );
330        let write_batch_duration =
331            register_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
332        let write_batch_duration = RelabeledHistogramVec::with_metric_level(
333            MetricLevel::Debug,
334            write_batch_duration,
335            metric_level,
336        );
337
338        let opts = histogram_opts!(
339            "state_store_write_batch_size",
340            "Total size of batched write that have been issued to state store",
341            exponential_buckets(256.0, 16.0, 7).unwrap() // min 256B ~ max 4GB
342        );
343        let write_batch_size =
344            register_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
345        let write_batch_size = RelabeledHistogramVec::with_metric_level(
346            MetricLevel::Debug,
347            write_batch_size,
348            metric_level,
349        );
350
351        let merge_imm_task_counts = register_int_counter_vec_with_registry!(
352            "state_store_merge_imm_task_counts",
353            "Total number of merge imm task that have been finished",
354            &["table_id"],
355            registry
356        )
357        .unwrap();
358        let merge_imm_task_counts = RelabeledCounterVec::with_metric_level(
359            MetricLevel::Debug,
360            merge_imm_task_counts,
361            metric_level,
362        );
363
364        let merge_imm_batch_memory_sz = register_int_counter_vec_with_registry!(
365            "state_store_merge_imm_memory_sz",
366            "Number of imm batches that have been merged by a merge task",
367            &["table_id"],
368            registry
369        )
370        .unwrap();
371        let merge_imm_batch_memory_sz = RelabeledCounterVec::with_metric_level(
372            MetricLevel::Debug,
373            merge_imm_batch_memory_sz,
374            metric_level,
375        );
376
377        let spill_task_counts = register_int_counter_vec_with_registry!(
378            "state_store_spill_task_counts",
379            "Total number of started spill tasks",
380            &["uploader_stage"],
381            registry
382        )
383        .unwrap();
384
385        let spill_task_size = register_int_counter_vec_with_registry!(
386            "state_store_spill_task_size",
387            "Total task of started spill tasks",
388            &["uploader_stage"],
389            registry
390        )
391        .unwrap();
392
393        let uploader_uploading_task_size = UintGauge::new(
394            "state_store_uploader_uploading_task_size",
395            "Total size of uploader uploading tasks",
396        )
397        .unwrap();
398        registry
399            .register(Box::new(uploader_uploading_task_size.clone()))
400            .unwrap();
401
402        let uploader_uploading_task_count = register_int_gauge_with_registry!(
403            "state_store_uploader_uploading_task_count",
404            "Total number of uploader uploading tasks",
405            registry
406        )
407        .unwrap();
408
409        let uploader_imm_size = UintGauge::new(
410            "state_store_uploader_imm_size",
411            "Total size of imms tracked by uploader",
412        )
413        .unwrap();
414        registry
415            .register(Box::new(uploader_imm_size.clone()))
416            .unwrap();
417
418        let opts = histogram_opts!(
419            "state_store_uploader_upload_task_latency",
420            "Latency of uploader uploading tasks",
421            time_buckets
422        );
423
424        let uploader_upload_task_latency =
425            register_histogram_with_registry!(opts, registry).unwrap();
426
427        let opts = histogram_opts!(
428            "state_store_uploader_wait_poll_latency",
429            "Latency of upload uploading task being polled after finish",
430            exponential_buckets(0.001, 5.0, 7).unwrap(), // 1ms - 15s
431        );
432
433        let uploader_wait_poll_latency = register_histogram_with_registry!(opts, registry).unwrap();
434
435        let uploader_syncing_epoch_count = register_int_gauge_with_registry!(
436            "state_store_uploader_syncing_epoch_count",
437            "Total number of syncing epoch",
438            registry
439        )
440        .unwrap();
441
442        let read_req_bloom_filter_positive_counts = register_guarded_int_counter_vec_with_registry!(
443            "state_store_read_req_bloom_filter_positive_counts",
444            "Total number of read request with at least one SST bloom filter check returns positive",
445            &["table_id", "type"],
446            registry
447        )
448        .unwrap();
449        let read_req_bloom_filter_positive_counts =
450            RelabeledGuardedIntCounterVec::with_metric_level_relabel_n(
451                MetricLevel::Info,
452                read_req_bloom_filter_positive_counts,
453                metric_level,
454                1,
455            );
456
457        let read_req_positive_but_non_exist_counts = register_guarded_int_counter_vec_with_registry!(
458            "state_store_read_req_positive_but_non_exist_counts",
459            "Total number of read request on non-existent key/prefix with at least one SST bloom filter check returns positive",
460            &["table_id", "type"],
461            registry
462        )
463        .unwrap();
464        let read_req_positive_but_non_exist_counts =
465            RelabeledGuardedIntCounterVec::with_metric_level(
466                MetricLevel::Info,
467                read_req_positive_but_non_exist_counts,
468                metric_level,
469            );
470
471        let read_req_check_bloom_filter_counts = register_guarded_int_counter_vec_with_registry!(
472            "state_store_read_req_check_bloom_filter_counts",
473            "Total number of read request that checks bloom filter with a prefix hint",
474            &["table_id", "type"],
475            registry
476        )
477        .unwrap();
478
479        let read_req_check_bloom_filter_counts = RelabeledGuardedIntCounterVec::with_metric_level(
480            MetricLevel::Info,
481            read_req_check_bloom_filter_counts,
482            metric_level,
483        );
484
485        let mem_table_spill_counts = register_int_counter_vec_with_registry!(
486            "state_store_mem_table_spill_counts",
487            "Total number of mem table spill occurs for one table",
488            &["table_id"],
489            registry
490        )
491        .unwrap();
492
493        let mem_table_spill_counts = RelabeledCounterVec::with_metric_level(
494            MetricLevel::Info,
495            mem_table_spill_counts,
496            metric_level,
497        );
498
499        let old_value_size = register_int_gauge_with_registry!(
500            "state_store_old_value_size",
501            "The size of old value",
502            registry
503        )
504        .unwrap();
505
506        let opts = histogram_opts!(
507            "block_efficiency_histogram",
508            "Access ratio of in-memory block.",
509            exponential_buckets(0.001, 2.0, 11).unwrap(),
510        );
511        let block_efficiency_histogram = register_histogram_with_registry!(opts, registry).unwrap();
512
513        let event_handler_pending_event = register_int_gauge_with_registry!(
514            "state_store_event_handler_pending_event",
515            "The number of sent but unhandled events",
516            registry,
517        )
518        .unwrap();
519
520        let opts = histogram_opts!(
521            "state_store_event_handler_latency",
522            "Latency to handle event",
523            exponential_buckets(0.001, 5.0, 7).unwrap(), // 1ms - 15s
524        );
525
526        let event_handler_latency =
527            register_histogram_vec_with_registry!(opts, &["event_type"], registry).unwrap();
528
529        let safe_version_hit = GenericCounter::new(
530            "state_store_safe_version_hit",
531            "The total count of a safe version that can be retrieved successfully",
532        )
533        .unwrap();
534        registry
535            .register(Box::new(safe_version_hit.clone()))
536            .unwrap();
537
538        let safe_version_miss = GenericCounter::new(
539            "state_store_safe_version_miss",
540            "The total count of a safe version that cannot be retrieved",
541        )
542        .unwrap();
543        registry
544            .register(Box::new(safe_version_miss.clone()))
545            .unwrap();
546
547        Self {
548            bloom_filter_true_negative_counts,
549            bloom_filter_check_counts,
550            iter_merge_sstable_counts,
551            sst_store_block_request_counts,
552            iter_scan_key_counts,
553            get_shared_buffer_hit_counts,
554            remote_read_time,
555            iter_fetch_meta_duration,
556            iter_fetch_meta_cache_unhits,
557            iter_slow_fetch_meta_cache_unhits,
558            vector_object_request_counts,
559            vector_request_stats,
560            vector_hnsw_graph_level_node_count,
561            vector_index_file_count,
562            vector_index_file_size,
563            read_req_bloom_filter_positive_counts,
564            read_req_positive_but_non_exist_counts,
565            read_req_check_bloom_filter_counts,
566            write_batch_tuple_counts,
567            write_batch_duration,
568            write_batch_size,
569            merge_imm_task_counts,
570            merge_imm_batch_memory_sz,
571            spill_task_counts_from_sealed: spill_task_counts.with_label_values(&["sealed"]),
572            spill_task_counts_from_unsealed: spill_task_counts.with_label_values(&["unsealed"]),
573            spill_task_size_from_sealed: spill_task_size.with_label_values(&["sealed"]),
574            spill_task_size_from_unsealed: spill_task_size.with_label_values(&["unsealed"]),
575            uploader_uploading_task_size,
576            uploader_uploading_task_count,
577            uploader_imm_size,
578            uploader_upload_task_latency,
579            uploader_syncing_epoch_count,
580            uploader_wait_poll_latency,
581            mem_table_spill_counts,
582            old_value_size,
583
584            block_efficiency_histogram,
585            event_handler_pending_event,
586            event_handler_latency,
587            safe_version_hit,
588            safe_version_miss,
589        }
590    }
591
592    pub fn unused() -> Self {
593        global_hummock_state_store_metrics(MetricLevel::Disabled)
594    }
595}
596
/// Source of the memory statistics that `StateStoreCollector` copies into its gauges each time
/// the metrics are collected.
///
/// Implementations must be `Send + Sync` because the collector keeps them behind an
/// `Arc<dyn MemoryCollector>`.
pub trait MemoryCollector: Send + Sync {
    // ----- absolute usage -----

    /// Usage reported as `state_store_block_cache_size`.
    fn get_data_memory_usage(&self) -> u64;
    /// Usage reported as `state_store_meta_cache_size`.
    fn get_meta_memory_usage(&self) -> u64;
    /// Usage reported as `state_store_vector_data_cache_size`.
    fn get_vector_data_memory_usage(&self) -> u64;
    /// Usage reported as `state_store_vector_meta_cache_size`.
    fn get_vector_meta_memory_usage(&self) -> u64;
    /// Usage reported as `uploading_memory_size`.
    fn get_uploading_memory_usage(&self) -> u64;
    /// Usage reported as `state_store_prefetch_memory_size`. Unlike the other usage getters
    /// this one returns `usize`.
    fn get_prefetch_memory_usage(&self) -> usize;

    // ----- usage ratios -----

    /// Ratio reported as `state_store_block_cache_usage_ratio`.
    fn get_block_cache_memory_usage_ratio(&self) -> f64;
    /// Ratio reported as `state_store_meta_cache_usage_ratio`.
    fn get_meta_cache_memory_usage_ratio(&self) -> f64;
    /// Ratio reported as `state_store_vector_data_cache_usage_ratio`.
    fn get_vector_data_cache_memory_usage_ratio(&self) -> f64;
    /// Ratio reported as `state_store_vector_meta_cache_usage_ratio`.
    fn get_vector_meta_cache_memory_usage_ratio(&self) -> f64;
    /// Ratio reported as `state_store_uploading_memory_usage_ratio`.
    fn get_shared_buffer_usage_ratio(&self) -> f64;
}
610
/// Prometheus [`Collector`] that refreshes a fixed set of cache and memory gauges from a
/// [`MemoryCollector`] every time it is collected.
#[derive(Clone)]
struct StateStoreCollector {
    /// Source the gauge values are read from on every `collect` call.
    memory_collector: Arc<dyn MemoryCollector>,
    /// Type-erased clones of all gauges below; `desc` and `collect` iterate over this list.
    collectors: Vec<Arc<dyn Collector>>,
    /// `state_store_block_cache_size`, set from `get_data_memory_usage`.
    block_cache_size: IntGauge,
    /// `state_store_meta_cache_size`, set from `get_meta_memory_usage`.
    meta_cache_size: IntGauge,
    /// `state_store_vector_data_cache_size`, set from `get_vector_data_memory_usage`.
    vector_data_cache_size: IntGauge,
    /// `state_store_vector_meta_cache_size`, set from `get_vector_meta_memory_usage`.
    vector_meta_cache_size: IntGauge,
    /// `uploading_memory_size`, set from `get_uploading_memory_usage`.
    uploading_memory_size: IntGauge,
    /// `state_store_prefetch_memory_size`, set from `get_prefetch_memory_usage`.
    prefetch_memory_size: IntGauge,
    /// `state_store_meta_cache_usage_ratio`, set from `get_meta_cache_memory_usage_ratio`.
    meta_cache_usage_ratio: Gauge,
    /// `state_store_block_cache_usage_ratio`, set from `get_block_cache_memory_usage_ratio`.
    block_cache_usage_ratio: Gauge,
    /// `state_store_vector_data_cache_usage_ratio`, set from
    /// `get_vector_data_cache_memory_usage_ratio`.
    vector_data_cache_usage_ratio: Gauge,
    /// `state_store_vector_meta_cache_usage_ratio`, set from
    /// `get_vector_meta_cache_memory_usage_ratio`.
    vector_meta_cache_usage_ratio: Gauge,
    /// `state_store_uploading_memory_usage_ratio`, set from `get_shared_buffer_usage_ratio`.
    uploading_memory_usage_ratio: Gauge,
}
627
628impl StateStoreCollector {
629    pub fn new(memory_collector: Arc<dyn MemoryCollector>) -> Self {
630        let mut collectors = Vec::new();
631
632        let block_cache_size = IntGauge::with_opts(Opts::new(
633            "state_store_block_cache_size",
634            "the size of cache for data block cache",
635        ))
636        .unwrap();
637        collectors.push(Arc::new(block_cache_size.clone()) as _);
638
639        let block_cache_usage_ratio = Gauge::with_opts(Opts::new(
640            "state_store_block_cache_usage_ratio",
641            "the ratio of block cache to it's pre-allocated memory",
642        ))
643        .unwrap();
644        collectors.push(Arc::new(block_cache_usage_ratio.clone()) as _);
645
646        let meta_cache_size = IntGauge::with_opts(Opts::new(
647            "state_store_meta_cache_size",
648            "the size of cache for meta file cache",
649        ))
650        .unwrap();
651        collectors.push(Arc::new(meta_cache_size.clone()) as _);
652
653        let meta_cache_usage_ratio = Gauge::with_opts(Opts::new(
654            "state_store_meta_cache_usage_ratio",
655            "the ratio of meta cache to it's pre-allocated memory",
656        ))
657        .unwrap();
658        collectors.push(Arc::new(meta_cache_usage_ratio.clone()) as _);
659
660        let vector_data_cache_size = IntGauge::with_opts(Opts::new(
661            "state_store_vector_data_cache_size",
662            "the size of cache for vector data file cache",
663        ))
664        .unwrap();
665        collectors.push(Arc::new(vector_data_cache_size.clone()) as _);
666
667        let vector_data_cache_usage_ratio = Gauge::with_opts(Opts::new(
668            "state_store_vector_data_cache_usage_ratio",
669            "the ratio of vector data cache to it's pre-allocated memory",
670        ))
671        .unwrap();
672        collectors.push(Arc::new(vector_data_cache_usage_ratio.clone()) as _);
673
674        let vector_meta_cache_size = IntGauge::with_opts(Opts::new(
675            "state_store_vector_meta_cache_size",
676            "the size of cache for vector meta file cache",
677        ))
678        .unwrap();
679        collectors.push(Arc::new(vector_meta_cache_size.clone()) as _);
680
681        let vector_meta_cache_usage_ratio = Gauge::with_opts(Opts::new(
682            "state_store_vector_meta_cache_usage_ratio",
683            "the ratio of vector meta cache to it's pre-allocated memory",
684        ))
685        .unwrap();
686        collectors.push(Arc::new(vector_meta_cache_usage_ratio.clone()) as _);
687
688        let uploading_memory_size = IntGauge::with_opts(Opts::new(
689            "uploading_memory_size",
690            "the size of uploading SSTs memory usage",
691        ))
692        .unwrap();
693        collectors.push(Arc::new(uploading_memory_size.clone()) as _);
694
695        let uploading_memory_usage_ratio = Gauge::with_opts(Opts::new(
696            "state_store_uploading_memory_usage_ratio",
697            "the ratio of uploading SSTs memory usage to it's pre-allocated memory",
698        ))
699        .unwrap();
700        collectors.push(Arc::new(uploading_memory_usage_ratio.clone()) as _);
701
702        let prefetch_memory_size = IntGauge::with_opts(Opts::new(
703            "state_store_prefetch_memory_size",
704            "the size of prefetch memory usage",
705        ))
706        .unwrap();
707        collectors.push(Arc::new(prefetch_memory_size.clone()) as _);
708
709        Self {
710            memory_collector,
711            collectors,
712            block_cache_size,
713            meta_cache_size,
714            vector_data_cache_size,
715            vector_meta_cache_size,
716            uploading_memory_size,
717            prefetch_memory_size,
718            meta_cache_usage_ratio,
719            block_cache_usage_ratio,
720
721            vector_data_cache_usage_ratio,
722            vector_meta_cache_usage_ratio,
723            uploading_memory_usage_ratio,
724        }
725    }
726}
727
728impl Collector for StateStoreCollector {
729    fn desc(&self) -> Vec<&Desc> {
730        self.collectors.iter().flat_map(|c| c.desc()).collect()
731    }
732
733    fn collect(&self) -> Vec<proto::MetricFamily> {
734        self.block_cache_size
735            .set(self.memory_collector.get_data_memory_usage() as i64);
736        self.meta_cache_size
737            .set(self.memory_collector.get_meta_memory_usage() as i64);
738        self.vector_data_cache_size
739            .set(self.memory_collector.get_vector_data_memory_usage() as _);
740        self.vector_meta_cache_size
741            .set(self.memory_collector.get_vector_meta_memory_usage() as _);
742        self.uploading_memory_size
743            .set(self.memory_collector.get_uploading_memory_usage() as i64);
744        self.prefetch_memory_size
745            .set(self.memory_collector.get_prefetch_memory_usage() as i64);
746        self.meta_cache_usage_ratio
747            .set(self.memory_collector.get_meta_cache_memory_usage_ratio());
748        self.block_cache_usage_ratio
749            .set(self.memory_collector.get_block_cache_memory_usage_ratio());
750        self.vector_meta_cache_usage_ratio.set(
751            self.memory_collector
752                .get_vector_meta_cache_memory_usage_ratio(),
753        );
754        self.vector_data_cache_usage_ratio.set(
755            self.memory_collector
756                .get_vector_data_cache_memory_usage_ratio(),
757        );
758        self.uploading_memory_usage_ratio
759            .set(self.memory_collector.get_shared_buffer_usage_ratio());
760        // collect MetricFamilies.
761        self.collectors.iter().flat_map(|c| c.collect()).collect()
762    }
763}
764
765pub fn monitor_cache(memory_collector: Arc<dyn MemoryCollector>) {
766    let collector = Box::new(StateStoreCollector::new(memory_collector));
767    if let Err(e) = GLOBAL_METRICS_REGISTRY.register(collector) {
768        warn!(
769            "unable to monitor cache. May have been registered if in all-in-one deployment: {}",
770            e.as_report()
771        );
772    }
773}