risingwave_storage/monitor/
hummock_state_store_metrics.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, OnceLock};
16
17use prometheus::core::{AtomicU64, Collector, Desc, GenericCounter};
18use prometheus::{
19    Gauge, Histogram, HistogramVec, IntGauge, IntGaugeVec, Opts, Registry, exponential_buckets,
20    histogram_opts, proto, register_histogram_vec_with_registry, register_histogram_with_registry,
21    register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
22    register_int_gauge_with_registry,
23};
24use risingwave_common::config::MetricLevel;
25use risingwave_common::metrics::{
26    RelabeledCounterVec, RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec,
27    RelabeledGuardedIntGaugeVec, RelabeledHistogramVec, RelabeledMetricVec, UintGauge,
28};
29use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
30use risingwave_common::{
31    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
32    register_guarded_int_gauge_vec_with_registry,
33};
34use thiserror_ext::AsReport;
35use tracing::warn;
36
/// [`HummockStateStoreMetrics`] stores the performance and IO metrics of `XXXStore` such as
/// `RocksDBStateStore` and `TikvStateStore`.
///
/// In practice, keep in mind that this represents the whole Hummock utilization of
/// a `RisingWave` instance. More granular utilization per materialized-view job or per
/// executor should be collected by views like `StateStats` and `JobStats`.
///
/// All fields are created and registered by [`HummockStateStoreMetrics::new`]. The field
/// descriptions below are taken from the help text each metric is registered with there.
/// The struct is `Clone`; [`global_hummock_state_store_metrics`] hands out clones of one
/// shared instance.
#[derive(Debug, Clone)]
pub struct HummockStateStoreMetrics {
    /// Total number of sstables that have been considered true negative by bloom filters.
    pub bloom_filter_true_negative_counts: RelabeledGuardedIntCounterVec,
    /// Total number of read requests that check bloom filters.
    pub bloom_filter_check_counts: RelabeledGuardedIntCounterVec,
    /// Number of child iterators merged into one `MergeIterator`.
    pub iter_merge_sstable_counts: RelabeledHistogramVec,
    /// Total number of sst block requests that have been issued to the sst store.
    pub sst_store_block_request_counts: RelabeledGuardedIntCounterVec,
    /// Total number of keys read by iterators.
    pub iter_scan_key_counts: RelabeledGuardedIntCounterVec,
    /// Total number of get requests that have been fulfilled by the shared buffer.
    pub get_shared_buffer_hit_counts: RelabeledCounterVec,
    /// Total time of operations which read from remote storage when prefetch is enabled.
    pub remote_read_time: RelabeledHistogramVec,
    /// Histogram of the time iterators spend fetching SST meta.
    pub iter_fetch_meta_duration: RelabeledGuardedHistogramVec,
    /// Number of SST meta cache unhits during one iterator meta fetch.
    pub iter_fetch_meta_cache_unhits: IntGauge,
    /// Number of SST meta cache unhits during an iterator meta fetch that is slow
    /// (the help text states "costs >5 seconds").
    pub iter_slow_fetch_meta_cache_unhits: IntGauge,

    // ----- vector -----
    /// Metrics about vector object requests that have been issued
    /// (labels: `table_id`, `type`, `mode`).
    pub vector_object_request_counts: RelabeledGuardedIntCounterVec,
    /// Metrics about vector requests (labels: `table_id`, `type`, `mode`, `top_n`, `ef`).
    pub vector_request_stats: RelabeledGuardedHistogramVec,
    /// Number of nodes in each level of the hnsw graph.
    pub vector_hnsw_graph_level_node_count: RelabeledGuardedIntGaugeVec,
    /// Number of vector files.
    pub vector_index_file_count: RelabeledGuardedIntGaugeVec,
    /// Total size of vector index files.
    pub vector_index_file_size: RelabeledGuardedIntGaugeVec,

    // ----- read requests vs. bloom filter -----
    /// Total number of read requests with at least one SST bloom filter check returning
    /// positive.
    pub read_req_bloom_filter_positive_counts: RelabeledGuardedIntCounterVec,
    /// Total number of read requests on a non-existent key/prefix with at least one SST bloom
    /// filter check returning positive.
    pub read_req_positive_but_non_exist_counts: RelabeledGuardedIntCounterVec,
    /// Total number of read requests that check the bloom filter with a prefix hint.
    pub read_req_check_bloom_filter_counts: RelabeledGuardedIntCounterVec,

