risingwave_common_metrics/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![feature(type_alias_impl_trait)]
16#![feature(impl_trait_in_assoc_type)]
17use std::ops::Deref;
18use std::sync::LazyLock;
19
20use hytra::TrAdder;
21use prometheus::core::{Atomic, AtomicU64, GenericCounter, GenericGauge};
22use prometheus::proto::Metric;
23use prometheus::register_int_counter_with_registry;
24use tracing::Subscriber;
25use tracing_subscriber::Layer;
26use tracing_subscriber::layer::Context;
27use tracing_subscriber::registry::LookupSpan;
28
29mod error_metrics;
30mod gauge_ext;
31mod guarded_metrics;
32mod metrics;
33pub mod monitor;
34mod relabeled_metric;
35
36pub use error_metrics::*;
37pub use gauge_ext::*;
38pub use guarded_metrics::*;
39pub use metrics::*;
40pub use relabeled_metric::*;
41
/// Newtype around a [`TrAdder`] of `i64` that implements prometheus' [`Atomic`] trait, so it can
/// serve as the storage of a [`GenericGauge`] (see [`TrAdderGauge`]).
///
/// Only additive updates are supported: [`Atomic::set`] panics for this type.
#[derive(Debug)]
pub struct TrAdderAtomic(TrAdder<i64>);
44
45impl Atomic for TrAdderAtomic {
46    type T = i64;
47
48    fn new(val: i64) -> Self {
49        let v = TrAdderAtomic(TrAdder::new());
50        v.0.inc(val);
51        v
52    }
53
54    fn set(&self, _val: i64) {
55        panic!("TrAdderAtomic doesn't support set operation.")
56    }
57
58    fn get(&self) -> i64 {
59        self.0.get()
60    }
61
62    fn inc_by(&self, delta: i64) {
63        self.0.inc(delta)
64    }
65
66    fn dec_by(&self, delta: i64) {
67        self.0.inc(-delta)
68    }
69}
70
/// A prometheus [`GenericGauge`] whose value is stored in a [`TrAdderAtomic`].
pub type TrAdderGauge = GenericGauge<TrAdderAtomic>;
72
/// [`MetricsLayer`] is a tracing [`Layer`] used for monitoring the frequency of certain specific
/// logs and counting them using Prometheus metrics. Currently, it is used to monitor the
/// frequency of retry occurrences of the AWS SDK.
pub struct MetricsLayer {
    /// Counter incremented once for every event this layer observes.
    pub aws_sdk_retry_counts: GenericCounter<AtomicU64>,
}
79
80impl<S> Layer<S> for MetricsLayer
81where
82    S: Subscriber + for<'a> LookupSpan<'a>,
83{
84    fn on_event(&self, _event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
85        // Currently one retry will only generate one debug log,
86        // so we can monitor the number of retry only through the metadata target.
87        // Refer to <https://docs.rs/aws-smithy-client/0.55.3/src/aws_smithy_client/retry.rs.html>
88        self.aws_sdk_retry_counts.inc();
89    }
90}
91
92impl MetricsLayer {
93    #[allow(clippy::new_without_default)]
94    pub fn new() -> Self {
95        static AWS_SDK_RETRY_COUNTS: LazyLock<GenericCounter<AtomicU64>> = LazyLock::new(|| {
96            let registry = crate::monitor::GLOBAL_METRICS_REGISTRY.deref();
97            register_int_counter_with_registry!(
98                "aws_sdk_retry_counts",
99                "Total number of aws sdk retry happens",
100                registry
101            )
102            .unwrap()
103        });
104
105        Self {
106            aws_sdk_retry_counts: AWS_SDK_RETRY_COUNTS.deref().clone(),
107        }
108    }
109}
110
/// Metric level, selectable on the command line (via [`clap::ValueEnum`]) and (de)serializable
/// with serde.
///
/// Equality and ordering are implemented by comparing the numeric discriminants, so
/// `Disabled < Critical < Info < Debug`.
#[derive(Debug, Default, Clone, Copy, serde::Serialize, serde::Deserialize)]
pub enum MetricLevel {
    // The default level.
    #[default]
    Disabled = 0,
    Critical = 1,
    Info = 2,
    Debug = 3,
}
119
120impl clap::ValueEnum for MetricLevel {
121    fn value_variants<'a>() -> &'a [Self] {
122        &[Self::Disabled, Self::Critical, Self::Info, Self::Debug]
123    }
124
125    fn to_possible_value<'a>(&self) -> ::std::option::Option<clap::builder::PossibleValue> {
126        match self {
127            Self::Disabled => Some(clap::builder::PossibleValue::new("disabled").alias("0")),
128            Self::Critical => Some(clap::builder::PossibleValue::new("critical")),
129            Self::Info => Some(clap::builder::PossibleValue::new("info").alias("1")),
130            Self::Debug => Some(clap::builder::PossibleValue::new("debug")),
131        }
132    }
133}
134
135impl PartialEq<Self> for MetricLevel {
136    fn eq(&self, other: &Self) -> bool {
137        (*self as u8).eq(&(*other as u8))
138    }
139}
140
141impl PartialOrd for MetricLevel {
142    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
143        (*self as u8).partial_cmp(&(*other as u8))
144    }
145}
146
147pub fn get_label<T: std::str::FromStr>(metric: &Metric, label: &str) -> Option<T> {
148    metric
149        .get_label()
150        .iter()
151        .find(|lp| lp.get_name() == label)
152        .and_then(|lp| lp.get_value().parse::<T>().ok())
153}
154
155// Must ensure the label exists and can be parsed into `T`
156pub fn get_label_infallible<T: std::str::FromStr>(metric: &Metric, label: &str) -> T {
157    get_label(metric, label).expect("label not found or can't be parsed")
158}