risingwave_common_metrics/
guarded_metrics.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::type_name;
16use std::collections::{HashMap, HashSet};
17use std::fmt::{Debug, Formatter};
18use std::ops::Deref;
19use std::sync::Arc;
20
21use parking_lot::Mutex;
22use prometheus::core::{
23    Atomic, AtomicF64, AtomicI64, AtomicU64, Collector, Desc, GenericCounter, GenericLocalCounter,
24    MetricVec, MetricVecBuilder,
25};
26use prometheus::local::{LocalHistogram, LocalIntCounter};
27use prometheus::proto::MetricFamily;
28use prometheus::{Gauge, Histogram, IntCounter, IntGauge};
29use thiserror_ext::AsReport;
30use tracing::warn;
31
/// Builds a `prometheus::HistogramVec`, wraps it in a `LabelGuardedHistogramVec`, and registers
/// the wrapper with the given registry.
///
/// Three forms are accepted:
/// * `(name, help, label_names, registry)` — options from `histogram_opts!(name, help)`;
/// * `(name, help, label_names, buckets, registry)` — options from
///   `histogram_opts!(name, help, buckets)`;
/// * `(histogram_opts, label_names, registry)` — caller-supplied options.
///
/// The first two forms forward to the third. The expansion evaluates to a `Result` holding the
/// wrapped vec on success, or the error from `HistogramVec::new` or from `register`.
/// Note that the `label_names` expression is expanded twice (raw vec and wrapper).
#[macro_export]
macro_rules! register_guarded_histogram_vec_with_registry {
    ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
        $crate::register_guarded_histogram_vec_with_registry! {
            {prometheus::histogram_opts!($NAME, $HELP)},
            $LABELS_NAMES,
            $REGISTRY
        }
    }};
    ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $BUCKETS:expr, $REGISTRY:expr $(,)?) => {{
        $crate::register_guarded_histogram_vec_with_registry! {
            {prometheus::histogram_opts!($NAME, $HELP, $BUCKETS)},
            $LABELS_NAMES,
            $REGISTRY
        }
    }};
    ($HOPTS:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
        let inner = prometheus::HistogramVec::new($HOPTS, $LABELS_NAMES);
        inner.and_then(|inner| {
            // Re-type the vec so its builder is the opaque `VecBuilderOfHistogram`.
            let inner = $crate::__extract_histogram_builder(inner);
            let label_guarded = $crate::LabelGuardedHistogramVec::new(inner, { $LABELS_NAMES });
            // The registry receives a clone of the wrapper. Its label bookkeeping (`info`) is an
            // `Arc`, so it is shared with the returned value. The raw `MetricVec` is assumed to
            // share its children across clones as well — confirm against prometheus.
            let result = ($REGISTRY).register(Box::new(label_guarded.clone()));
            result.map(move |()| label_guarded)
        })
    }};
}
58
/// Builds a `prometheus::GaugeVec` from `(name, help, label_names)`, wraps it in a
/// `LabelGuardedGaugeVec`, and registers the wrapper with the given registry.
///
/// Evaluates to a `Result` holding the wrapped vec, or the error from `GaugeVec::new` or from
/// `register`. The `label_names` expression is expanded twice (raw vec and wrapper).
#[macro_export]
macro_rules! register_guarded_gauge_vec_with_registry {
    ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
        let inner = prometheus::GaugeVec::new(prometheus::opts!($NAME, $HELP), $LABELS_NAMES);
        inner.and_then(|inner| {
            // Re-type the vec so its builder is the opaque `VecBuilderOfGauge`.
            let inner = $crate::__extract_gauge_builder(inner);
            let label_guarded = $crate::LabelGuardedGaugeVec::new(inner, { $LABELS_NAMES });
            // Register a clone; the returned value shares its `Arc`-ed label bookkeeping.
            let result = ($REGISTRY).register(Box::new(label_guarded.clone()));
            result.map(move |()| label_guarded)
        })
    }};
}
71
/// Builds a `prometheus::IntGaugeVec` from `(name, help, label_names)`, wraps it in a
/// `LabelGuardedIntGaugeVec`, and registers the wrapper with the given registry.
///
/// Evaluates to a `Result` holding the wrapped vec, or the error from `IntGaugeVec::new` or from
/// `register`. The `label_names` expression is expanded twice (raw vec and wrapper).
#[macro_export]
macro_rules! register_guarded_int_gauge_vec_with_registry {
    ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
        let inner = prometheus::IntGaugeVec::new(prometheus::opts!($NAME, $HELP), $LABELS_NAMES);
        inner.and_then(|inner| {
            // Re-type the vec so its builder is the opaque `VecBuilderOfGauge`.
            let inner = $crate::__extract_gauge_builder(inner);
            let label_guarded = $crate::LabelGuardedIntGaugeVec::new(inner, { $LABELS_NAMES });
            // Register a clone; the returned value shares its `Arc`-ed label bookkeeping.
            let result = ($REGISTRY).register(Box::new(label_guarded.clone()));
            result.map(move |()| label_guarded)
        })
    }};
}
84
/// Builds a `GenericGaugeVec<AtomicU64>` from `(name, help, label_names)`, wraps it in a
/// `LabelGuardedUintGaugeVec`, and registers the wrapper with the given registry.
///
/// Evaluates to a `Result` holding the wrapped vec, or the error from the vec constructor or
/// from `register`. The `label_names` expression is expanded twice (raw vec and wrapper).
#[macro_export]
macro_rules! register_guarded_uint_gauge_vec_with_registry {
    ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
        let inner = prometheus::core::GenericGaugeVec::<prometheus::core::AtomicU64>::new(
            prometheus::opts!($NAME, $HELP),
            $LABELS_NAMES,
        );
        inner.and_then(|inner| {
            // Re-type the vec so its builder is the opaque `VecBuilderOfGauge`.
            let inner = $crate::__extract_gauge_builder(inner);
            let label_guarded = $crate::LabelGuardedUintGaugeVec::new(inner, { $LABELS_NAMES });
            // Register a clone; the returned value shares its `Arc`-ed label bookkeeping.
            let result = ($REGISTRY).register(Box::new(label_guarded.clone()));
            result.map(move |()| label_guarded)
        })
    }};
}
100
/// Builds a `prometheus::IntCounterVec` from `(name, help, label_names)`, wraps it in a
/// `LabelGuardedIntCounterVec`, and registers the wrapper with the given registry.
///
/// Evaluates to a `Result` holding the wrapped vec, or the error from `IntCounterVec::new` or
/// from `register`. The `label_names` expression is expanded twice (raw vec and wrapper).
#[macro_export]
macro_rules! register_guarded_int_counter_vec_with_registry {
    ($NAME:expr, $HELP:expr, $LABELS_NAMES:expr, $REGISTRY:expr $(,)?) => {{
        let inner = prometheus::IntCounterVec::new(prometheus::opts!($NAME, $HELP), $LABELS_NAMES);
        inner.and_then(|inner| {
            // Re-type the vec so its builder is the opaque `VecBuilderOfCounter`.
            let inner = $crate::__extract_counter_builder(inner);
            let label_guarded = $crate::LabelGuardedIntCounterVec::new(inner, { $LABELS_NAMES });
            // Register a clone; the returned value shares its `Arc`-ed label bookkeeping.
            let result = ($REGISTRY).register(Box::new(label_guarded.clone()));
            result.map(move |()| label_guarded)
        })
    }};
}
113
// put TAITs in a separate module to avoid "non-defining opaque type use in defining scope"
//
// Each `VecBuilderOf*` alias is an opaque (`impl Trait`) name for the builder type inside a
// concrete prometheus metric vec. The `__extract_*` functions are the defining uses (see
// `#[define_opaque]`) and return their argument unchanged. This lets the rest of the crate name
// `MetricVec<...>` types without spelling out the concrete builder. NOTE(review): presumably this
// is because those builder types are inconvenient or impossible to name from outside
// prometheus — confirm.
mod tait {
    use prometheus::core::{
        Atomic, GenericCounter, GenericCounterVec, GenericGauge, GenericGaugeVec, MetricVec,
        MetricVecBuilder,
    };
    use prometheus::{Histogram, HistogramVec};