    // ----- write batch -----
    /// Total number of batched write kv pairs requests that have been issued to state store.
    pub write_batch_tuple_counts: RelabeledGuardedIntCounterVec,
    /// Total time of batched writes issued to state store. Per the help text, with shared
    /// buffer on this is the latency of writing to the shared buffer.
    pub write_batch_duration: RelabeledGuardedHistogramVec,
    /// Total size of batched writes that have been issued to state store.
    pub write_batch_size: RelabeledGuardedHistogramVec,

    /// `state_store_spill_task_counts` counter bound to `uploader_stage = "unsealed"`.
    pub spill_task_counts_from_unsealed: GenericCounter<AtomicU64>,
    /// `state_store_spill_task_size` counter bound to `uploader_stage = "unsealed"`.
    pub spill_task_size_from_unsealed: GenericCounter<AtomicU64>,

    // ----- uploader / uploading tasks -----
    /// Total size of uploader uploading tasks.
    pub uploader_uploading_task_size: UintGauge,
    /// Total number of uploader uploading tasks.
    pub uploader_uploading_task_count: IntGauge,
    /// Total size of imms tracked by the uploader.
    pub uploader_imm_size: UintGauge,
    /// Latency of uploader uploading tasks.
    pub uploader_upload_task_latency: Histogram,
    /// Total number of syncing epochs.
    pub uploader_syncing_epoch_count: IntGauge,
    /// Latency between an uploading task finishing and it being polled.
    pub uploader_wait_poll_latency: Histogram,
    /// Total uploader-tracked imm size per table.
    pub uploader_per_table_imm_size: RelabeledGuardedIntGaugeVec,
    /// Total uploader-tracked imm count per table.
    pub uploader_per_table_imm_count: RelabeledGuardedIntGaugeVec,

    // ----- memory -----
    /// Total imm size per table (labels: `table_id`, `fragment_id`).
    pub per_table_imm_size: RelabeledGuardedIntGaugeVec,
    /// Total imm count per table.
    pub per_table_imm_count: RelabeledGuardedIntGaugeVec,
    /// Total number of mem table spills that occurred for one table.
    pub mem_table_spill_counts: RelabeledGuardedIntCounterVec,
    /// The size of old values.
    pub old_value_size: RelabeledGuardedIntGaugeVec,

    // ----- block statistics -----
    /// Access ratio of in-memory blocks.
    pub block_efficiency_histogram: Histogram,

    // ----- event handler -----
    /// The number of sent but unhandled events, per `event_type`.
    pub event_handler_pending_event: IntGaugeVec,
    /// Latency to handle an event, per `event_type`.
    pub event_handler_latency: HistogramVec,

