risingwave_storage/monitor/
hummock_state_store_metrics.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, OnceLock};
16
17use prometheus::core::{AtomicU64, Collector, Desc, GenericCounter};
18use prometheus::{
19    Gauge, Histogram, HistogramVec, IntGauge, IntGaugeVec, Opts, Registry, exponential_buckets,
20    histogram_opts, proto, register_histogram_vec_with_registry, register_histogram_with_registry,
21    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
22    register_int_gauge_with_registry,
23};
24use risingwave_common::config::MetricLevel;
25use risingwave_common::metrics::{
26    RelabeledCounterVec, RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec,
27    RelabeledGuardedIntGaugeVec, RelabeledHistogramVec, RelabeledMetricVec, UintGauge,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::{
31    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
32    register_guarded_int_gauge_vec_with_registry,
33};
34use thiserror_ext::AsReport;
35use tracing::warn;
36
37/// [`HummockStateStoreMetrics`] stores the performance and IO metrics of `XXXStore` such as
38/// `RocksDBStateStore` and `TikvStateStore`.
39/// In practice, keep in mind that this represents the whole Hummock utilization of
40/// a `RisingWave` instance. More granular utilization of per `materialization view`
41/// job or an executor should be collected by views like `StateStats` and `JobStats`.
42#[derive(Debug, Clone)]
43pub struct HummockStateStoreMetrics {
44    pub bloom_filter_true_negative_counts: RelabeledGuardedIntCounterVec,
45    pub bloom_filter_check_counts: RelabeledGuardedIntCounterVec,
46    pub iter_merge_sstable_counts: RelabeledHistogramVec,
47    pub vnode_pruning_counts: RelabeledGuardedIntCounterVec,
48    pub sst_store_block_request_counts: RelabeledGuardedIntCounterVec,
49    pub iter_scan_key_counts: RelabeledGuardedIntCounterVec,
50    pub get_shared_buffer_hit_counts: RelabeledCounterVec,
51    pub remote_read_time: RelabeledHistogramVec,
52    pub iter_fetch_meta_duration: RelabeledGuardedHistogramVec,
53    pub iter_fetch_meta_cache_unhits: IntGauge,
54    pub iter_slow_fetch_meta_cache_unhits: IntGauge,
55
56    pub vector_object_request_counts: RelabeledGuardedIntCounterVec,
57    pub vector_request_stats: RelabeledGuardedHistogramVec,
58    pub vector_hnsw_graph_level_node_count: RelabeledGuardedIntGaugeVec,
59    pub vector_index_file_count: RelabeledGuardedIntGaugeVec,
60    pub vector_index_file_size: RelabeledGuardedIntGaugeVec,
61
62    pub read_req_bloom_filter_positive_counts: RelabeledGuardedIntCounterVec,
63    pub read_req_positive_but_non_exist_counts: RelabeledGuardedIntCounterVec,
64    pub read_req_check_bloom_filter_counts: RelabeledGuardedIntCounterVec,
65
66    pub write_batch_tuple_counts: RelabeledGuardedIntCounterVec,
67    pub write_batch_duration: RelabeledGuardedHistogramVec,
68    pub write_batch_size: RelabeledGuardedHistogramVec,
69
70    // spill task counts from unsealed
71    pub spill_task_counts_from_unsealed: GenericCounter<AtomicU64>,
72    // spill task size from unsealed
73    pub spill_task_size_from_unsealed: GenericCounter<AtomicU64>,
74
75    // uploading task
76    pub uploader_uploading_task_size: UintGauge,
77    pub uploader_uploading_task_count: IntGauge,
78    pub uploader_imm_size: UintGauge,
79    pub uploader_upload_task_latency: Histogram,
80    pub uploader_syncing_epoch_count: IntGauge,
81    pub uploader_wait_poll_latency: Histogram,
82    pub uploader_per_table_imm_size: RelabeledGuardedIntGaugeVec,
83    pub uploader_per_table_imm_count: RelabeledGuardedIntGaugeVec,
84
85    // memory
86    pub per_table_imm_size: RelabeledGuardedIntGaugeVec,
87    pub per_table_imm_count: RelabeledGuardedIntGaugeVec,
88    pub mem_table_spill_counts: RelabeledGuardedIntCounterVec,
89    pub old_value_size: RelabeledGuardedIntGaugeVec,
90
91    // block statistics
92    pub block_efficiency_histogram: Histogram,
93
94    pub event_handler_pending_event: IntGaugeVec,
95    pub event_handler_latency: HistogramVec,
96
97    pub safe_version_hit: GenericCounter<AtomicU64>,
98    pub safe_version_miss: GenericCounter<AtomicU64>,
99}
100
101pub static GLOBAL_HUMMOCK_STATE_STORE_METRICS: OnceLock<HummockStateStoreMetrics> = OnceLock::new();
102
103pub fn global_hummock_state_store_metrics(metric_level: MetricLevel) -> HummockStateStoreMetrics {
104    GLOBAL_HUMMOCK_STATE_STORE_METRICS
105        .get_or_init(|| HummockStateStoreMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
106        .clone()
107}
108
109impl HummockStateStoreMetrics {
110    pub fn new(registry: &Registry, metric_level: MetricLevel) -> Self {
111        // 10ms ~ max 2.7h
112        let time_buckets = exponential_buckets(0.01, 10.0, 7).unwrap();
113
114        // 1ms - 100s
115        let state_store_read_time_buckets = exponential_buckets(0.001, 10.0, 5).unwrap();
116
117        let bloom_filter_true_negative_counts = register_guarded_int_counter_vec_with_registry!(
118            "state_store_bloom_filter_true_negative_counts",
119            "Total number of sstables that have been considered true negative by bloom filters",
120            &["table_id", "type"],
121            registry
122        )
123        .unwrap();
124        let bloom_filter_true_negative_counts = RelabeledMetricVec::with_metric_level(
125            MetricLevel::Debug,
126            bloom_filter_true_negative_counts,
127            metric_level,
128        );
129
130        let bloom_filter_check_counts = register_guarded_int_counter_vec_with_registry!(
131            "state_store_bloom_filter_check_counts",
132            "Total number of read request to check bloom filters",
133            &["table_id", "type"],
134            registry
135        )
136        .unwrap();
137        let bloom_filter_check_counts = RelabeledMetricVec::with_metric_level(
138            MetricLevel::Debug,
139            bloom_filter_check_counts,
140            metric_level,
141        );
142
143        // ----- iter -----
144        let opts = histogram_opts!(
145            "state_store_iter_merge_sstable_counts",
146            "Number of child iterators merged into one MergeIterator",
147            vec![1.0, 10.0, 100.0, 1000.0, 10000.0]
148        );
149        let iter_merge_sstable_counts =
150            register_histogram_vec_with_registry!(opts, &["table_id", "type"], registry).unwrap();
151        let iter_merge_sstable_counts = RelabeledHistogramVec::with_metric_level(
152            MetricLevel::Debug,
153            iter_merge_sstable_counts,
154            metric_level,
155        );
156
157        let vnode_pruning_counts = register_guarded_int_counter_vec_with_registry!(
158            "state_store_vnode_pruning_counts",
159            "Total number of SST pruning operations by vnode key range hints",
160            &["table_id", "operation", "result"],
161            registry
162        )
163        .unwrap();
164
165        let vnode_pruning_counts = RelabeledGuardedIntCounterVec::with_metric_level(
166            MetricLevel::Debug,
167            vnode_pruning_counts,
168            metric_level,
169        );
170
171        // ----- sst store -----
172        let sst_store_block_request_counts = register_guarded_int_counter_vec_with_registry!(
173            "state_store_sst_store_block_request_counts",
174            "Total number of sst block requests that have been issued to sst store",
175            &["table_id", "type"],
176            registry
177        )
178        .unwrap();
179        let sst_store_block_request_counts = RelabeledGuardedIntCounterVec::with_metric_level(
180            MetricLevel::Info,
181            sst_store_block_request_counts,
182            metric_level,
183        );
184
185        let iter_scan_key_counts = register_guarded_int_counter_vec_with_registry!(
186            "state_store_iter_scan_key_counts",
187            "Total number of keys read by iterator",
188            &["table_id", "type"],
189            registry
190        )
191        .unwrap();
192        let iter_scan_key_counts = RelabeledGuardedIntCounterVec::with_metric_level(
193            MetricLevel::Info,
194            iter_scan_key_counts,
195            metric_level,
196        );
197
198        let get_shared_buffer_hit_counts = register_int_counter_vec_with_registry!(
199            "state_store_get_shared_buffer_hit_counts",
200            "Total number of get requests that have been fulfilled by shared buffer",
201            &["table_id"],
202            registry
203        )
204        .unwrap();
205        let get_shared_buffer_hit_counts = RelabeledCounterVec::with_metric_level(
206            MetricLevel::Debug,
207            get_shared_buffer_hit_counts,
208            metric_level,
209        );
210
211        let opts = histogram_opts!(
212            "state_store_remote_read_time_per_task",
213            "Total time of operations which read from remote storage when enable prefetch",
214            time_buckets.clone(),
215        );
216        let remote_read_time =
217            register_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
218        let remote_read_time = RelabeledHistogramVec::with_metric_level(
219            MetricLevel::Debug,
220            remote_read_time,
221            metric_level,
222        );
223
224        let opts = histogram_opts!(
225            "state_store_iter_fetch_meta_duration",
226            "Histogram of iterator fetch SST meta time that have been issued to state store",
227            state_store_read_time_buckets,
228        );
229        let iter_fetch_meta_duration =
230            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
231        let iter_fetch_meta_duration = RelabeledGuardedHistogramVec::with_metric_level(
232            MetricLevel::Info,
233            iter_fetch_meta_duration,
234            metric_level,
235        );
236
237        let iter_fetch_meta_cache_unhits = register_int_gauge_with_registry!(
238            "state_store_iter_fetch_meta_cache_unhits",
239            "Number of SST meta cache unhit during one iterator meta fetch",
240            registry
241        )
242        .unwrap();
243
244        let iter_slow_fetch_meta_cache_unhits = register_int_gauge_with_registry!(
245            "state_store_iter_slow_fetch_meta_cache_unhits",
246            "Number of SST meta cache unhit during a iterator meta fetch which is slow (costs >5 seconds)",
247            registry
248        )
249        .unwrap();
250
251        // ----- vector -----
252        let vector_object_request_counts = register_guarded_int_counter_vec_with_registry!(
253            "state_store_vector_object_request_counts",
254            "Metrics about vector object requests that have been issued",
255            &["table_id", "type", "mode"],
256            registry
257        )
258        .unwrap();
259        let vector_object_request_counts = RelabeledGuardedIntCounterVec::with_metric_level(
260            MetricLevel::Critical,
261            vector_object_request_counts,
262            metric_level,
263        );
264
265        let opts = histogram_opts!(
266            "state_store_vector_request_stats",
267            "Metrics about vector requests",
268            exponential_buckets(100.0, 10.0, 5).unwrap(),
269        );
270
271        let vector_request_stats = register_guarded_histogram_vec_with_registry!(
272            opts,
273            &["table_id", "type", "mode", "top_n", "ef"],
274            registry
275        )
276        .unwrap();
277        let vector_request_stats = RelabeledGuardedHistogramVec::with_metric_level(
278            MetricLevel::Critical,
279            vector_request_stats,
280            metric_level,
281        );
282
283        let vector_hnsw_graph_level_node_count = register_guarded_int_gauge_vec_with_registry!(
284            "state_store_vector_hnsw_graph_level_node_count",
285            "Number of nodes in each level of hnsw graph",
286            &["table_id", "level"],
287            registry
288        )
289        .unwrap();
290        let vector_hnsw_graph_level_node_count = RelabeledGuardedIntGaugeVec::with_metric_level(
291            MetricLevel::Critical,
292            vector_hnsw_graph_level_node_count,
293            metric_level,
294        );
295
296        let vector_index_file_count = register_guarded_int_gauge_vec_with_registry!(
297            "state_store_vector_index_file_count",
298            "Number of vector file",
299            &["table_id"],
300            registry
301        )
302        .unwrap();
303        let vector_index_file_count = RelabeledGuardedIntGaugeVec::with_metric_level(
304            MetricLevel::Critical,
305            vector_index_file_count,
306            metric_level,
307        );
308
309        let vector_index_file_size = register_guarded_int_gauge_vec_with_registry!(
310            "state_store_vector_index_file_size",
311            "total size of vector index file",
312            &["table_id", "type"],
313            registry
314        )
315        .unwrap();
316        let vector_index_file_size = RelabeledGuardedIntGaugeVec::with_metric_level(
317            MetricLevel::Critical,
318            vector_index_file_size,
319            metric_level,
320        );
321
322        // ----- write_batch -----
323        let write_batch_tuple_counts = register_guarded_int_counter_vec_with_registry!(
324            "state_store_write_batch_tuple_counts",
325            "Total number of batched write kv pairs requests that have been issued to state store",
326            &["table_id"],
327            registry
328        )
329        .unwrap();
330        let write_batch_tuple_counts = RelabeledGuardedIntCounterVec::with_metric_level(
331            MetricLevel::Debug,
332            write_batch_tuple_counts,
333            metric_level,
334        );
335
336        let opts = histogram_opts!(
337            "state_store_write_batch_duration",
338            "Total time of batched write that have been issued to state store. With shared buffer on, this is the latency writing to the shared buffer",
339            time_buckets.clone()
340        );
341        let write_batch_duration =
342            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
343        let write_batch_duration = RelabeledGuardedHistogramVec::with_metric_level(
344            MetricLevel::Debug,
345            write_batch_duration,
346            metric_level,
347        );
348
349        let opts = histogram_opts!(
350            "state_store_write_batch_size",
351            "Total size of batched write that have been issued to state store",
352            exponential_buckets(256.0, 16.0, 7).unwrap() // min 256B ~ max 4GB
353        );
354        let write_batch_size =
355            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
356        let write_batch_size = RelabeledGuardedHistogramVec::with_metric_level(
357            MetricLevel::Debug,
358            write_batch_size,
359            metric_level,
360        );
361
362        let spill_task_counts = register_int_counter_vec_with_registry!(
363            "state_store_spill_task_counts",
364            "Total number of started spill tasks",
365            &["uploader_stage"],
366            registry
367        )
368        .unwrap();
369
370        let spill_task_size = register_int_counter_vec_with_registry!(
371            "state_store_spill_task_size",
372            "Total task of started spill tasks",
373            &["uploader_stage"],
374            registry
375        )
376        .unwrap();
377
378        let uploader_uploading_task_size = UintGauge::new(
379            "state_store_uploader_uploading_task_size",
380            "Total size of uploader uploading tasks",
381        )
382        .unwrap();
383        registry
384            .register(Box::new(uploader_uploading_task_size.clone()))
385            .unwrap();
386
387        let uploader_uploading_task_count = register_int_gauge_with_registry!(
388            "state_store_uploader_uploading_task_count",
389            "Total number of uploader uploading tasks",
390            registry
391        )
392        .unwrap();
393
394        let uploader_imm_size = UintGauge::new(
395            "state_store_uploader_imm_size",
396            "Total size of imms tracked by uploader",
397        )
398        .unwrap();
399        registry
400            .register(Box::new(uploader_imm_size.clone()))
401            .unwrap();
402
403        let opts = histogram_opts!(
404            "state_store_uploader_upload_task_latency",
405            "Latency of uploader uploading tasks",
406            time_buckets
407        );
408
409        let uploader_upload_task_latency =
410            register_histogram_with_registry!(opts, registry).unwrap();
411
412        let opts = histogram_opts!(
413            "state_store_uploader_wait_poll_latency",
414            "Latency of upload uploading task being polled after finish",
415            exponential_buckets(0.001, 5.0, 7).unwrap(), // 1ms - 15s
416        );
417
418        let uploader_wait_poll_latency = register_histogram_with_registry!(opts, registry).unwrap();
419
420        let uploader_syncing_epoch_count = register_int_gauge_with_registry!(
421            "state_store_uploader_syncing_epoch_count",
422            "Total number of syncing epoch",
423            registry
424        )
425        .unwrap();
426
427        let uploader_per_table_imm_size = register_guarded_int_gauge_vec_with_registry!(
428            "state_store_uploader_per_table_imm_size",
429            "Total uploader-tracked imm size per table",
430            &["table_id"],
431            registry
432        )
433        .unwrap();
434
435        let uploader_per_table_imm_size = RelabeledGuardedIntGaugeVec::with_metric_level(
436            MetricLevel::Debug,
437            uploader_per_table_imm_size,
438            metric_level,
439        );
440
441        let uploader_per_table_imm_count = register_guarded_int_gauge_vec_with_registry!(
442            "state_store_uploader_per_table_imm_count",
443            "Total uploader-tracked imm count per table",
444            &["table_id"],
445            registry
446        )
447        .unwrap();
448
449        let uploader_per_table_imm_count = RelabeledGuardedIntGaugeVec::with_metric_level(
450            MetricLevel::Debug,
451            uploader_per_table_imm_count,
452            metric_level,
453        );
454
455        let per_table_imm_size = register_guarded_int_gauge_vec_with_registry!(
456            "state_store_per_table_imm_size",
457            "Total imm size per table",
458            &["table_id", "fragment_id"],
459            registry
460        )
461        .unwrap();
462
463        let per_table_imm_size = RelabeledGuardedIntGaugeVec::with_metric_level_relabel_n(
464            MetricLevel::Debug,
465            per_table_imm_size,
466            metric_level,
467            1,
468        );
469
470        let per_table_imm_count = register_guarded_int_gauge_vec_with_registry!(
471            "state_store_per_table_imm_count",
472            "Total imm count per table",
473            &["table_id"],
474            registry
475        )
476        .unwrap();
477
478        let per_table_imm_count = RelabeledGuardedIntGaugeVec::with_metric_level(
479            MetricLevel::Debug,
480            per_table_imm_count,
481            metric_level,
482        );
483
484        let read_req_bloom_filter_positive_counts = register_guarded_int_counter_vec_with_registry!(
485            "state_store_read_req_bloom_filter_positive_counts",
486            "Total number of read request with at least one SST bloom filter check returns positive",
487            &["table_id", "type"],
488            registry
489        )
490        .unwrap();
491        let read_req_bloom_filter_positive_counts =
492            RelabeledGuardedIntCounterVec::with_metric_level_relabel_n(
493                MetricLevel::Info,
494                read_req_bloom_filter_positive_counts,
495                metric_level,
496                1,
497            );
498
499        let read_req_positive_but_non_exist_counts = register_guarded_int_counter_vec_with_registry!(
500            "state_store_read_req_positive_but_non_exist_counts",
501            "Total number of read request on non-existent key/prefix with at least one SST bloom filter check returns positive",
502            &["table_id", "type"],
503            registry
504        )
505        .unwrap();
506        let read_req_positive_but_non_exist_counts =
507            RelabeledGuardedIntCounterVec::with_metric_level(
508                MetricLevel::Info,
509                read_req_positive_but_non_exist_counts,
510                metric_level,
511            );
512
513        let read_req_check_bloom_filter_counts = register_guarded_int_counter_vec_with_registry!(
514            "state_store_read_req_check_bloom_filter_counts",
515            "Total number of read request that checks bloom filter with a prefix hint",
516            &["table_id", "type"],
517            registry
518        )
519        .unwrap();
520
521        let read_req_check_bloom_filter_counts = RelabeledGuardedIntCounterVec::with_metric_level(
522            MetricLevel::Info,
523            read_req_check_bloom_filter_counts,
524            metric_level,
525        );
526
527        let mem_table_spill_counts = register_guarded_int_counter_vec_with_registry!(
528            "state_store_mem_table_spill_counts",
529            "Total number of mem table spill occurs for one table",
530            &["table_id"],
531            registry
532        )
533        .unwrap();
534
535        let mem_table_spill_counts = RelabeledGuardedIntCounterVec::with_metric_level(
536            MetricLevel::Info,
537            mem_table_spill_counts,
538            metric_level,
539        );
540
541        let old_value_size = register_guarded_int_gauge_vec_with_registry!(
542            "state_store_old_value_size",
543            "The size of old value",
544            &["table_id"],
545            registry
546        )
547        .unwrap();
548
549        let old_value_size = RelabeledGuardedIntGaugeVec::with_metric_level(
550            MetricLevel::Info,
551            old_value_size,
552            metric_level,
553        );
554
555        let opts = histogram_opts!(
556            "block_efficiency_histogram",
557            "Access ratio of in-memory block.",
558            exponential_buckets(0.001, 2.0, 11).unwrap(),
559        );
560        let block_efficiency_histogram = register_histogram_with_registry!(opts, registry).unwrap();
561
562        let event_handler_pending_event = register_int_gauge_vec_with_registry!(
563            "state_store_event_handler_pending_event",
564            "The number of sent but unhandled events",
565            &["event_type"],
566            registry,
567        )
568        .unwrap();
569
570        let opts = histogram_opts!(
571            "state_store_event_handler_latency",
572            "Latency to handle event",
573            exponential_buckets(0.001, 5.0, 7).unwrap(), // 1ms - 15s
574        );
575
576        let event_handler_latency =
577            register_histogram_vec_with_registry!(opts, &["event_type"], registry).unwrap();
578
579        let safe_version_hit = GenericCounter::new(
580            "state_store_safe_version_hit",
581            "The total count of a safe version that can be retrieved successfully",
582        )
583        .unwrap();
584        registry
585            .register(Box::new(safe_version_hit.clone()))
586            .unwrap();
587
588        let safe_version_miss = GenericCounter::new(
589            "state_store_safe_version_miss",
590            "The total count of a safe version that cannot be retrieved",
591        )
592        .unwrap();
593        registry
594            .register(Box::new(safe_version_miss.clone()))
595            .unwrap();
596
597        Self {
598            bloom_filter_true_negative_counts,
599            bloom_filter_check_counts,
600            iter_merge_sstable_counts,
601            vnode_pruning_counts,
602            sst_store_block_request_counts,
603            iter_scan_key_counts,
604            get_shared_buffer_hit_counts,
605            remote_read_time,
606            iter_fetch_meta_duration,
607            iter_fetch_meta_cache_unhits,
608            iter_slow_fetch_meta_cache_unhits,
609            vector_object_request_counts,
610            vector_request_stats,
611            vector_hnsw_graph_level_node_count,
612            vector_index_file_count,
613            vector_index_file_size,
614            read_req_bloom_filter_positive_counts,
615            read_req_positive_but_non_exist_counts,
616            read_req_check_bloom_filter_counts,
617            write_batch_tuple_counts,
618            write_batch_duration,
619            write_batch_size,
620            spill_task_counts_from_unsealed: spill_task_counts.with_label_values(&["unsealed"]),
621            spill_task_size_from_unsealed: spill_task_size.with_label_values(&["unsealed"]),
622            uploader_uploading_task_size,
623            uploader_uploading_task_count,
624            uploader_imm_size,
625            uploader_upload_task_latency,
626            uploader_syncing_epoch_count,
627            uploader_wait_poll_latency,
628            uploader_per_table_imm_size,
629            uploader_per_table_imm_count,
630            per_table_imm_size,
631            per_table_imm_count,
632            mem_table_spill_counts,
633            old_value_size,
634
635            block_efficiency_histogram,
636            event_handler_pending_event,
637            event_handler_latency,
638            safe_version_hit,
639            safe_version_miss,
640        }
641    }
642
643    pub fn unused() -> Self {
644        global_hummock_state_store_metrics(MetricLevel::Disabled)
645    }
646}
647
/// Source of the memory figures that `StateStoreCollector` publishes as gauges on every
/// scrape.
///
/// Implementations must be shareable across threads (`Sync + Send`) because the collector
/// holds them behind an `Arc<dyn MemoryCollector>`.
pub trait MemoryCollector: Sync + Send {
    /// Memory used by the meta cache; published as `state_store_meta_cache_size`.
    fn get_meta_memory_usage(&self) -> u64;

    /// Memory used by the data block cache; published as `state_store_block_cache_size`.
    fn get_data_memory_usage(&self) -> u64;

    /// Memory used by the vector meta cache; published as
    /// `state_store_vector_meta_cache_size`.
    fn get_vector_meta_memory_usage(&self) -> u64;

    /// Memory used by the vector data cache; published as
    /// `state_store_vector_data_cache_size`.
    fn get_vector_data_memory_usage(&self) -> u64;

    /// Memory used by uploading SSTs; published as `uploading_memory_size`.
    fn get_uploading_memory_usage(&self) -> u64;

    /// Memory used by prefetching; published as `state_store_prefetch_memory_size`.
    fn get_prefetch_memory_usage(&self) -> usize;

    /// Meta cache usage ratio; published as `state_store_meta_cache_usage_ratio`.
    fn get_meta_cache_memory_usage_ratio(&self) -> f64;

    /// Block cache usage ratio; published as `state_store_block_cache_usage_ratio`.
    fn get_block_cache_memory_usage_ratio(&self) -> f64;

    /// Vector meta cache usage ratio; published as
    /// `state_store_vector_meta_cache_usage_ratio`.
    fn get_vector_meta_cache_memory_usage_ratio(&self) -> f64;

    /// Vector data cache usage ratio; published as
    /// `state_store_vector_data_cache_usage_ratio`.
    fn get_vector_data_cache_memory_usage_ratio(&self) -> f64;

    /// Shared buffer usage ratio; published as `state_store_uploading_memory_usage_ratio`.
    fn get_shared_buffer_usage_ratio(&self) -> f64;
}
661
662#[derive(Clone)]
663struct StateStoreCollector {
664    memory_collector: Arc<dyn MemoryCollector>,
665    collectors: Vec<Arc<dyn Collector>>,
666    block_cache_size: IntGauge,
667    meta_cache_size: IntGauge,
668    vector_data_cache_size: IntGauge,
669    vector_meta_cache_size: IntGauge,
670    uploading_memory_size: IntGauge,
671    prefetch_memory_size: IntGauge,
672    meta_cache_usage_ratio: Gauge,
673    block_cache_usage_ratio: Gauge,
674    vector_data_cache_usage_ratio: Gauge,
675    vector_meta_cache_usage_ratio: Gauge,
676    uploading_memory_usage_ratio: Gauge,
677}
678
679impl StateStoreCollector {
680    pub fn new(memory_collector: Arc<dyn MemoryCollector>) -> Self {
681        let mut collectors = Vec::new();
682
683        let block_cache_size = IntGauge::with_opts(Opts::new(
684            "state_store_block_cache_size",
685            "the size of cache for data block cache",
686        ))
687        .unwrap();
688        collectors.push(Arc::new(block_cache_size.clone()) as _);
689
690        let block_cache_usage_ratio = Gauge::with_opts(Opts::new(
691            "state_store_block_cache_usage_ratio",
692            "the ratio of block cache to it's pre-allocated memory",
693        ))
694        .unwrap();
695        collectors.push(Arc::new(block_cache_usage_ratio.clone()) as _);
696
697        let meta_cache_size = IntGauge::with_opts(Opts::new(
698            "state_store_meta_cache_size",
699            "the size of cache for meta file cache",
700        ))
701        .unwrap();
702        collectors.push(Arc::new(meta_cache_size.clone()) as _);
703
704        let meta_cache_usage_ratio = Gauge::with_opts(Opts::new(
705            "state_store_meta_cache_usage_ratio",
706            "the ratio of meta cache to it's pre-allocated memory",
707        ))
708        .unwrap();
709        collectors.push(Arc::new(meta_cache_usage_ratio.clone()) as _);
710
711        let vector_data_cache_size = IntGauge::with_opts(Opts::new(
712            "state_store_vector_data_cache_size",
713            "the size of cache for vector data file cache",
714        ))
715        .unwrap();
716        collectors.push(Arc::new(vector_data_cache_size.clone()) as _);
717
718        let vector_data_cache_usage_ratio = Gauge::with_opts(Opts::new(
719            "state_store_vector_data_cache_usage_ratio",
720            "the ratio of vector data cache to it's pre-allocated memory",
721        ))
722        .unwrap();
723        collectors.push(Arc::new(vector_data_cache_usage_ratio.clone()) as _);
724
725        let vector_meta_cache_size = IntGauge::with_opts(Opts::new(
726            "state_store_vector_meta_cache_size",
727            "the size of cache for vector meta file cache",
728        ))
729        .unwrap();
730        collectors.push(Arc::new(vector_meta_cache_size.clone()) as _);
731
732        let vector_meta_cache_usage_ratio = Gauge::with_opts(Opts::new(
733            "state_store_vector_meta_cache_usage_ratio",
734            "the ratio of vector meta cache to it's pre-allocated memory",
735        ))
736        .unwrap();
737        collectors.push(Arc::new(vector_meta_cache_usage_ratio.clone()) as _);
738
739        let uploading_memory_size = IntGauge::with_opts(Opts::new(
740            "uploading_memory_size",
741            "the size of uploading SSTs memory usage",
742        ))
743        .unwrap();
744        collectors.push(Arc::new(uploading_memory_size.clone()) as _);
745
746        let uploading_memory_usage_ratio = Gauge::with_opts(Opts::new(
747            "state_store_uploading_memory_usage_ratio",
748            "the ratio of uploading SSTs memory usage to it's pre-allocated memory",
749        ))
750        .unwrap();
751        collectors.push(Arc::new(uploading_memory_usage_ratio.clone()) as _);
752
753        let prefetch_memory_size = IntGauge::with_opts(Opts::new(
754            "state_store_prefetch_memory_size",
755            "the size of prefetch memory usage",
756        ))
757        .unwrap();
758        collectors.push(Arc::new(prefetch_memory_size.clone()) as _);
759
760        Self {
761            memory_collector,
762            collectors,
763            block_cache_size,
764            meta_cache_size,
765            vector_data_cache_size,
766            vector_meta_cache_size,
767            uploading_memory_size,
768            prefetch_memory_size,
769            meta_cache_usage_ratio,
770            block_cache_usage_ratio,
771
772            vector_data_cache_usage_ratio,
773            vector_meta_cache_usage_ratio,
774            uploading_memory_usage_ratio,
775        }
776    }
777}
778
779impl Collector for StateStoreCollector {
780    fn desc(&self) -> Vec<&Desc> {
781        self.collectors.iter().flat_map(|c| c.desc()).collect()
782    }
783
784    fn collect(&self) -> Vec<proto::MetricFamily> {
785        self.block_cache_size
786            .set(self.memory_collector.get_data_memory_usage() as i64);
787        self.meta_cache_size
788            .set(self.memory_collector.get_meta_memory_usage() as i64);
789        self.vector_data_cache_size
790            .set(self.memory_collector.get_vector_data_memory_usage() as _);
791        self.vector_meta_cache_size
792            .set(self.memory_collector.get_vector_meta_memory_usage() as _);
793        self.uploading_memory_size
794            .set(self.memory_collector.get_uploading_memory_usage() as i64);
795        self.prefetch_memory_size
796            .set(self.memory_collector.get_prefetch_memory_usage() as i64);
797        self.meta_cache_usage_ratio
798            .set(self.memory_collector.get_meta_cache_memory_usage_ratio());
799        self.block_cache_usage_ratio
800            .set(self.memory_collector.get_block_cache_memory_usage_ratio());
801        self.vector_meta_cache_usage_ratio.set(
802            self.memory_collector
803                .get_vector_meta_cache_memory_usage_ratio(),
804        );
805        self.vector_data_cache_usage_ratio.set(
806            self.memory_collector
807                .get_vector_data_cache_memory_usage_ratio(),
808        );
809        self.uploading_memory_usage_ratio
810            .set(self.memory_collector.get_shared_buffer_usage_ratio());
811        // collect MetricFamilies.
812        self.collectors.iter().flat_map(|c| c.collect()).collect()
813    }
814}
815
816pub fn monitor_cache(memory_collector: Arc<dyn MemoryCollector>) {
817    let collector = Box::new(StateStoreCollector::new(memory_collector));
818    if let Err(e) = GLOBAL_METRICS_REGISTRY.register(collector) {
819        warn!(
820            "unable to monitor cache. May have been registered if in all-in-one deployment: {}",
821            e.as_report()
822        );
823    }
824}