risingwave_object_store/object/
object_metrics.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::LazyLock;

use prometheus::core::{AtomicU64, GenericCounter, GenericCounterVec};
use prometheus::{
    exponential_buckets, histogram_opts, register_histogram_vec_with_registry,
    register_int_counter_vec_with_registry, register_int_counter_with_registry, HistogramVec,
    Registry,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;

pub static GLOBAL_OBJECT_STORE_METRICS: LazyLock<ObjectStoreMetrics> =
    LazyLock::new(|| ObjectStoreMetrics::new(&GLOBAL_METRICS_REGISTRY));

#[derive(Clone)]
pub struct ObjectStoreMetrics {
    pub write_bytes: GenericCounter<AtomicU64>,
    pub read_bytes: GenericCounter<AtomicU64>,
    pub operation_latency: HistogramVec,
    pub operation_size: HistogramVec,
    pub failure_count: GenericCounterVec<AtomicU64>,
    pub request_retry_count: GenericCounterVec<AtomicU64>,
}

impl ObjectStoreMetrics {
    fn new(registry: &Registry) -> Self {
        let read_bytes = register_int_counter_with_registry!(
            "object_store_read_bytes",
            "Total bytes of requests read from object store",
            registry
        )
        .unwrap();
        let write_bytes = register_int_counter_with_registry!(
            "object_store_write_bytes",
            "Total bytes of requests read from object store",
            registry
        )
        .unwrap();

        let latency_opts = histogram_opts!(
            "object_store_operation_latency",
            "Total latency of operation on object store",
            exponential_buckets(0.001, 2.0, 22).unwrap(), // max 209s
        );
        let operation_latency =
            register_histogram_vec_with_registry!(latency_opts, &["media_type", "type"], registry)
                .unwrap();
        let mut buckets = vec![];
        for i in 0..4 {
            buckets.push((4096 << (i * 2)) as f64);
        }
        for i in 0..4 {
            buckets.push((4096 << (i + 10)) as f64);
        }
        let mut step = *buckets.last().unwrap(); // 32MB
        for _ in 0..4 {
            let base = *buckets.last().unwrap() + step;
            for i in 0..4 {
                buckets.push(base + step * i as f64);
            }
            step *= 2.0;
        }
        let bytes_opts = histogram_opts!(
            "object_store_operation_bytes",
            "Size of operation result on object store",
            buckets, // max 1952MB
        );
        let operation_size =
            register_histogram_vec_with_registry!(bytes_opts, &["type"], registry).unwrap();

        let failure_count = register_int_counter_vec_with_registry!(
            "object_store_failure_count",
            "The number of failures of object store operations",
            &["type"],
            registry
        )
        .unwrap();

        let request_retry_count = register_int_counter_vec_with_registry!(
            "object_store_request_retry_count",
            "The number of retry times of object store request",
            &["type"],
            registry
        )
        .unwrap();

        Self {
            write_bytes,
            read_bytes,
            operation_latency,
            operation_size,
            failure_count,
            request_retry_count,
        }
    }

    /// Creates a new `HummockStateStoreMetrics` instance used in tests or other places.
    pub fn unused() -> Self {
        GLOBAL_OBJECT_STORE_METRICS.clone()
    }
}