    // ----- safe version lookup -----
    /// Total count of safe versions that were retrieved successfully.
    pub safe_version_hit: GenericCounter<AtomicU64>,
    /// Total count of safe versions that could not be retrieved.
    pub safe_version_miss: GenericCounter<AtomicU64>,
}
99
/// Shared [`HummockStateStoreMetrics`] instance. It starts empty and is filled in by the first
/// call to [`global_hummock_state_store_metrics`], which registers the metrics in
/// `GLOBAL_METRICS_REGISTRY`.
pub static GLOBAL_HUMMOCK_STATE_STORE_METRICS: OnceLock<HummockStateStoreMetrics> = OnceLock::new();
101
102pub fn global_hummock_state_store_metrics(metric_level: MetricLevel) -> HummockStateStoreMetrics {
103    GLOBAL_HUMMOCK_STATE_STORE_METRICS
104        .get_or_init(|| HummockStateStoreMetrics::new(&GLOBAL_METRICS_REGISTRY, metric_level))
105        .clone()
106}
107
108impl HummockStateStoreMetrics {
109    pub fn new(registry: &Registry, metric_level: MetricLevel) -> Self {
110        // 10ms ~ max 2.7h
111        let time_buckets = exponential_buckets(0.01, 10.0, 7).unwrap();
112
113        // 1ms - 100s
114        let state_store_read_time_buckets = exponential_buckets(0.001, 10.0, 5).unwrap();
115
116        let bloom_filter_true_negative_counts = register_guarded_int_counter_vec_with_registry!(
117            "state_store_bloom_filter_true_negative_counts",
118            "Total number of sstables that have been considered true negative by bloom filters",
119            &["table_id", "type"],
120            registry
121        )
122        .unwrap();
123        let bloom_filter_true_negative_counts = RelabeledMetricVec::with_metric_level(
124            MetricLevel::Debug,
125            bloom_filter_true_negative_counts,
126            metric_level,
127        );
128
129        let bloom_filter_check_counts = register_guarded_int_counter_vec_with_registry!(
130            "state_store_bloom_filter_check_counts",
131            "Total number of read request to check bloom filters",
132            &["table_id", "type"],
133            registry
134        )
135        .unwrap();
136        let bloom_filter_check_counts = RelabeledMetricVec::with_metric_level(
137            MetricLevel::Debug,
138            bloom_filter_check_counts,
139            metric_level,
140        );
141
142        // ----- iter -----
143        let opts = histogram_opts!(
144            "state_store_iter_merge_sstable_counts",
145            "Number of child iterators merged into one MergeIterator",
146            vec![1.0, 10.0, 100.0, 1000.0, 10000.0]
147        );
148        let iter_merge_sstable_counts =
149            register_histogram_vec_with_registry!(opts, &["table_id", "type"], registry).unwrap();
150        let iter_merge_sstable_counts = RelabeledHistogramVec::with_metric_level(
151            MetricLevel::Debug,
152            iter_merge_sstable_counts,
153            metric_level,
154        );
155
156        // ----- sst store -----
157        let sst_store_block_request_counts = register_guarded_int_counter_vec_with_registry!(
158            "state_store_sst_store_block_request_counts",
159            "Total number of sst block requests that have been issued to sst store",
160            &["table_id", "type"],
161            registry
162        )
163        .unwrap();
164        let sst_store_block_request_counts = RelabeledGuardedIntCounterVec::with_metric_level(
165            MetricLevel::Info,
166            sst_store_block_request_counts,
167            metric_level,
168        );
169
170        let iter_scan_key_counts = register_guarded_int_counter_vec_with_registry!(
171            "state_store_iter_scan_key_counts",
172            "Total number of keys read by iterator",
173            &["table_id", "type"],
174            registry
175        )
176        .unwrap();
177        let iter_scan_key_counts = RelabeledGuardedIntCounterVec::with_metric_level(
178            MetricLevel::Info,
179            iter_scan_key_counts,
180            metric_level,
181        );
182
183        let get_shared_buffer_hit_counts = register_int_counter_vec_with_registry!(
184            "state_store_get_shared_buffer_hit_counts",
185            "Total number of get requests that have been fulfilled by shared buffer",
186            &["table_id"],
187            registry
188        )
189        .unwrap();
190        let get_shared_buffer_hit_counts = RelabeledCounterVec::with_metric_level(
191            MetricLevel::Debug,
192            get_shared_buffer_hit_counts,
193            metric_level,
194        );
195
196        let opts = histogram_opts!(
197            "state_store_remote_read_time_per_task",
198            "Total time of operations which read from remote storage when enable prefetch",
199            time_buckets.clone(),
200        );
201        let remote_read_time =
202            register_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
203        let remote_read_time = RelabeledHistogramVec::with_metric_level(
204            MetricLevel::Debug,
205            remote_read_time,
206            metric_level,
207        );
208
209        let opts = histogram_opts!(
210            "state_store_iter_fetch_meta_duration",
211            "Histogram of iterator fetch SST meta time that have been issued to state store",
212            state_store_read_time_buckets,
213        );
214        let iter_fetch_meta_duration =
215            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
216        let iter_fetch_meta_duration = RelabeledGuardedHistogramVec::with_metric_level(
217            MetricLevel::Info,
218            iter_fetch_meta_duration,
219            metric_level,
220        );
221
222        let iter_fetch_meta_cache_unhits = register_int_gauge_with_registry!(
223            "state_store_iter_fetch_meta_cache_unhits",
224            "Number of SST meta cache unhit during one iterator meta fetch",
225            registry
226        )
227        .unwrap();
228
229        let iter_slow_fetch_meta_cache_unhits = register_int_gauge_with_registry!(
230            "state_store_iter_slow_fetch_meta_cache_unhits",
231            "Number of SST meta cache unhit during a iterator meta fetch which is slow (costs >5 seconds)",
232            registry
233        )
234        .unwrap();
235
236        // ----- vector -----
237        let vector_object_request_counts = register_guarded_int_counter_vec_with_registry!(
238            "state_store_vector_object_request_counts",
239            "Metrics about vector object requests that have been issued",
240            &["table_id", "type", "mode"],
241            registry
242        )
243        .unwrap();
244        let vector_object_request_counts = RelabeledGuardedIntCounterVec::with_metric_level(
245            MetricLevel::Critical,
246            vector_object_request_counts,
247            metric_level,
248        );
249
250        let opts = histogram_opts!(
251            "state_store_vector_request_stats",
252            "Metrics about vector requests",
253            exponential_buckets(100.0, 10.0, 5).unwrap(),
254        );
255
256        let vector_request_stats = register_guarded_histogram_vec_with_registry!(
257            opts,
258            &["table_id", "type", "mode", "top_n", "ef"],
259            registry
260        )
261        .unwrap();
262        let vector_request_stats = RelabeledGuardedHistogramVec::with_metric_level(
263            MetricLevel::Critical,
264            vector_request_stats,
265            metric_level,
266        );
267
268        let vector_hnsw_graph_level_node_count = register_guarded_int_gauge_vec_with_registry!(
269            "state_store_vector_hnsw_graph_level_node_count",
270            "Number of nodes in each level of hnsw graph",
271            &["table_id", "level"],
272            registry
273        )
274        .unwrap();
275        let vector_hnsw_graph_level_node_count = RelabeledGuardedIntGaugeVec::with_metric_level(
276            MetricLevel::Critical,
277            vector_hnsw_graph_level_node_count,
278            metric_level,
279        );
280
281        let vector_index_file_count = register_guarded_int_gauge_vec_with_registry!(
282            "state_store_vector_index_file_count",
283            "Number of vector file",
284            &["table_id"],
285            registry
286        )
287        .unwrap();
288        let vector_index_file_count = RelabeledGuardedIntGaugeVec::with_metric_level(
289            MetricLevel::Critical,
290            vector_index_file_count,
291            metric_level,
292        );
293
294        let vector_index_file_size = register_guarded_int_gauge_vec_with_registry!(
295            "state_store_vector_index_file_size",
296            "total size of vector index file",
297            &["table_id", "type"],
298            registry
299        )
300        .unwrap();
301        let vector_index_file_size = RelabeledGuardedIntGaugeVec::with_metric_level(
302            MetricLevel::Critical,
303            vector_index_file_size,
304            metric_level,
305        );
306
307        // ----- write_batch -----
308        let write_batch_tuple_counts = register_guarded_int_counter_vec_with_registry!(
309            "state_store_write_batch_tuple_counts",
310            "Total number of batched write kv pairs requests that have been issued to state store",
311            &["table_id"],
312            registry
313        )
314        .unwrap();
315        let write_batch_tuple_counts = RelabeledGuardedIntCounterVec::with_metric_level(
316            MetricLevel::Debug,
317            write_batch_tuple_counts,
318            metric_level,
319        );
320
321        let opts = histogram_opts!(
322            "state_store_write_batch_duration",
323            "Total time of batched write that have been issued to state store. With shared buffer on, this is the latency writing to the shared buffer",
324            time_buckets.clone()
325        );
326        let write_batch_duration =
327            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
328        let write_batch_duration = RelabeledGuardedHistogramVec::with_metric_level(
329            MetricLevel::Debug,
330            write_batch_duration,
331            metric_level,
332        );
333
334        let opts = histogram_opts!(
335            "state_store_write_batch_size",
336            "Total size of batched write that have been issued to state store",
337            exponential_buckets(256.0, 16.0, 7).unwrap() // min 256B ~ max 4GB
338        );
339        let write_batch_size =
340            register_guarded_histogram_vec_with_registry!(opts, &["table_id"], registry).unwrap();
341        let write_batch_size = RelabeledGuardedHistogramVec::with_metric_level(
342            MetricLevel::Debug,
343            write_batch_size,
344            metric_level,
345        );
346
347        let spill_task_counts = register_int_counter_vec_with_registry!(
348            "state_store_spill_task_counts",
349            "Total number of started spill tasks",
350            &["uploader_stage"],
351            registry
352        )
353        .unwrap();
354
355        let spill_task_size = register_int_counter_vec_with_registry!(
356            "state_store_spill_task_size",
357            "Total task of started spill tasks",
358            &["uploader_stage"],
359            registry
360        )
361        .unwrap();
362
363        let uploader_uploading_task_size = UintGauge::new(
364            "state_store_uploader_uploading_task_size",
365            "Total size of uploader uploading tasks",
366        )
367        .unwrap();
368        registry
369            .register(Box::new(uploader_uploading_task_size.clone()))
370            .unwrap();
371
372        let uploader_uploading_task_count = register_int_gauge_with_registry!(
373            "state_store_uploader_uploading_task_count",
374            "Total number of uploader uploading tasks",
375            registry
376        )
377        .unwrap();
378
379        let uploader_imm_size = UintGauge::new(
380            "state_store_uploader_imm_size",
381            "Total size of imms tracked by uploader",
382        )
383        .unwrap();
384        registry
385            .register(Box::new(uploader_imm_size.clone()))
386            .unwrap();
387
388        let opts = histogram_opts!(
389            "state_store_uploader_upload_task_latency",
390            "Latency of uploader uploading tasks",
391            time_buckets
392        );
393
394        let uploader_upload_task_latency =
395            register_histogram_with_registry!(opts, registry).unwrap();
396
397        let opts = histogram_opts!(
398            "state_store_uploader_wait_poll_latency",
399            "Latency of upload uploading task being polled after finish",
400            exponential_buckets(0.001, 5.0, 7).unwrap(), // 1ms - 15s
401        );
402
403        let uploader_wait_poll_latency = register_histogram_with_registry!(opts, registry).unwrap();
404
405        let uploader_syncing_epoch_count = register_int_gauge_with_registry!(
406            "state_store_uploader_syncing_epoch_count",
407            "Total number of syncing epoch",
408            registry
409        )
410        .unwrap();
411
412        let uploader_per_table_imm_size = register_guarded_int_gauge_vec_with_registry!(
413            "state_store_uploader_per_table_imm_size",
414            "Total uploader-tracked imm size per table",
415            &["table_id"],
416            registry
417        )
418        .unwrap();
419
420        let uploader_per_table_imm_size = RelabeledGuardedIntGaugeVec::with_metric_level(
421            MetricLevel::Debug,
422            uploader_per_table_imm_size,
423            metric_level,
424        );
425
426        let uploader_per_table_imm_count = register_guarded_int_gauge_vec_with_registry!(
427            "state_store_uploader_per_table_imm_count",
428            "Total uploader-tracked imm count per table",
429            &["table_id"],
430            registry
431        )
432        .unwrap();
433
434        let uploader_per_table_imm_count = RelabeledGuardedIntGaugeVec::with_metric_level(
435            MetricLevel::Debug,
436            uploader_per_table_imm_count,
437            metric_level,
438        );
439
440        let per_table_imm_size = register_guarded_int_gauge_vec_with_registry!(
441            "state_store_per_table_imm_size",
442            "Total imm size per table",
443            &["table_id", "fragment_id"],
444            registry
445        )
446        .unwrap();
447
448        let per_table_imm_size = RelabeledGuardedIntGaugeVec::with_metric_level_relabel_n(
449            MetricLevel::Debug,
450            per_table_imm_size,
451            metric_level,
452            1,
453        );
454
455        let per_table_imm_count = register_guarded_int_gauge_vec_with_registry!(
456            "state_store_per_table_imm_count",
457            "Total imm count per table",
458            &["table_id"],
459            registry
460        )
461        .unwrap();
462
463        let per_table_imm_count = RelabeledGuardedIntGaugeVec::with_metric_level(
464            MetricLevel::Debug,
465            per_table_imm_count,
466            metric_level,
467        );
468
469        let read_req_bloom_filter_positive_counts = register_guarded_int_counter_vec_with_registry!(
470            "state_store_read_req_bloom_filter_positive_counts",
471            "Total number of read request with at least one SST bloom filter check returns positive",
472            &["table_id", "type"],
473            registry
474        )
475        .unwrap();
476        let read_req_bloom_filter_positive_counts =
477            RelabeledGuardedIntCounterVec::with_metric_level_relabel_n(
478                MetricLevel::Info,
479                read_req_bloom_filter_positive_counts,
480                metric_level,
481                1,
482            );
483
484        let read_req_positive_but_non_exist_counts = register_guarded_int_counter_vec_with_registry!(
485            "state_store_read_req_positive_but_non_exist_counts",
486            "Total number of read request on non-existent key/prefix with at least one SST bloom filter check returns positive",
487            &["table_id", "type"],
488            registry
489        )
490        .unwrap();
491        let read_req_positive_but_non_exist_counts =
492            RelabeledGuardedIntCounterVec::with_metric_level(
493                MetricLevel::Info,
494                read_req_positive_but_non_exist_counts,
495                metric_level,
496            );
497
498        let read_req_check_bloom_filter_counts = register_guarded_int_counter_vec_with_registry!(
499            "state_store_read_req_check_bloom_filter_counts",
500            "Total number of read request that checks bloom filter with a prefix hint",
501            &["table_id", "type"],
502            registry
503        )
504        .unwrap();
505
506        let read_req_check_bloom_filter_counts = RelabeledGuardedIntCounterVec::with_metric_level(
507            MetricLevel::Info,
508            read_req_check_bloom_filter_counts,
509            metric_level,
510        );
511
512        let mem_table_spill_counts = register_guarded_int_counter_vec_with_registry!(
513            "state_store_mem_table_spill_counts",
514            "Total number of mem table spill occurs for one table",
515            &["table_id"],
516            registry
517        )
518        .unwrap();
519
520        let mem_table_spill_counts = RelabeledGuardedIntCounterVec::with_metric_level(
521            MetricLevel::Info,
522            mem_table_spill_counts,
523            metric_level,
524        );
525
526        let old_value_size = register_guarded_int_gauge_vec_with_registry!(
527            "state_store_old_value_size",
528            "The size of old value",
529            &["table_id"],
530            registry
531        )
532        .unwrap();
533
534        let old_value_size = RelabeledGuardedIntGaugeVec::with_metric_level(
535            MetricLevel::Info,
536            old_value_size,
537            metric_level,
538        );
539
540        let opts = histogram_opts!(
541            "block_efficiency_histogram",
542            "Access ratio of in-memory block.",
543            exponential_buckets(0.001, 2.0, 11).unwrap(),
544        );
545        let block_efficiency_histogram = register_histogram_with_registry!(opts, registry).unwrap();
546
547        let event_handler_pending_event = register_int_gauge_vec_with_registry!(
548            "state_store_event_handler_pending_event",
549            "The number of sent but unhandled events",
550            &["event_type"],
551            registry,
552        )
553        .unwrap();
554
555        let opts = histogram_opts!(
556            "state_store_event_handler_latency",
557            "Latency to handle event",
558            exponential_buckets(0.001, 5.0, 7).unwrap(), // 1ms - 15s
559        );
560
561        let event_handler_latency =
562            register_histogram_vec_with_registry!(opts, &["event_type"], registry).unwrap();
563
564        let safe_version_hit = GenericCounter::new(
565            "state_store_safe_version_hit",
566            "The total count of a safe version that can be retrieved successfully",
567        )
568        .unwrap();
569        registry
570            .register(Box::new(safe_version_hit.clone()))
571            .unwrap();
572
573        let safe_version_miss = GenericCounter::new(
574            "state_store_safe_version_miss",
575            "The total count of a safe version that cannot be retrieved",
576        )
577        .unwrap();
578        registry
579            .register(Box::new(safe_version_miss.clone()))
580            .unwrap();
581
582        Self {
583            bloom_filter_true_negative_counts,
584            bloom_filter_check_counts,
585            iter_merge_sstable_counts,
586            sst_store_block_request_counts,
587            iter_scan_key_counts,
588            get_shared_buffer_hit_counts,
589            remote_read_time,
590            iter_fetch_meta_duration,
591            iter_fetch_meta_cache_unhits,
592            iter_slow_fetch_meta_cache_unhits,
593            vector_object_request_counts,
594            vector_request_stats,
595            vector_hnsw_graph_level_node_count,
596            vector_index_file_count,
597            vector_index_file_size,
598            read_req_bloom_filter_positive_counts,
599            read_req_positive_but_non_exist_counts,
600            read_req_check_bloom_filter_counts,
601            write_batch_tuple_counts,
602            write_batch_duration,
603            write_batch_size,
604            spill_task_counts_from_unsealed: spill_task_counts.with_label_values(&["unsealed"]),
605            spill_task_size_from_unsealed: spill_task_size.with_label_values(&["unsealed"]),
606            uploader_uploading_task_size,
607            uploader_uploading_task_count,
608            uploader_imm_size,
609            uploader_upload_task_latency,
610            uploader_syncing_epoch_count,
611            uploader_wait_poll_latency,
612            uploader_per_table_imm_size,
613            uploader_per_table_imm_count,
614            per_table_imm_size,
615            per_table_imm_count,
616            mem_table_spill_counts,
617            old_value_size,
618
619            block_efficiency_histogram,
620            event_handler_pending_event,
621            event_handler_latency,
622            safe_version_hit,
623            safe_version_miss,
624        }
625    }
626
627    pub fn unused() -> Self {
628        global_hummock_state_store_metrics(MetricLevel::Disabled)
629    }
630}
631
/// Source of the memory figures exported by the cache monitor installed through
/// [`monitor_cache`].
///
/// Each getter is read on every metrics collection and copied into the gauge named in its
/// description.
pub trait MemoryCollector: Sync + Send {
    /// Feeds `state_store_meta_cache_size`.
    fn get_meta_memory_usage(&self) -> u64;
    /// Feeds `state_store_block_cache_size`.
    fn get_data_memory_usage(&self) -> u64;
    /// Feeds `state_store_vector_meta_cache_size`.
    fn get_vector_meta_memory_usage(&self) -> u64;
    /// Feeds `state_store_vector_data_cache_size`.
    fn get_vector_data_memory_usage(&self) -> u64;
    /// Feeds `uploading_memory_size`.
    fn get_uploading_memory_usage(&self) -> u64;
    /// Feeds `state_store_prefetch_memory_size`. Note that this getter returns `usize`,
    /// unlike the other size getters.
    fn get_prefetch_memory_usage(&self) -> usize;
    /// Feeds `state_store_meta_cache_usage_ratio`.
    fn get_meta_cache_memory_usage_ratio(&self) -> f64;
    /// Feeds `state_store_block_cache_usage_ratio`.
    fn get_block_cache_memory_usage_ratio(&self) -> f64;
    /// Feeds `state_store_vector_meta_cache_usage_ratio`.
    fn get_vector_meta_cache_memory_usage_ratio(&self) -> f64;
    /// Feeds `state_store_vector_data_cache_usage_ratio`.
    fn get_vector_data_cache_memory_usage_ratio(&self) -> f64;
    /// Feeds `state_store_uploading_memory_usage_ratio`.
    fn get_shared_buffer_usage_ratio(&self) -> f64;
}
645
/// Prometheus [`Collector`] that refreshes a set of memory gauges from a
/// [`MemoryCollector`] each time metrics are collected.
#[derive(Clone)]
struct StateStoreCollector {
    /// Provider of the current memory figures.
    memory_collector: Arc<dyn MemoryCollector>,
    /// Type-erased handles to all gauges below; used to answer `desc()` and `collect()`.
    collectors: Vec<Arc<dyn Collector>>,
    // Absolute sizes.
    block_cache_size: IntGauge,
    meta_cache_size: IntGauge,
    vector_data_cache_size: IntGauge,
    vector_meta_cache_size: IntGauge,
    uploading_memory_size: IntGauge,
    prefetch_memory_size: IntGauge,
    // Usage ratios.
    meta_cache_usage_ratio: Gauge,
    block_cache_usage_ratio: Gauge,
    vector_data_cache_usage_ratio: Gauge,
    vector_meta_cache_usage_ratio: Gauge,
    uploading_memory_usage_ratio: Gauge,
}
662
663impl StateStoreCollector {
664    pub fn new(memory_collector: Arc<dyn MemoryCollector>) -> Self {
665        let mut collectors = Vec::new();
666
667        let block_cache_size = IntGauge::with_opts(Opts::new(
668            "state_store_block_cache_size",
669            "the size of cache for data block cache",
670        ))
671        .unwrap();
672        collectors.push(Arc::new(block_cache_size.clone()) as _);
673
674        let block_cache_usage_ratio = Gauge::with_opts(Opts::new(
675            "state_store_block_cache_usage_ratio",
676            "the ratio of block cache to it's pre-allocated memory",
677        ))
678        .unwrap();
679        collectors.push(Arc::new(block_cache_usage_ratio.clone()) as _);
680
681        let meta_cache_size = IntGauge::with_opts(Opts::new(
682            "state_store_meta_cache_size",
683            "the size of cache for meta file cache",
684        ))
685        .unwrap();
686        collectors.push(Arc::new(meta_cache_size.clone()) as _);
687
688        let meta_cache_usage_ratio = Gauge::with_opts(Opts::new(
689            "state_store_meta_cache_usage_ratio",
690            "the ratio of meta cache to it's pre-allocated memory",
691        ))
692        .unwrap();
693        collectors.push(Arc::new(meta_cache_usage_ratio.clone()) as _);
694
695        let vector_data_cache_size = IntGauge::with_opts(Opts::new(
696            "state_store_vector_data_cache_size",
697            "the size of cache for vector data file cache",
698        ))
699        .unwrap();
700        collectors.push(Arc::new(vector_data_cache_size.clone()) as _);
701
702        let vector_data_cache_usage_ratio = Gauge::with_opts(Opts::new(
703            "state_store_vector_data_cache_usage_ratio",
704            "the ratio of vector data cache to it's pre-allocated memory",
705        ))
706        .unwrap();
707        collectors.push(Arc::new(vector_data_cache_usage_ratio.clone()) as _);
708
709        let vector_meta_cache_size = IntGauge::with_opts(Opts::new(
710            "state_store_vector_meta_cache_size",
711            "the size of cache for vector meta file cache",
712        ))
713        .unwrap();
714        collectors.push(Arc::new(vector_meta_cache_size.clone()) as _);
715
716        let vector_meta_cache_usage_ratio = Gauge::with_opts(Opts::new(
717            "state_store_vector_meta_cache_usage_ratio",
718            "the ratio of vector meta cache to it's pre-allocated memory",
719        ))
720        .unwrap();
721        collectors.push(Arc::new(vector_meta_cache_usage_ratio.clone()) as _);
722
723        let uploading_memory_size = IntGauge::with_opts(Opts::new(
724            "uploading_memory_size",
725            "the size of uploading SSTs memory usage",
726        ))
727        .unwrap();
728        collectors.push(Arc::new(uploading_memory_size.clone()) as _);
729
730        let uploading_memory_usage_ratio = Gauge::with_opts(Opts::new(
731            "state_store_uploading_memory_usage_ratio",
732            "the ratio of uploading SSTs memory usage to it's pre-allocated memory",
733        ))
734        .unwrap();
735        collectors.push(Arc::new(uploading_memory_usage_ratio.clone()) as _);
736
737        let prefetch_memory_size = IntGauge::with_opts(Opts::new(
738            "state_store_prefetch_memory_size",
739            "the size of prefetch memory usage",
740        ))
741        .unwrap();
742        collectors.push(Arc::new(prefetch_memory_size.clone()) as _);
743
744        Self {
745            memory_collector,
746            collectors,
747            block_cache_size,
748            meta_cache_size,
749            vector_data_cache_size,
750            vector_meta_cache_size,
751            uploading_memory_size,
752            prefetch_memory_size,
753            meta_cache_usage_ratio,
754            block_cache_usage_ratio,
755
756            vector_data_cache_usage_ratio,
757            vector_meta_cache_usage_ratio,
758            uploading_memory_usage_ratio,
759        }
760    }
761}
762
763impl Collector for StateStoreCollector {
764    fn desc(&self) -> Vec<&Desc> {
765        self.collectors.iter().flat_map(|c| c.desc()).collect()
766    }
767
768    fn collect(&self) -> Vec<proto::MetricFamily> {
769        self.block_cache_size
770            .set(self.memory_collector.get_data_memory_usage() as i64);
771        self.meta_cache_size
772            .set(self.memory_collector.get_meta_memory_usage() as i64);
773        self.vector_data_cache_size
774            .set(self.memory_collector.get_vector_data_memory_usage() as _);
775        self.vector_meta_cache_size
776            .set(self.memory_collector.get_vector_meta_memory_usage() as _);
777        self.uploading_memory_size
778            .set(self.memory_collector.get_uploading_memory_usage() as i64);
779        self.prefetch_memory_size
780            .set(self.memory_collector.get_prefetch_memory_usage() as i64);
781        self.meta_cache_usage_ratio
782            .set(self.memory_collector.get_meta_cache_memory_usage_ratio());
783        self.block_cache_usage_ratio
784            .set(self.memory_collector.get_block_cache_memory_usage_ratio());
785        self.vector_meta_cache_usage_ratio.set(
786            self.memory_collector
787                .get_vector_meta_cache_memory_usage_ratio(),
788        );
789        self.vector_data_cache_usage_ratio.set(
790            self.memory_collector
791                .get_vector_data_cache_memory_usage_ratio(),
792        );
793        self.uploading_memory_usage_ratio
794            .set(self.memory_collector.get_shared_buffer_usage_ratio());
795        // collect MetricFamilies.
796        self.collectors.iter().flat_map(|c| c.collect()).collect()
797    }
798}
799
800pub fn monitor_cache(memory_collector: Arc<dyn MemoryCollector>) {
801    let collector = Box::new(StateStoreCollector::new(memory_collector));
802    if let Err(e) = GLOBAL_METRICS_REGISTRY.register(collector) {
803        warn!(
804            "unable to monitor cache. May have been registered if in all-in-one deployment: {}",
805            e.as_report()
806        );
807    }
808}