risingwave_common_metrics/
guarded_metrics.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::type_name;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Debug, Formatter};
18use std::ops::Deref;
19use std::sync::Arc;
20
21use itertools::Itertools;
22use parking_lot::Mutex;
23use prometheus::core::{
24    Atomic, AtomicF64, AtomicI64, AtomicU64, Collector, Desc, GenericCounter, GenericLocalCounter,
25    MetricVec, MetricVecBuilder,
26};
27use prometheus::local::{LocalHistogram, LocalIntCounter};
28use prometheus::proto::MetricFamily;
29use prometheus::{Gauge, Histogram, IntCounter, IntGauge};
30use thiserror_ext::AsReport;
31use tracing::warn;
32
/// Creates a [`LabelGuardedHistogramVec`] and registers it in the given registry.
///
/// Accepted forms:
/// * `(name, help, label_names, registry)`: opts built with `prometheus::histogram_opts!`;
/// * `(name, help, label_names, buckets, registry)`: opts with custom buckets;
/// * `(histogram_opts, label_names, registry)`: pre-built histogram opts.
///
/// `label_names` must be a `&[&'static str; N]`. It is evaluated exactly once, after the opts.
/// The macro evaluates to the `Result` of creating and registering the vec.
#[macro_export]
macro_rules! register_guarded_histogram_vec_with_registry {
    ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
        $crate::register_guarded_histogram_vec_with_registry! {
            {prometheus::histogram_opts!($NAME, $HELP)},
            $LABELS_NAMES,
            $REGISTRY
        }
    }};
    ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $BUCKETS:expr, $REGISTRY:expr $(,)?) => {{
        $crate::register_guarded_histogram_vec_with_registry! {
            {prometheus::histogram_opts!($NAME, $HELP, $BUCKETS)},
            $LABELS_NAMES,
            $REGISTRY
        }
    }};
    ($HOPTS:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
        // Evaluate each macro argument once. The label names are needed both to build the raw
        // vec and to build the guarded wrapper, so they are bound to a local instead of being
        // expanded (and evaluated) twice.
        let opts = $HOPTS;
        let label_names = $LABELS_NAMES;
        let inner = prometheus::HistogramVec::new(opts, label_names);
        inner.and_then(|inner| {
            let inner = $crate::__extract_histogram_builder(inner);
            let label_guarded = $crate::LabelGuardedHistogramVec::new(inner, label_names);
            let result = ($REGISTRY).register(Box::new(label_guarded.clone()));
            result.map(move |()| label_guarded)
        })
    }};
}
59
/// Creates a [`LabelGuardedGaugeVec`] named `$NAME` with help text `$HELP` and registers it in
/// the given registry.
///
/// `label_names` must be a `&[&'static str; N]`. It is evaluated exactly once, after the opts.
/// The macro evaluates to the `Result` of creating and registering the vec.
#[macro_export]
macro_rules! register_guarded_gauge_vec_with_registry {
    ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
        // Bind the label names once: they are used for both the raw vec and the wrapper.
        let opts = prometheus::opts!($NAME, $HELP);
        let label_names = $LABELS_NAMES;
        let inner = prometheus::GaugeVec::new(opts, label_names);
        inner.and_then(|inner| {
            let inner = $crate::__extract_gauge_builder(inner);
            let label_guarded = $crate::LabelGuardedGaugeVec::new(inner, label_names);
            let result = ($REGISTRY).register(Box::new(label_guarded.clone()));
            result.map(move |()| label_guarded)
        })
    }};
}
72
/// Creates a [`LabelGuardedIntGaugeVec`] named `$NAME` with help text `$HELP` and registers it
/// in the given registry.
///
/// `label_names` must be a `&[&'static str; N]`. It is evaluated exactly once, after the opts.
/// The macro evaluates to the `Result` of creating and registering the vec.
#[macro_export]
macro_rules! register_guarded_int_gauge_vec_with_registry {
    ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
        // Bind the label names once: they are used for both the raw vec and the wrapper.
        let opts = prometheus::opts!($NAME, $HELP);
        let label_names = $LABELS_NAMES;
        let inner = prometheus::IntGaugeVec::new(opts, label_names);
        inner.and_then(|inner| {
            let inner = $crate::__extract_gauge_builder(inner);
            let label_guarded = $crate::LabelGuardedIntGaugeVec::new(inner, label_names);
            let result = ($REGISTRY).register(Box::new(label_guarded.clone()));
            result.map(move |()| label_guarded)
        })
    }};
}
85
/// Creates a [`LabelGuardedUintGaugeVec`] (a gauge vec over `AtomicU64`) named `$NAME` with
/// help text `$HELP` and registers it in the given registry.
///
/// `label_names` must be a `&[&'static str; N]`. It is evaluated exactly once, after the opts.
/// The macro evaluates to the `Result` of creating and registering the vec.
#[macro_export]
macro_rules! register_guarded_uint_gauge_vec_with_registry {
    ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
        // Bind the label names once: they are used for both the raw vec and the wrapper.
        let opts = prometheus::opts!($NAME, $HELP);
        let label_names = $LABELS_NAMES;
        let inner =
            prometheus::core::GenericGaugeVec::<prometheus::core::AtomicU64>::new(opts, label_names);
        inner.and_then(|inner| {
            let inner = $crate::__extract_gauge_builder(inner);
            let label_guarded = $crate::LabelGuardedUintGaugeVec::new(inner, label_names);
            let result = ($REGISTRY).register(Box::new(label_guarded.clone()));
            result.map(move |()| label_guarded)
        })
    }};
}
101
/// Creates a [`LabelGuardedIntCounterVec`] named `$NAME` with help text `$HELP` and registers
/// it in the given registry.
///
/// `label_names` must be a `&[&'static str; N]`. It is evaluated exactly once, after the opts.
/// The macro evaluates to the `Result` of creating and registering the vec.
#[macro_export]
macro_rules! register_guarded_int_counter_vec_with_registry {
    ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
        // Bind the label names once: they are used for both the raw vec and the wrapper.
        let opts = prometheus::opts!($NAME, $HELP);
        let label_names = $LABELS_NAMES;
        let inner = prometheus::IntCounterVec::new(opts, label_names);
        inner.and_then(|inner| {
            let inner = $crate::__extract_counter_builder(inner);
            let label_guarded = $crate::LabelGuardedIntCounterVec::new(inner, label_names);
            let result = ($REGISTRY).register(Box::new(label_guarded.clone()));
            result.map(move |()| label_guarded)
        })
    }};
}
114
// put TAITs in a separate module to avoid "non-defining opaque type use in defining scope"
//
// Each `VecBuilderOf*` alias is an opaque (`impl Trait`) name for the builder type of a
// prometheus metric vec. The matching `__extract_*` function returns its argument unchanged,
// re-typed as `MetricVec<opaque builder>`, which is what constrains each opaque alias. The
// guarded-metric macros call these functions before wrapping the vec.
mod tait {
    use prometheus::core::{
        Atomic, GenericCounter, GenericCounterVec, GenericGauge, GenericGaugeVec, MetricVec,
        MetricVecBuilder,
    };
    use prometheus::{Histogram, HistogramVec};