    /// Opaque builder type of a counter vec, parameterized by the atomic value type `P`.
    pub type VecBuilderOfCounter<P: Atomic> = impl MetricVecBuilder<M = GenericCounter<P>>;
    /// Opaque builder type of a gauge vec, parameterized by the atomic value type `P`.
    pub type VecBuilderOfGauge<P: Atomic> = impl MetricVecBuilder<M = GenericGauge<P>>;
    /// Opaque builder type of a histogram vec.
    pub type VecBuilderOfHistogram = impl MetricVecBuilder<M = Histogram>;

    /// Re-types a counter vec so that its builder is [`VecBuilderOfCounter`].
    /// The value itself is returned unchanged.
    #[define_opaque(VecBuilderOfCounter)]
    pub fn __extract_counter_builder<P: Atomic>(
        vec: GenericCounterVec<P>,
    ) -> MetricVec<VecBuilderOfCounter<P>> {
        vec
    }

    /// Re-types a gauge vec so that its builder is [`VecBuilderOfGauge`].
    /// The value itself is returned unchanged.
    #[define_opaque(VecBuilderOfGauge)]
    pub fn __extract_gauge_builder<P: Atomic>(
        vec: GenericGaugeVec<P>,
    ) -> MetricVec<VecBuilderOfGauge<P>> {
        vec
    }

