risingwave_common_metrics/
error_metrics.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::{Arc, LazyLock};
17
18use itertools::Itertools;
19use parking_lot::Mutex;
20use prometheus::Registry;
21use prometheus::core::{Collector, Desc};
22use prometheus::proto::{Gauge, LabelPair, Metric, MetricFamily};
23use rw_iter_util::ZipEqFast;
24
25use crate::monitor::GLOBAL_METRICS_REGISTRY;
26
27pub struct ErrorMetric<const N: usize> {
28    payload: Arc<Mutex<HashMap<[String; N], u32>>>,
29    desc: Desc,
30}
31
32impl<const N: usize> ErrorMetric<N> {
33    pub fn new(name: &str, help: &str, label_names: &[&str; N]) -> Self {
34        Self {
35            payload: Default::default(),
36            desc: Desc::new(
37                name.to_owned(),
38                help.to_owned(),
39                label_names.iter().map(|l| l.to_string()).collect_vec(),
40                Default::default(),
41            )
42            .unwrap(),
43        }
44    }
45
46    pub fn report(&self, labels: [String; N]) {
47        let mut m = self.payload.lock();
48        let v = m.entry(labels).or_default();
49        *v += 1;
50    }
51
52    fn collect(&self) -> MetricFamily {
53        let mut m = MetricFamily::default();
54        m.set_name(self.desc.fq_name.clone());
55        m.set_help(self.desc.help.clone());
56        m.set_field_type(prometheus::proto::MetricType::GAUGE);
57
58        let payload = self.payload.lock().drain().collect_vec();
59        let mut metrics = Vec::with_capacity(payload.len());
60        for (labels, count) in payload {
61            let mut label_pairs = Vec::with_capacity(self.desc.variable_labels.len());
62            for (name, label) in self.desc.variable_labels.iter().zip_eq_fast(labels) {
63                let mut label_pair = LabelPair::default();
64                label_pair.set_name(name.clone());
65                label_pair.set_value(label);
66                label_pairs.push(label_pair);
67            }
68
69            let mut metric = Metric::new();
70            metric.set_label(label_pairs.into());
71            let mut gauge = Gauge::default();
72            gauge.set_value(count as f64);
73            metric.set_gauge(gauge);
74            metrics.push(metric);
75        }
76        m.set_metric(metrics.into());
77        m
78    }
79}
80
81pub type ErrorMetricRef<const N: usize> = Arc<ErrorMetric<N>>;
82
83/// Metrics for counting errors in the system.
84/// The detailed error messages are not supposed to be stored in the metrics, but in the logs.
85///
86/// Please avoid adding new error metrics here. Instead, introduce new `error_type` for new errors.
87#[derive(Clone)]
88pub struct ErrorMetrics {
89    pub user_sink_error: ErrorMetricRef<4>,
90    pub user_compute_error: ErrorMetricRef<3>,
91    pub user_source_error: ErrorMetricRef<4>,
92}
93
94impl ErrorMetrics {
95    pub fn new() -> Self {
96        Self {
97            user_sink_error: Arc::new(ErrorMetric::new(
98                "user_sink_error",
99                "Sink errors in the system, queryable by tags",
100                &["error_type", "sink_id", "sink_name", "fragment_id"],
101            )),
102            user_compute_error: Arc::new(ErrorMetric::new(
103                "user_compute_error",
104                "Compute errors in the system, queryable by tags",
105                &["error_type", "executor_name", "fragment_id"],
106            )),
107            user_source_error: Arc::new(ErrorMetric::new(
108                "user_source_error",
109                "Source errors in the system, queryable by tags",
110                &["error_type", "source_id", "source_name", "fragment_id"],
111            )),
112        }
113    }
114
115    fn desc(&self) -> Vec<&Desc> {
116        vec![
117            &self.user_sink_error.desc,
118            &self.user_compute_error.desc,
119            &self.user_source_error.desc,
120        ]
121    }
122
123    fn collect(&self) -> Vec<prometheus::proto::MetricFamily> {
124        vec![
125            self.user_sink_error.collect(),
126            self.user_compute_error.collect(),
127            self.user_source_error.collect(),
128        ]
129    }
130}
131
132impl Default for ErrorMetrics {
133    fn default() -> Self {
134        ErrorMetrics::new()
135    }
136}
137
138pub struct ErrorMetricsCollector {
139    metrics: ErrorMetrics,
140}
141
142impl Collector for ErrorMetricsCollector {
143    fn desc(&self) -> Vec<&Desc> {
144        self.metrics.desc()
145    }
146
147    fn collect(&self) -> Vec<prometheus::proto::MetricFamily> {
148        self.metrics.collect()
149    }
150}
151
152pub fn monitor_errors(registry: &Registry, metrics: ErrorMetrics) {
153    let ec = ErrorMetricsCollector { metrics };
154    registry.register(Box::new(ec)).unwrap()
155}
156
157pub static GLOBAL_ERROR_METRICS: LazyLock<ErrorMetrics> = LazyLock::new(|| {
158    let e = ErrorMetrics::new();
159    monitor_errors(&GLOBAL_METRICS_REGISTRY, e.clone());
160    e
161});