risingwave_common_metrics/
lib.rs1#![feature(type_alias_impl_trait)]
16#![feature(impl_trait_in_assoc_type)]
17use std::ops::Deref;
18use std::sync::LazyLock;
19
20use hytra::TrAdder;
21use prometheus::core::{Atomic, AtomicU64, GenericCounter, GenericGauge};
22use prometheus::proto::Metric;
23use prometheus::register_int_counter_with_registry;
24use tracing::Subscriber;
25use tracing_subscriber::Layer;
26use tracing_subscriber::layer::Context;
27use tracing_subscriber::registry::LookupSpan;
28
29mod error_metrics;
30mod gauge_ext;
31mod guarded_metrics;
32mod metrics;
33pub mod monitor;
34mod relabeled_metric;
35
36pub use error_metrics::*;
37pub use gauge_ext::*;
38pub use guarded_metrics::*;
39pub use metrics::*;
40pub use relabeled_metric::*;
41
/// Newtype wrapper around a [`TrAdder<i64>`].
///
/// The [`Atomic`] implementation in this file forwards `get`, `inc_by` and `dec_by`
/// to the wrapped adder and panics on `set`.
#[derive(Debug)]
pub struct TrAdderAtomic(TrAdder<i64>);
44
45impl Atomic for TrAdderAtomic {
46 type T = i64;
47
48 fn new(val: i64) -> Self {
49 let v = TrAdderAtomic(TrAdder::new());
50 v.0.inc(val);
51 v
52 }
53
54 fn set(&self, _val: i64) {
55 panic!("TrAdderAtomic doesn't support set operation.")
56 }
57
58 fn get(&self) -> i64 {
59 self.0.get()
60 }
61
62 fn inc_by(&self, delta: i64) {
63 self.0.inc(delta)
64 }
65
66 fn dec_by(&self, delta: i64) {
67 self.0.inc(-delta)
68 }
69}
70
/// A prometheus [`GenericGauge`] whose value is stored in a [`TrAdderAtomic`].
pub type TrAdderGauge = GenericGauge<TrAdderAtomic>;
72
/// A `tracing` layer that counts the events delivered to it.
///
/// NOTE(review): presumably installed behind a filter so that only AWS SDK retry
/// events reach it; confirm at the place where the layer is registered.
pub struct MetricsLayer {
    /// Counter incremented once for every event this layer receives.
    pub aws_sdk_retry_counts: GenericCounter<AtomicU64>,
}
79
80impl<S> Layer<S> for MetricsLayer
81where
82 S: Subscriber + for<'a> LookupSpan<'a>,
83{
84 fn on_event(&self, _event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
85 self.aws_sdk_retry_counts.inc();
89 }
90}
91
92impl MetricsLayer {
93 #[allow(clippy::new_without_default)]
94 pub fn new() -> Self {
95 static AWS_SDK_RETRY_COUNTS: LazyLock<GenericCounter<AtomicU64>> = LazyLock::new(|| {
96 let registry = crate::monitor::GLOBAL_METRICS_REGISTRY.deref();
97 register_int_counter_with_registry!(
98 "aws_sdk_retry_counts",
99 "Total number of aws sdk retry happens",
100 registry
101 )
102 .unwrap()
103 });
104
105 Self {
106 aws_sdk_retry_counts: AWS_SDK_RETRY_COUNTS.deref().clone(),
107 }
108 }
109}
110
/// A metric level with explicit discriminants, from `Disabled = 0` up to `Debug = 3`.
///
/// The default value is [`MetricLevel::Disabled`]. Equality and ordering are
/// implemented by hand in this file by comparing the discriminants as `u8`.
///
/// NOTE(review): presumably a higher level enables more metrics; confirm against callers.
#[derive(Debug, Default, Clone, Copy, serde::Serialize, serde::Deserialize)]
pub enum MetricLevel {
    #[default]
    Disabled = 0,
    Critical = 1,
    Info = 2,
    Debug = 3,
}
119
120impl clap::ValueEnum for MetricLevel {
121 fn value_variants<'a>() -> &'a [Self] {
122 &[Self::Disabled, Self::Critical, Self::Info, Self::Debug]
123 }
124
125 fn to_possible_value<'a>(&self) -> ::std::option::Option<clap::builder::PossibleValue> {
126 match self {
127 Self::Disabled => Some(clap::builder::PossibleValue::new("disabled").alias("0")),
128 Self::Critical => Some(clap::builder::PossibleValue::new("critical")),
129 Self::Info => Some(clap::builder::PossibleValue::new("info").alias("1")),
130 Self::Debug => Some(clap::builder::PossibleValue::new("debug")),
131 }
132 }
133}
134
135impl PartialEq<Self> for MetricLevel {
136 fn eq(&self, other: &Self) -> bool {
137 (*self as u8).eq(&(*other as u8))
138 }
139}
140
141impl PartialOrd for MetricLevel {
142 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
143 (*self as u8).partial_cmp(&(*other as u8))
144 }
145}
146
147pub fn get_label<T: std::str::FromStr>(metric: &Metric, label: &str) -> Option<T> {
148 metric
149 .get_label()
150 .iter()
151 .find(|lp| lp.get_name() == label)
152 .and_then(|lp| lp.get_value().parse::<T>().ok())
153}
154
155pub fn get_label_infallible<T: std::str::FromStr>(metric: &Metric, label: &str) -> T {
157 get_label(metric, label).expect("label not found or can't be parsed")
158}