risingwave_common_metrics/
lib.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![feature(type_alias_impl_trait)]
16#![feature(impl_trait_in_assoc_type)]
17#![feature(trait_alias)]
18
19use std::ops::Deref;
20use std::sync::LazyLock;
21
22use hytra::TrAdder;
23use prometheus::core::{Atomic, AtomicU64, GenericCounter, GenericGauge};
24use prometheus::proto::Metric;
25use prometheus::register_int_counter_with_registry;
26use tracing::Subscriber;
27use tracing_subscriber::Layer;
28use tracing_subscriber::layer::Context;
29use tracing_subscriber::registry::LookupSpan;
30
31mod error_metrics;
32mod gauge_ext;
33mod guarded_metrics;
34mod metrics;
35pub mod monitor;
36mod relabeled_metric;
37
38pub use error_metrics::*;
39pub use gauge_ext::*;
40pub use guarded_metrics::*;
41pub use metrics::*;
42pub use relabeled_metric::*;
43
/// Newtype around a [`TrAdder<i64>`](TrAdder) that implements prometheus' [`Atomic`] trait, so
/// that the adder can be used as the value storage of a prometheus metric.
///
/// The wrapped adder only supports adding a delta and reading the current value; see the
/// [`Atomic`] implementation for how each trait method is mapped onto it.
#[derive(Debug)]
pub struct TrAdderAtomic(TrAdder<i64>);
46
47impl Atomic for TrAdderAtomic {
48    type T = i64;
49
50    fn new(val: i64) -> Self {
51        let v = TrAdderAtomic(TrAdder::new());
52        v.0.inc(val);
53        v
54    }
55
56    fn set(&self, _val: i64) {
57        panic!("TrAdderAtomic doesn't support set operation.")
58    }
59
60    fn get(&self) -> i64 {
61        self.0.get()
62    }
63
64    fn inc_by(&self, delta: i64) {
65        self.0.inc(delta)
66    }
67
68    fn dec_by(&self, delta: i64) {
69        self.0.inc(-delta)
70    }
71}
72
/// A prometheus [`GenericGauge`] whose value is stored in a [`TrAdderAtomic`].
///
/// NOTE(review): [`TrAdderAtomic`]'s `set` panics, so gauge operations that go through
/// `Atomic::set` are presumably unusable on this type — confirm against the prometheus API.
pub type TrAdderGauge = GenericGauge<TrAdderAtomic>;
74
/// [`MetricsLayer`] is a tracing layer that counts how often certain log events occur and
/// exposes the count as a Prometheus metric. Currently, it is used to monitor how often the
/// AWS SDK retries a request.
pub struct MetricsLayer {
    /// Counter that is incremented once for every event this layer observes.
    pub aws_sdk_retry_counts: GenericCounter<AtomicU64>,
}
81
82impl<S> Layer<S> for MetricsLayer
83where
84    S: Subscriber + for<'a> LookupSpan<'a>,
85{
86    fn on_event(&self, _event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
87        // Currently one retry will only generate one debug log,
88        // so we can monitor the number of retry only through the metadata target.
89        // Refer to <https://docs.rs/aws-smithy-client/0.55.3/src/aws_smithy_client/retry.rs.html>
90        self.aws_sdk_retry_counts.inc();
91    }
92}
93
94impl MetricsLayer {
95    #[allow(clippy::new_without_default)]
96    pub fn new() -> Self {
97        static AWS_SDK_RETRY_COUNTS: LazyLock<GenericCounter<AtomicU64>> = LazyLock::new(|| {
98            let registry = crate::monitor::GLOBAL_METRICS_REGISTRY.deref();
99            register_int_counter_with_registry!(
100                "aws_sdk_retry_counts",
101                "Total number of aws sdk retry happens",
102                registry
103            )
104            .unwrap()
105        });
106
107        Self {
108            aws_sdk_retry_counts: AWS_SDK_RETRY_COUNTS.deref().clone(),
109        }
110    }
111}
112
/// Level of a metric.
///
/// Each variant has an explicit numeric discriminant; the `PartialEq` and `PartialOrd`
/// implementations compare levels by that number, so
/// `Disabled < Critical < Info < Debug`. The default level is [`MetricLevel::Disabled`].
#[derive(Debug, Default, Clone, Copy, serde::Serialize, serde::Deserialize)]
pub enum MetricLevel {
    /// Lowest level (discriminant `0`); this is the default.
    #[default]
    Disabled = 0,
    /// Discriminant `1`.
    Critical = 1,
    /// Discriminant `2`.
    Info = 2,
    /// Highest level (discriminant `3`).
    Debug = 3,
}
121
122impl clap::ValueEnum for MetricLevel {
123    fn value_variants<'a>() -> &'a [Self] {
124        &[Self::Disabled, Self::Critical, Self::Info, Self::Debug]
125    }
126
127    fn to_possible_value<'a>(&self) -> ::std::option::Option<clap::builder::PossibleValue> {
128        match self {
129            Self::Disabled => Some(clap::builder::PossibleValue::new("disabled").alias("0")),
130            Self::Critical => Some(clap::builder::PossibleValue::new("critical")),
131            Self::Info => Some(clap::builder::PossibleValue::new("info").alias("1")),
132            Self::Debug => Some(clap::builder::PossibleValue::new("debug")),
133        }
134    }
135}
136
137impl PartialEq<Self> for MetricLevel {
138    fn eq(&self, other: &Self) -> bool {
139        (*self as u8).eq(&(*other as u8))
140    }
141}
142
143impl PartialOrd for MetricLevel {
144    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
145        (*self as u8).partial_cmp(&(*other as u8))
146    }
147}
148
149pub fn get_label<T: std::str::FromStr>(metric: &Metric, label: &str) -> Option<T> {
150    metric
151        .get_label()
152        .iter()
153        .find(|lp| lp.name() == label)
154        .and_then(|lp| lp.value().parse::<T>().ok())
155}
156
157// Must ensure the label exists and can be parsed into `T`
158pub fn get_label_infallible<T: std::str::FromStr>(metric: &Metric, label: &str) -> T {
159    get_label(metric, label).expect("label not found or can't be parsed")
160}