    /// Opaque builder type of `GenericCounterVec<P>`, producing `GenericCounter<P>` metrics.
    pub type VecBuilderOfCounter<P: Atomic> = impl MetricVecBuilder<M = GenericCounter<P>>;
    /// Opaque builder type of `GenericGaugeVec<P>`, producing `GenericGauge<P>` metrics.
    pub type VecBuilderOfGauge<P: Atomic> = impl MetricVecBuilder<M = GenericGauge<P>>;
    /// Opaque builder type of `HistogramVec`, producing `Histogram` metrics.
    pub type VecBuilderOfHistogram = impl MetricVecBuilder<M = Histogram>;

    /// Returns `vec` unchanged, typed as a `MetricVec` over [`VecBuilderOfCounter`].
    pub fn __extract_counter_builder<P: Atomic>(
        vec: GenericCounterVec<P>,
    ) -> MetricVec<VecBuilderOfCounter<P>> {
        vec
    }

    /// Returns `vec` unchanged, typed as a `MetricVec` over [`VecBuilderOfGauge`].
    pub fn __extract_gauge_builder<P: Atomic>(
        vec: GenericGaugeVec<P>,
    ) -> MetricVec<VecBuilderOfGauge<P>> {
        vec
    }

    /// Returns `vec` unchanged, typed as a `MetricVec` over [`VecBuilderOfHistogram`].
    pub fn __extract_histogram_builder(vec: HistogramVec) -> MetricVec<VecBuilderOfHistogram> {
        vec
    }
}
// Re-exported so the `#[macro_export]` macros can reach them as `$crate::__extract_*`.
pub use tait::*;
144
145use crate::UintGauge;
146
147pub type LabelGuardedHistogramVec<const N: usize> = LabelGuardedMetricVec<VecBuilderOfHistogram, N>;
148pub type LabelGuardedIntCounterVec<const N: usize> =
149    LabelGuardedMetricVec<VecBuilderOfCounter<AtomicU64>, N>;
150pub type LabelGuardedIntGaugeVec<const N: usize> =
151    LabelGuardedMetricVec<VecBuilderOfGauge<AtomicI64>, N>;
152pub type LabelGuardedUintGaugeVec<const N: usize> =
153    LabelGuardedMetricVec<VecBuilderOfGauge<AtomicU64>, N>;
154pub type LabelGuardedGaugeVec<const N: usize> =
155    LabelGuardedMetricVec<VecBuilderOfGauge<AtomicF64>, N>;
156
157pub type LabelGuardedHistogram<const N: usize> = LabelGuardedMetric<Histogram, N>;
158pub type LabelGuardedIntCounter<const N: usize> = LabelGuardedMetric<IntCounter, N>;
159pub type LabelGuardedIntGauge<const N: usize> = LabelGuardedMetric<IntGauge, N>;
160pub type LabelGuardedUintGauge<const N: usize> = LabelGuardedMetric<UintGauge, N>;
161pub type LabelGuardedGauge<const N: usize> = LabelGuardedMetric<Gauge, N>;
162
163pub type LabelGuardedLocalHistogram<const N: usize> = LabelGuardedMetric<LocalHistogram, N>;
164pub type LabelGuardedLocalIntCounter<const N: usize> = LabelGuardedMetric<LocalIntCounter, N>;
165
166fn gen_test_label<const N: usize>() -> [&'static str; N] {
167    const TEST_LABELS: [&str; 5] = ["test1", "test2", "test3", "test4", "test5"];
168    (0..N)
169        .map(|i| TEST_LABELS[i])
170        .collect_vec()
171        .try_into()
172        .unwrap()
173}
174
175#[derive(Default)]
176struct LabelGuardedMetricsInfo<const N: usize> {
177    labeled_metrics_count: HashMap<[String; N], usize>,
178    uncollected_removed_labels: HashSet<[String; N]>,
179}
180
181impl<const N: usize> LabelGuardedMetricsInfo<N> {
182    fn register_new_label(mutex: &Arc<Mutex<Self>>, labels: &[&str; N]) -> LabelGuard<N> {
183        let mut guard = mutex.lock();
184        let label_string = labels.map(|str| str.to_owned());
185        guard.uncollected_removed_labels.remove(&label_string);
186        *guard
187            .labeled_metrics_count
188            .entry(label_string.clone())
189            .or_insert(0) += 1;
190        LabelGuard {
191            labels: label_string,
192            info: mutex.clone(),
193        }
194    }
195}
196
197/// An RAII metrics vec with labels.
198///
199/// `LabelGuardedMetricVec` enhances the [`MetricVec`] to ensure the set of labels to be
200/// correctly removed from the Prometheus client once being dropped. This is useful for metrics
201/// that are associated with an object that can be dropped, such as streaming jobs, fragments,
202/// actors, batch tasks, etc.
203///
204/// When a set labels is dropped, it will record it in the `uncollected_removed_labels` set.
205/// Once the metrics has been collected, it will finally remove the metrics of the labels.
206///
207/// See also [`LabelGuardedMetricsInfo`] and [`LabelGuard::drop`].
208///
209/// # Arguments
210///
211/// * `T` - The type of the raw metrics vec.
212/// * `N` - The number of labels.
213#[derive(Clone)]
214pub struct LabelGuardedMetricVec<T: MetricVecBuilder, const N: usize> {
215    inner: MetricVec<T>,
216    info: Arc<Mutex<LabelGuardedMetricsInfo<N>>>,
217    labels: [&'static str; N],
218}
219
220impl<T: MetricVecBuilder, const N: usize> Debug for LabelGuardedMetricVec<T, N> {
221    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
222        f.debug_struct(format!("LabelGuardedMetricVec<{}, {}>", type_name::<T>(), N).as_str())
223            .field("label", &self.labels)
224            .finish()
225    }
226}
227
228impl<T: MetricVecBuilder, const N: usize> Collector for LabelGuardedMetricVec<T, N> {
229    fn desc(&self) -> Vec<&Desc> {
230        self.inner.desc()
231    }
232
233    fn collect(&self) -> Vec<MetricFamily> {
234        let mut guard = self.info.lock();
235        let ret = self.inner.collect();
236        for labels in guard.uncollected_removed_labels.drain() {
237            if let Err(e) = self
238                .inner
239                .remove_label_values(&labels.each_ref().map(|s| s.as_str()))
240            {
241                warn!(
242                    error = %e.as_report(),
243                    "err when delete metrics of {:?} of labels {:?}",
244                    self.inner.desc().first().expect("should have desc").fq_name,
245                    self.labels,
246                );
247            }
248        }
249        ret
250    }
251}
252
253impl<T: MetricVecBuilder, const N: usize> LabelGuardedMetricVec<T, N> {
254    pub fn new(inner: MetricVec<T>, labels: &[&'static str; N]) -> Self {
255        Self {
256            inner,
257            info: Default::default(),
258            labels: *labels,
259        }
260    }
261
262    /// This is similar to the `with_label_values` of the raw metrics vec.
263    /// We need to pay special attention that, unless for some special purpose,
264    /// we should not drop the returned `LabelGuardedMetric` immediately after
265    /// using it, such as `metrics.with_guarded_label_values(...).inc();`,
266    /// because after dropped the label will be regarded as not used any more,
267    /// and the internal raw metrics will be removed and reset.
268    ///
269    /// Instead, we should store the returned `LabelGuardedMetric` in a scope with longer
270    /// lifetime so that the labels can be regarded as being used in its whole life scope.
271    /// This is also the recommended way to use the raw metrics vec.
272    pub fn with_guarded_label_values(&self, labels: &[&str; N]) -> LabelGuardedMetric<T::M, N> {
273        let guard = LabelGuardedMetricsInfo::register_new_label(&self.info, labels);
274        let inner = self.inner.with_label_values(labels);
275        LabelGuardedMetric {
276            inner,
277            _guard: Arc::new(guard),
278        }
279    }
280
281    pub fn with_test_label(&self) -> LabelGuardedMetric<T::M, N> {
282        let labels: [&'static str; N] = gen_test_label::<N>();
283        self.with_guarded_label_values(&labels)
284    }
285}
286
287impl<const N: usize> LabelGuardedIntCounterVec<N> {
288    pub fn test_int_counter_vec() -> Self {
289        let registry = prometheus::Registry::new();
290        let labels = gen_test_label::<N>();
291        register_guarded_int_counter_vec_with_registry!("test", "test", &labels, &registry).unwrap()
292    }
293}
294
295impl<const N: usize> LabelGuardedIntGaugeVec<N> {
296    pub fn test_int_gauge_vec() -> Self {
297        let registry = prometheus::Registry::new();
298        let labels = gen_test_label::<N>();
299        register_guarded_int_gauge_vec_with_registry!("test", "test", &labels, &registry).unwrap()
300    }
301}
302
303impl<const N: usize> LabelGuardedGaugeVec<N> {
304    pub fn test_gauge_vec() -> Self {
305        let registry = prometheus::Registry::new();
306        let labels = gen_test_label::<N>();
307        register_guarded_gauge_vec_with_registry!("test", "test", &labels, &registry).unwrap()
308    }
309}
310
311impl<const N: usize> LabelGuardedHistogramVec<N> {
312    pub fn test_histogram_vec() -> Self {
313        let registry = prometheus::Registry::new();
314        let labels = gen_test_label::<N>();
315        register_guarded_histogram_vec_with_registry!("test", "test", &labels, &registry).unwrap()
316    }
317}
318
319#[derive(Clone)]
320struct LabelGuard<const N: usize> {
321    labels: [String; N],
322    info: Arc<Mutex<LabelGuardedMetricsInfo<N>>>,
323}
324
325impl<const N: usize> Drop for LabelGuard<N> {
326    fn drop(&mut self) {
327        let mut guard = self.info.lock();
328        let count = guard.labeled_metrics_count.get_mut(&self.labels).expect(
329            "should exist because the current existing dropping one means the count is not zero",
330        );
331        *count -= 1;
332        if *count == 0 {
333            guard
334                .labeled_metrics_count
335                .remove(&self.labels)
336                .expect("should exist");
337            guard.uncollected_removed_labels.insert(self.labels.clone());
338        }
339    }
340}
341
/// A metric handle obtained from a [`LabelGuardedMetricVec`], or derived from one via `local()`.
///
/// Dereferences to the wrapped metric `T`. The `Arc`-shared `_guard` keeps this handle's label
/// values registered. Clones share the same guard, which is released only when the last handle
/// holding it is dropped. Once no guard for these label values remains, they are queued for
/// removal from the owning vec.
#[derive(Clone)]
pub struct LabelGuardedMetric<T, const N: usize> {
    // The wrapped raw metric (or its local counterpart).
    inner: T,
    // Never read directly; held only so the label guard lives as long as this handle.
    _guard: Arc<LabelGuard<N>>,
}
347
348impl<T, const N: usize> Debug for LabelGuardedMetric<T, N> {
349    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
350        f.debug_struct("LabelGuardedMetric").finish()
351    }
352}
353
354impl<T, const N: usize> Deref for LabelGuardedMetric<T, N> {
355    type Target = T;
356
357    fn deref(&self) -> &Self::Target {
358        &self.inner
359    }
360}
361
362impl<const N: usize> LabelGuardedHistogram<N> {
363    pub fn test_histogram() -> Self {
364        LabelGuardedHistogramVec::<N>::test_histogram_vec().with_test_label()
365    }
366}
367
368impl<const N: usize> LabelGuardedIntCounter<N> {
369    pub fn test_int_counter() -> Self {
370        LabelGuardedIntCounterVec::<N>::test_int_counter_vec().with_test_label()
371    }
372}
373
374impl<const N: usize> LabelGuardedIntGauge<N> {
375    pub fn test_int_gauge() -> Self {
376        LabelGuardedIntGaugeVec::<N>::test_int_gauge_vec().with_test_label()
377    }
378}
379
380impl<const N: usize> LabelGuardedGauge<N> {
381    pub fn test_gauge() -> Self {
382        LabelGuardedGaugeVec::<N>::test_gauge_vec().with_test_label()
383    }
384}
385
/// A metric that has a `local()` counterpart, e.g. `Histogram` -> `LocalHistogram` or
/// `GenericCounter<P>` -> `GenericLocalCounter<P>`.
///
/// Used by `LabelGuardedMetric::local` to derive a guarded local metric that shares the
/// original handle's label guard.
pub trait MetricWithLocal {
    /// The local counterpart type.
    type Local;
    /// Creates a new local counterpart of this metric.
    fn local(&self) -> Self::Local;
}
390
391impl MetricWithLocal for Histogram {
392    type Local = LocalHistogram;
393
394    fn local(&self) -> Self::Local {
395        self.local()
396    }
397}
398
399impl<P: Atomic> MetricWithLocal for GenericCounter<P> {
400    type Local = GenericLocalCounter<P>;
401
402    fn local(&self) -> Self::Local {
403        self.local()
404    }
405}
406
407impl<T: MetricWithLocal, const N: usize> LabelGuardedMetric<T, N> {
408    pub fn local(&self) -> LabelGuardedMetric<T::Local, N> {
409        LabelGuardedMetric {
410            inner: self.inner.local(),
411            _guard: self._guard.clone(),
412        }
413    }
414}
415
#[cfg(test)]
mod tests {
    use prometheus::core::Collector;

    use crate::LabelGuardedIntCounterVec;

    /// A label set stays exported while any guard for it is alive. It is exported once more
    /// by the first collection after its last guard is dropped, and disappears afterwards.
    #[test]
    fn test_label_guarded_metrics_drop() {
        let vec = LabelGuardedIntCounterVec::<3>::test_int_counter_vec();
        // Number of label sets in the single metric family produced by one collection.
        let collected = || vec.collect().pop().unwrap().get_metric().len();

        let m1_1 = vec.with_guarded_label_values(&["1", "2", "3"]);
        assert_eq!(1, collected());

        // A second guard and a clone for the same labels do not add a label set.
        let m1_2 = vec.with_guarded_label_values(&["1", "2", "3"]);
        let m1_3 = m1_2.clone();
        assert_eq!(1, collected());

        let m2 = vec.with_guarded_label_values(&["2", "2", "3"]);
        assert_eq!(2, collected());

        // Dropping a clone while the shared guard is still held removes nothing.
        drop(m1_3);
        assert_eq!(2, collected());
        assert_eq!(2, collected());

        // Last guard of ["2", "2", "3"]: exported once more, then gone.
        drop(m2);
        assert_eq!(2, collected());
        assert_eq!(1, collected());

        // ["1", "2", "3"] is still held through m1_2.
        drop(m1_1);
        assert_eq!(1, collected());
        assert_eq!(1, collected());

        // Last guard of ["1", "2", "3"]: exported once more, then gone.
        drop(m1_2);
        assert_eq!(1, collected());
        assert_eq!(0, collected());
    }
}
445}