    /// Re-types a histogram vec so that its builder is [`VecBuilderOfHistogram`].
    /// The value itself is returned unchanged.
    #[define_opaque(VecBuilderOfHistogram)]
    pub fn __extract_histogram_builder(vec: HistogramVec) -> MetricVec<VecBuilderOfHistogram> {
        vec
    }
}
// Re-export the opaque aliases and the extractors. The registration macros reach the extractors
// through `$crate::__extract_*`.
pub use tait::*;
146
147use crate::UintGauge;
148
149pub type LabelGuardedHistogramVec = LabelGuardedMetricVec<VecBuilderOfHistogram>;
150pub type LabelGuardedIntCounterVec = LabelGuardedMetricVec<VecBuilderOfCounter<AtomicU64>>;
151pub type LabelGuardedIntGaugeVec = LabelGuardedMetricVec<VecBuilderOfGauge<AtomicI64>>;
152pub type LabelGuardedUintGaugeVec = LabelGuardedMetricVec<VecBuilderOfGauge<AtomicU64>>;
153pub type LabelGuardedGaugeVec = LabelGuardedMetricVec<VecBuilderOfGauge<AtomicF64>>;
154
155pub type LabelGuardedHistogram = LabelGuardedMetric<Histogram>;
156pub type LabelGuardedIntCounter = LabelGuardedMetric<IntCounter>;
157pub type LabelGuardedIntGauge = LabelGuardedMetric<IntGauge>;
158pub type LabelGuardedUintGauge = LabelGuardedMetric<UintGauge>;
159pub type LabelGuardedGauge = LabelGuardedMetric<Gauge>;
160
161pub type LabelGuardedLocalHistogram = LabelGuardedMetric<LocalHistogram>;
162pub type LabelGuardedLocalIntCounter = LabelGuardedMetric<LocalIntCounter>;
163
/// Returns the first `N` entries of a fixed pool of placeholder label strings
/// (`"test1"`, `"test2"`, ...), used to build metrics in tests.
///
/// # Panics
///
/// Panics with a descriptive message if `N` exceeds the size of the pool (5).
fn gen_test_label<const N: usize>() -> [&'static str; N] {
    const TEST_LABELS: [&str; 5] = ["test1", "test2", "test3", "test4", "test5"];
    assert!(
        N <= TEST_LABELS.len(),
        "at most {} test labels are supported, but {} were requested",
        TEST_LABELS.len(),
        N
    );
    // Build the array directly; no intermediate `Vec` or fallible conversion is needed.
    std::array::from_fn(|i| TEST_LABELS[i])
}
172
/// Bookkeeping shared between a label-guarded metric vec and every guard it hands out.
#[derive(Default)]
struct LabelGuardedMetricsInfo {
    /// Label values whose last guard has been dropped. Their series are removed right after the
    /// next collection.
    uncollected_removed_labels: HashSet<Box<[String]>>,
    /// Number of live guards for each set of label values.
    labeled_metrics_count: HashMap<Box<[String]>, usize>,
}
178
179impl LabelGuardedMetricsInfo {
180    fn register_new_label<V: AsRef<str>>(mutex: &Arc<Mutex<Self>>, labels: &[V]) -> LabelGuard {
181        let mut guard = mutex.lock();
182        let label_string = labels
183            .iter()
184            .map(|label| label.as_ref().to_owned())
185            .collect::<Vec<_>>()
186            .into_boxed_slice();
187        guard.uncollected_removed_labels.remove(&label_string);
188        *guard
189            .labeled_metrics_count
190            .entry(label_string.clone())
191            .or_insert(0) += 1;
192        LabelGuard {
193            labels: label_string,
194            info: mutex.clone(),
195        }
196    }
197}
198
199/// An RAII metrics vec with labels.
200///
201/// `LabelGuardedMetricVec` enhances the [`MetricVec`] to ensure the set of labels to be
202/// correctly removed from the Prometheus client once being dropped. This is useful for metrics
203/// that are associated with an object that can be dropped, such as streaming jobs, fragments,
204/// actors, batch tasks, etc.
205///
206/// When a set labels is dropped, it will record it in the `uncollected_removed_labels` set.
207/// Once the metrics has been collected, it will finally remove the metrics of the labels.
208///
209/// See also [`LabelGuardedMetricsInfo`] and [`LabelGuard::drop`].
210///
211/// # Arguments
212///
213/// * `T` - The type of the raw metrics vec.
214/// * `N` - The number of labels.
215#[derive(Clone)]
216pub struct LabelGuardedMetricVec<T: MetricVecBuilder> {
217    inner: MetricVec<T>,
218    info: Arc<Mutex<LabelGuardedMetricsInfo>>,
219    labels: Box<[&'static str]>,
220}
221
222impl<T: MetricVecBuilder> Debug for LabelGuardedMetricVec<T> {
223    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
224        f.debug_struct(format!("LabelGuardedMetricVec<{}>", type_name::<T>()).as_str())
225            .field("label", &self.labels)
226            .finish()
227    }
228}
229
230impl<T: MetricVecBuilder> Collector for LabelGuardedMetricVec<T> {
231    fn desc(&self) -> Vec<&Desc> {
232        self.inner.desc()
233    }
234
235    fn collect(&self) -> Vec<MetricFamily> {
236        let mut guard = self.info.lock();
237        let ret = self.inner.collect();
238        for labels in guard.uncollected_removed_labels.drain() {
239            if let Err(e) = self.inner.remove_label_values(&labels) {
240                warn!(
241                    error = %e.as_report(),
242                    "err when delete metrics of {:?} of labels {:?}",
243                    self.inner.desc().first().expect("should have desc").fq_name,
244                    self.labels,
245                );
246            }
247        }
248        ret
249    }
250}
251
252impl<T: MetricVecBuilder> LabelGuardedMetricVec<T> {
253    pub fn new(inner: MetricVec<T>, labels: &[&'static str]) -> Self {
254        Self {
255            inner,
256            info: Default::default(),
257            labels: labels.to_vec().into_boxed_slice(),
258        }
259    }
260
261    /// This is similar to the `with_label_values` of the raw metrics vec.
262    /// We need to pay special attention that, unless for some special purpose,
263    /// we should not drop the returned `LabelGuardedMetric` immediately after
264    /// using it, such as `metrics.with_guarded_label_values(...).inc();`,
265    /// because after dropped the label will be regarded as not used any more,
266    /// and the internal raw metrics will be removed and reset.
267    ///
268    /// Instead, we should store the returned `LabelGuardedMetric` in a scope with longer
269    /// lifetime so that the labels can be regarded as being used in its whole life scope.
270    /// This is also the recommended way to use the raw metrics vec.
271    pub fn with_guarded_label_values<V: AsRef<str> + std::fmt::Debug>(
272        &self,
273        labels: &[V],
274    ) -> LabelGuardedMetric<T::M> {
275        let guard = LabelGuardedMetricsInfo::register_new_label(&self.info, labels);
276        let inner = self.inner.with_label_values(labels);
277        LabelGuardedMetric {
278            inner,
279            _guard: Arc::new(guard),
280        }
281    }
282
283    pub fn with_test_label<const N: usize>(&self) -> LabelGuardedMetric<T::M> {
284        let labels = gen_test_label::<N>();
285        self.with_guarded_label_values(&labels)
286    }
287}
288
289impl LabelGuardedIntCounterVec {
290    pub fn test_int_counter_vec<const N: usize>() -> Self {
291        let registry = prometheus::Registry::new();
292        let labels = gen_test_label::<N>();
293        register_guarded_int_counter_vec_with_registry!("test", "test", &labels, &registry).unwrap()
294    }
295}
296
297impl LabelGuardedIntGaugeVec {
298    pub fn test_int_gauge_vec<const N: usize>() -> Self {
299        let registry = prometheus::Registry::new();
300        let labels = gen_test_label::<N>();
301        register_guarded_int_gauge_vec_with_registry!("test", "test", &labels, &registry).unwrap()
302    }
303}
304
305impl LabelGuardedGaugeVec {
306    pub fn test_gauge_vec<const N: usize>() -> Self {
307        let registry = prometheus::Registry::new();
308        let labels = gen_test_label::<N>();
309        register_guarded_gauge_vec_with_registry!("test", "test", &labels, &registry).unwrap()
310    }
311}
312
313impl LabelGuardedHistogramVec {
314    pub fn test_histogram_vec<const N: usize>() -> Self {
315        let registry = prometheus::Registry::new();
316        let labels = gen_test_label::<N>();
317        register_guarded_histogram_vec_with_registry!("test", "test", &labels, &registry).unwrap()
318    }
319}
320
321#[derive(Clone)]
322struct LabelGuard {
323    labels: Box<[String]>,
324    info: Arc<Mutex<LabelGuardedMetricsInfo>>,
325}
326
327impl Drop for LabelGuard {
328    fn drop(&mut self) {
329        let mut guard = self.info.lock();
330        let count = guard.labeled_metrics_count.get_mut(&self.labels).expect(
331            "should exist because the current existing dropping one means the count is not zero",
332        );
333        *count -= 1;
334        if *count == 0 {
335            guard
336                .labeled_metrics_count
337                .remove(&self.labels)
338                .expect("should exist");
339            guard.uncollected_removed_labels.insert(self.labels.clone());
340        }
341    }
342}
343
/// A metric handle obtained from a label-guarded metric vec, bundled with the guard that keeps
/// its label values registered.
///
/// Clones share the same guard through the `Arc`, as does any value produced by `local()`. The
/// labels are released only after all of them have been dropped. The wrapped metric is reachable
/// through `Deref`.
#[derive(Clone)]
pub struct LabelGuardedMetric<T> {
    // NOTE: struct fields are dropped in declaration order, so `inner` is dropped before the
    // guard. Keep this order: releasing the guard first could let a concurrent collection remove
    // the series while `inner` is still alive. One example would be a local metric that flushes
    // on drop — presumably prometheus' `LocalHistogram` / `LocalIntCounter` do; confirm.
    inner: T,
    _guard: Arc<LabelGuard>,
}
349
350impl<T> Debug for LabelGuardedMetric<T> {
351    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
352        f.debug_struct("LabelGuardedMetric").finish()
353    }
354}
355
356impl<T> Deref for LabelGuardedMetric<T> {
357    type Target = T;
358
359    fn deref(&self) -> &Self::Target {
360        &self.inner
361    }
362}
363
364impl LabelGuardedHistogram {
365    pub fn test_histogram<const N: usize>() -> Self {
366        LabelGuardedHistogramVec::test_histogram_vec::<N>().with_test_label::<N>()
367    }
368}
369
370impl LabelGuardedIntCounter {
371    pub fn test_int_counter<const N: usize>() -> Self {
372        LabelGuardedIntCounterVec::test_int_counter_vec::<N>().with_test_label::<N>()
373    }
374}
375
376impl LabelGuardedIntGauge {
377    pub fn test_int_gauge<const N: usize>() -> Self {
378        LabelGuardedIntGaugeVec::test_int_gauge_vec::<N>().with_test_label::<N>()
379    }
380}
381
382impl LabelGuardedGauge {
383    pub fn test_gauge<const N: usize>() -> Self {
384        LabelGuardedGaugeVec::test_gauge_vec::<N>().with_test_label::<N>()
385    }
386}
387
/// A metric that can produce a local counterpart of itself.
pub trait MetricWithLocal {
    /// The type of the local counterpart.
    type Local;

