risingwave_object_store/object/
object_metrics.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::LazyLock;
16
17use prometheus::core::{AtomicU64, GenericCounter, GenericCounterVec};
18use prometheus::{
19    HistogramVec, Registry, exponential_buckets, histogram_opts,
20    register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
21    register_int_counter_with_registry,
22};
23use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
24
/// Process-wide [`ObjectStoreMetrics`] instance.
///
/// Built lazily on first access by registering the object store metrics into
/// [`GLOBAL_METRICS_REGISTRY`].
pub static GLOBAL_OBJECT_STORE_METRICS: LazyLock<ObjectStoreMetrics> =
    LazyLock::new(|| ObjectStoreMetrics::new(&GLOBAL_METRICS_REGISTRY));
27
/// Handles to the Prometheus metrics recorded for object store operations.
#[derive(Clone)]
pub struct ObjectStoreMetrics {
    /// Byte counter registered as `object_store_write_bytes`.
    pub write_bytes: GenericCounter<AtomicU64>,
    /// Byte counter registered as `object_store_read_bytes`.
    pub read_bytes: GenericCounter<AtomicU64>,
    /// Histogram registered as `object_store_operation_latency`, labelled by
    /// `media_type` and `type`.
    pub operation_latency: HistogramVec,
    /// Histogram registered as `object_store_operation_bytes`, labelled by `type`.
    pub operation_size: HistogramVec,
    /// Counter registered as `object_store_failure_count`, labelled by `type`.
    pub failure_count: GenericCounterVec<AtomicU64>,
    /// Counter registered as `object_store_request_retry_count`, labelled by `type`.
    pub request_retry_count: GenericCounterVec<AtomicU64>,
}
37
38impl ObjectStoreMetrics {
39    fn new(registry: &Registry) -> Self {
40        let read_bytes = register_int_counter_with_registry!(
41            "object_store_read_bytes",
42            "Total bytes of requests read from object store",
43            registry
44        )
45        .unwrap();
46        let write_bytes = register_int_counter_with_registry!(
47            "object_store_write_bytes",
48            "Total bytes of requests read from object store",
49            registry
50        )
51        .unwrap();
52
53        let latency_opts = histogram_opts!(
54            "object_store_operation_latency",
55            "Total latency of operation on object store",
56            exponential_buckets(0.001, 2.0, 22).unwrap(), // max 209s
57        );
58        let operation_latency =
59            register_histogram_vec_with_registry!(latency_opts, &["media_type", "type"], registry)
60                .unwrap();
61        let mut buckets = vec![];
62        for i in 0..4 {
63            buckets.push((4096 << (i * 2)) as f64);
64        }
65        for i in 0..4 {
66            buckets.push((4096 << (i + 10)) as f64);
67        }
68        let mut step = *buckets.last().unwrap(); // 32MB
69        for _ in 0..4 {
70            let base = *buckets.last().unwrap() + step;
71            for i in 0..4 {
72                buckets.push(base + step * i as f64);
73            }
74            step *= 2.0;
75        }
76        let bytes_opts = histogram_opts!(
77            "object_store_operation_bytes",
78            "Size of operation result on object store",
79            buckets, // max 1952MB
80        );
81        let operation_size =
82            register_histogram_vec_with_registry!(bytes_opts, &["type"], registry).unwrap();
83
84        let failure_count = register_int_counter_vec_with_registry!(
85            "object_store_failure_count",
86            "The number of failures of object store operations",
87            &["type"],
88            registry
89        )
90        .unwrap();
91
92        let request_retry_count = register_int_counter_vec_with_registry!(
93            "object_store_request_retry_count",
94            "The number of retry times of object store request",
95            &["type"],
96            registry
97        )
98        .unwrap();
99
100        Self {
101            write_bytes,
102            read_bytes,
103            operation_latency,
104            operation_size,
105            failure_count,
106            request_retry_count,
107        }
108    }
109
110    /// Creates a new `HummockStateStoreMetrics` instance used in tests or other places.
111    pub fn unused() -> Self {
112        GLOBAL_OBJECT_STORE_METRICS.clone()
113    }
114}