risingwave_common_metrics/
guarded_metrics.rs1use std::any::type_name;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Debug, Formatter};
18use std::ops::Deref;
19use std::sync::Arc;
20
21use parking_lot::Mutex;
22use prometheus::core::{
23 Atomic, AtomicF64, AtomicI64, AtomicU64, Collector, Desc, GenericCounter, GenericLocalCounter,
24 MetricVec, MetricVecBuilder,
25};
26use prometheus::local::{LocalHistogram, LocalIntCounter};
27use prometheus::proto::MetricFamily;
28use prometheus::{Gauge, Histogram, IntCounter, IntGauge};
29use thiserror_ext::AsReport;
30use tracing::warn;
31
32#[macro_export]
33macro_rules! register_guarded_histogram_vec_with_registry {
34 ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
35 $crate::register_guarded_histogram_vec_with_registry! {
36 {prometheus::histogram_opts!($NAME, $HELP)},
37 $LABELS_NAMES,
38 $REGISTRY
39 }
40 }};
41 ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $BUCKETS:expr, $REGISTRY:expr $(,)?) => {{
42 $crate::register_guarded_histogram_vec_with_registry! {
43 {prometheus::histogram_opts!($NAME, $HELP, $BUCKETS)},
44 $LABELS_NAMES,
45 $REGISTRY
46 }
47 }};
48 ($HOPTS:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
49 let inner = prometheus::HistogramVec::new($HOPTS, $LABELS_NAMES);
50 inner.and_then(|inner| {
51 let inner = $crate::__extract_histogram_builder(inner);
52 let label_guarded = $crate::LabelGuardedHistogramVec::new(inner, { $LABELS_NAMES });
53 let result = ($REGISTRY).register(Box::new(label_guarded.clone()));
54 result.map(move |()| label_guarded)
55 })
56 }};
57}
58
59#[macro_export]
60macro_rules! register_guarded_gauge_vec_with_registry {
61 ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
62 let inner = prometheus::GaugeVec::new(prometheus::opts!($NAME, $HELP), $LABELS_NAMES);
63 inner.and_then(|inner| {
64 let inner = $crate::__extract_gauge_builder(inner);
65 let label_guarded = $crate::LabelGuardedGaugeVec::new(inner, { $LABELS_NAMES });
66 let result = ($REGISTRY).register(Box::new(label_guarded.clone()));
67 result.map(move |()| label_guarded)
68 })
69 }};
70}
71
72#[macro_export]
73macro_rules! register_guarded_int_gauge_vec_with_registry {
74 ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
75 let inner = prometheus::IntGaugeVec::new(prometheus::opts!($NAME, $HELP), $LABELS_NAMES);
76 inner.and_then(|inner| {
77 let inner = $crate::__extract_gauge_builder(inner);
78 let label_guarded = $crate::LabelGuardedIntGaugeVec::new(inner, { $LABELS_NAMES });
79 let result = ($REGISTRY).register(Box::new(label_guarded.clone()));
80 result.map(move |()| label_guarded)
81 })
82 }};
83}
84
85#[macro_export]
86macro_rules! register_guarded_uint_gauge_vec_with_registry {
87 ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
88 let inner = prometheus::core::GenericGaugeVec::<prometheus::core::AtomicU64>::new(
89 prometheus::opts!($NAME, $HELP),
90 $LABELS_NAMES,
91 );
92 inner.and_then(|inner| {
93 let inner = $crate::__extract_gauge_builder(inner);
94 let label_guarded = $crate::LabelGuardedUintGaugeVec::new(inner, { $LABELS_NAMES });
95 let result = ($REGISTRY).register(Box::new(label_guarded.clone()));
96 result.map(move |()| label_guarded)
97 })
98 }};
99}
100
101#[macro_export]
102macro_rules! register_guarded_int_counter_vec_with_registry {
103 ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
104 let inner = prometheus::IntCounterVec::new(prometheus::opts!($NAME, $HELP), $LABELS_NAMES);
105 inner.and_then(|inner| {
106 let inner = $crate::__extract_counter_builder(inner);
107 let label_guarded = $crate::LabelGuardedIntCounterVec::new(inner, { $LABELS_NAMES });
108 let result = ($REGISTRY).register(Box::new(label_guarded.clone()));
109 result.map(move |()| label_guarded)
110 })
111 }};
112}
113
114mod tait {
116 use prometheus::core::{
117 Atomic, GenericCounter, GenericCounterVec, GenericGauge, GenericGaugeVec, MetricVec,
118 MetricVecBuilder,
119 };
120 use prometheus::{Histogram, HistogramVec};
121
122 pub type VecBuilderOfCounter<P: Atomic> = impl MetricVecBuilder<M = GenericCounter<P>>;
123 pub type VecBuilderOfGauge<P: Atomic> = impl MetricVecBuilder<M = GenericGauge<P>>;
124 pub type VecBuilderOfHistogram = impl MetricVecBuilder<M = Histogram>;
125
126 #[define_opaque(VecBuilderOfCounter)]
127 pub fn __extract_counter_builder<P: Atomic>(
128 vec: GenericCounterVec<P>,
129 ) -> MetricVec<VecBuilderOfCounter<P>> {
130 vec
131 }
132
133 #[define_opaque(VecBuilderOfGauge)]
134 pub fn __extract_gauge_builder<P: Atomic>(
135 vec: GenericGaugeVec<P>,
136 ) -> MetricVec<VecBuilderOfGauge<P>> {
137 vec
138 }
139
140 #[define_opaque(VecBuilderOfHistogram)]
141 pub fn __extract_histogram_builder(vec: HistogramVec) -> MetricVec<VecBuilderOfHistogram> {
142 vec
143 }
144}
145pub use tait::*;
146
147use crate::UintGauge;
148
149pub type LabelGuardedHistogramVec = LabelGuardedMetricVec<VecBuilderOfHistogram>;
150pub type LabelGuardedIntCounterVec = LabelGuardedMetricVec<VecBuilderOfCounter<AtomicU64>>;
151pub type LabelGuardedIntGaugeVec = LabelGuardedMetricVec<VecBuilderOfGauge<AtomicI64>>;
152pub type LabelGuardedUintGaugeVec = LabelGuardedMetricVec<VecBuilderOfGauge<AtomicU64>>;
153pub type LabelGuardedGaugeVec = LabelGuardedMetricVec<VecBuilderOfGauge<AtomicF64>>;
154
155pub type LabelGuardedHistogram = LabelGuardedMetric<Histogram>;
156pub type LabelGuardedIntCounter = LabelGuardedMetric<IntCounter>;
157pub type LabelGuardedIntGauge = LabelGuardedMetric<IntGauge>;
158pub type LabelGuardedUintGauge = LabelGuardedMetric<UintGauge>;
159pub type LabelGuardedGauge = LabelGuardedMetric<Gauge>;
160
161pub type LabelGuardedLocalHistogram = LabelGuardedMetric<LocalHistogram>;
162pub type LabelGuardedLocalIntCounter = LabelGuardedMetric<LocalIntCounter>;
163
164fn gen_test_label<const N: usize>() -> [&'static str; N] {
165 const TEST_LABELS: [&str; 5] = ["test1", "test2", "test3", "test4", "test5"];
166 (0..N)
167 .map(|i| TEST_LABELS[i])
168 .collect::<Vec<_>>()
169 .try_into()
170 .unwrap()
171}
172
173#[derive(Default)]
174struct LabelGuardedMetricsInfo {
175 labeled_metrics_count: HashMap<Box<[String]>, usize>,
176 uncollected_removed_labels: HashSet<Box<[String]>>,
177}
178
179impl LabelGuardedMetricsInfo {
180 fn register_new_label<V: AsRef<str>>(mutex: &Arc<Mutex<Self>>, labels: &[V]) -> LabelGuard {
181 let mut guard = mutex.lock();
182 let label_string = labels
183 .iter()
184 .map(|label| label.as_ref().to_owned())
185 .collect::<Vec<_>>()
186 .into_boxed_slice();
187 guard.uncollected_removed_labels.remove(&label_string);
188 *guard
189 .labeled_metrics_count
190 .entry(label_string.clone())
191 .or_insert(0) += 1;
192 LabelGuard {
193 labels: label_string,
194 info: mutex.clone(),
195 }
196 }
197}
198
199#[derive(Clone)]
216pub struct LabelGuardedMetricVec<T: MetricVecBuilder> {
217 inner: MetricVec<T>,
218 info: Arc<Mutex<LabelGuardedMetricsInfo>>,
219 labels: Box<[&'static str]>,
220}
221
222impl<T: MetricVecBuilder> Debug for LabelGuardedMetricVec<T> {
223 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
224 f.debug_struct(format!("LabelGuardedMetricVec<{}>", type_name::<T>()).as_str())
225 .field("label", &self.labels)
226 .finish()
227 }
228}
229
230impl<T: MetricVecBuilder> Collector for LabelGuardedMetricVec<T> {
231 fn desc(&self) -> Vec<&Desc> {
232 self.inner.desc()
233 }
234
235 fn collect(&self) -> Vec<MetricFamily> {
236 let mut guard = self.info.lock();
237 let ret = self.inner.collect();
238 for labels in guard.uncollected_removed_labels.drain() {
239 if let Err(e) = self.inner.remove_label_values(&labels) {
240 warn!(
241 error = %e.as_report(),
242 "err when delete metrics of {:?} of labels {:?}",
243 self.inner.desc().first().expect("should have desc").fq_name,
244 self.labels,
245 );
246 }
247 }
248 ret
249 }
250}
251
252impl<T: MetricVecBuilder> LabelGuardedMetricVec<T> {
253 pub fn new(inner: MetricVec<T>, labels: &[&'static str]) -> Self {
254 Self {
255 inner,
256 info: Default::default(),
257 labels: labels.to_vec().into_boxed_slice(),
258 }
259 }
260
261 pub fn with_guarded_label_values<V: AsRef<str> + std::fmt::Debug>(
272 &self,
273 labels: &[V],
274 ) -> LabelGuardedMetric<T::M> {
275 let guard = LabelGuardedMetricsInfo::register_new_label(&self.info, labels);
276 let inner = self.inner.with_label_values(labels);
277 LabelGuardedMetric {
278 inner,
279 _guard: Arc::new(guard),
280 }
281 }
282
283 pub fn with_test_label<const N: usize>(&self) -> LabelGuardedMetric<T::M> {
284 let labels = gen_test_label::<N>();
285 self.with_guarded_label_values(&labels)
286 }
287}
288
289impl LabelGuardedIntCounterVec {
290 pub fn test_int_counter_vec<const N: usize>() -> Self {
291 let registry = prometheus::Registry::new();
292 let labels = gen_test_label::<N>();
293 register_guarded_int_counter_vec_with_registry!("test", "test", &labels, ®istry).unwrap()
294 }
295}
296
297impl LabelGuardedIntGaugeVec {
298 pub fn test_int_gauge_vec<const N: usize>() -> Self {
299 let registry = prometheus::Registry::new();
300 let labels = gen_test_label::<N>();
301 register_guarded_int_gauge_vec_with_registry!("test", "test", &labels, ®istry).unwrap()
302 }
303}
304
305impl LabelGuardedGaugeVec {
306 pub fn test_gauge_vec<const N: usize>() -> Self {
307 let registry = prometheus::Registry::new();
308 let labels = gen_test_label::<N>();
309 register_guarded_gauge_vec_with_registry!("test", "test", &labels, ®istry).unwrap()
310 }
311}
312
313impl LabelGuardedHistogramVec {
314 pub fn test_histogram_vec<const N: usize>() -> Self {
315 let registry = prometheus::Registry::new();
316 let labels = gen_test_label::<N>();
317 register_guarded_histogram_vec_with_registry!("test", "test", &labels, ®istry).unwrap()
318 }
319}
320
321#[derive(Clone)]
322struct LabelGuard {
323 labels: Box<[String]>,
324 info: Arc<Mutex<LabelGuardedMetricsInfo>>,
325}
326
327impl Drop for LabelGuard {
328 fn drop(&mut self) {
329 let mut guard = self.info.lock();
330 let count = guard.labeled_metrics_count.get_mut(&self.labels).expect(
331 "should exist because the current existing dropping one means the count is not zero",
332 );
333 *count -= 1;
334 if *count == 0 {
335 guard
336 .labeled_metrics_count
337 .remove(&self.labels)
338 .expect("should exist");
339 guard.uncollected_removed_labels.insert(self.labels.clone());
340 }
341 }
342}
343
344#[derive(Clone)]
345pub struct LabelGuardedMetric<T> {
346 inner: T,
347 _guard: Arc<LabelGuard>,
348}
349
350impl<T> Debug for LabelGuardedMetric<T> {
351 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
352 f.debug_struct("LabelGuardedMetric").finish()
353 }
354}
355
356impl<T> Deref for LabelGuardedMetric<T> {
357 type Target = T;
358
359 fn deref(&self) -> &Self::Target {
360 &self.inner
361 }
362}
363
364impl LabelGuardedHistogram {
365 pub fn test_histogram<const N: usize>() -> Self {
366 LabelGuardedHistogramVec::test_histogram_vec::<N>().with_test_label::<N>()
367 }
368}
369
370impl LabelGuardedIntCounter {
371 pub fn test_int_counter<const N: usize>() -> Self {
372 LabelGuardedIntCounterVec::test_int_counter_vec::<N>().with_test_label::<N>()
373 }
374}
375
376impl LabelGuardedIntGauge {
377 pub fn test_int_gauge<const N: usize>() -> Self {
378 LabelGuardedIntGaugeVec::test_int_gauge_vec::<N>().with_test_label::<N>()
379 }
380}
381
382impl LabelGuardedGauge {
383 pub fn test_gauge<const N: usize>() -> Self {
384 LabelGuardedGaugeVec::test_gauge_vec::<N>().with_test_label::<N>()
385 }
386}
387
388pub trait MetricWithLocal {
389 type Local;
390 fn local(&self) -> Self::Local;
391}
392
393impl MetricWithLocal for Histogram {
394 type Local = LocalHistogram;
395
396 fn local(&self) -> Self::Local {
397 self.local()
398 }
399}
400
401impl<P: Atomic> MetricWithLocal for GenericCounter<P> {
402 type Local = GenericLocalCounter<P>;
403
404 fn local(&self) -> Self::Local {
405 self.local()
406 }
407}
408
409impl<T: MetricWithLocal> LabelGuardedMetric<T> {
410 pub fn local(&self) -> LabelGuardedMetric<T::Local> {
411 LabelGuardedMetric {
412 inner: self.inner.local(),
413 _guard: self._guard.clone(),
414 }
415 }
416}
417
418#[cfg(test)]
419mod tests {
420 use prometheus::core::Collector;
421
422 use crate::LabelGuardedIntCounterVec;
423
424 #[test]
425 fn test_label_guarded_metrics_drop() {
426 let vec = LabelGuardedIntCounterVec::test_int_counter_vec::<3>();
427 let m1_1 = vec.with_guarded_label_values(&["1", "2", "3"]);
428 assert_eq!(1, vec.collect().pop().unwrap().get_metric().len());
429 let m1_2 = vec.with_guarded_label_values(&["1", "2", "3"]);
430 let m1_3 = m1_2.clone();
431 assert_eq!(1, vec.collect().pop().unwrap().get_metric().len());
432 let m2 = vec.with_guarded_label_values(&["2", "2", "3"]);
433 assert_eq!(2, vec.collect().pop().unwrap().get_metric().len());
434 drop(m1_3);
435 assert_eq!(2, vec.collect().pop().unwrap().get_metric().len());
436 assert_eq!(2, vec.collect().pop().unwrap().get_metric().len());
437 drop(m2);
438 assert_eq!(2, vec.collect().pop().unwrap().get_metric().len());
439 assert_eq!(1, vec.collect().pop().unwrap().get_metric().len());
440 drop(m1_1);
441 assert_eq!(1, vec.collect().pop().unwrap().get_metric().len());
442 assert_eq!(1, vec.collect().pop().unwrap().get_metric().len());
443 drop(m1_2);
444 assert_eq!(1, vec.collect().pop().unwrap().get_metric().len());
445 assert_eq!(0, vec.collect().pop().unwrap().get_metric().len());
446 }
447}