    /// Creates a new local counterpart of this metric.
    fn local(&self) -> Self::Local;
}
392
393impl MetricWithLocal for Histogram {
394    type Local = LocalHistogram;
395
396    fn local(&self) -> Self::Local {
397        self.local()
398    }
399}
400
401impl<P: Atomic> MetricWithLocal for GenericCounter<P> {
402    type Local = GenericLocalCounter<P>;
403
404    fn local(&self) -> Self::Local {
405        self.local()
406    }
407}
408
409impl<T: MetricWithLocal> LabelGuardedMetric<T> {
410    pub fn local(&self) -> LabelGuardedMetric<T::Local> {
411        LabelGuardedMetric {
412            inner: self.inner.local(),
413            _guard: self._guard.clone(),
414        }
415    }
416}
417
#[cfg(test)]
mod tests {
    use prometheus::core::Collector;

    use crate::LabelGuardedIntCounterVec;

    /// A label set stays exported while any guard for it is alive. After its last guard is
    /// dropped, it is still reported by exactly one more collection and disappears afterwards.
    #[test]
    fn test_label_guarded_metrics_drop() {
        let vec = LabelGuardedIntCounterVec::test_int_counter_vec::<3>();
        // Number of series reported by a single collection.
        let exported = || vec.collect().pop().unwrap().get_metric().len();

        let m1_1 = vec.with_guarded_label_values(&["1", "2", "3"]);
        assert_eq!(1, exported());

        // A second guard for the same labels, plus a clone that shares it.
        let m1_2 = vec.with_guarded_label_values(&["1", "2", "3"]);
        let m1_3 = m1_2.clone();
        assert_eq!(1, exported());

        let m2 = vec.with_guarded_label_values(&["2", "2", "3"]);
        assert_eq!(2, exported());

        // Dropping a clone does not release the shared guard.
        drop(m1_3);
        assert_eq!(2, exported());
        assert_eq!(2, exported());

        // Last guard of ["2", "2", "3"]: reported once more, then removed.
        drop(m2);
        assert_eq!(2, exported());
        assert_eq!(1, exported());

        // ["1", "2", "3"] is still held by m1_2.
        drop(m1_1);
        assert_eq!(1, exported());
        assert_eq!(1, exported());

        // Last guard of ["1", "2", "3"]: reported once more, then removed.
        drop(m1_2);
        assert_eq!(1, exported());
        assert_eq!(0, exported());
    }
}