risingwave_common_metrics/
error_metrics.rs1use std::collections::HashMap;
16use std::sync::{Arc, LazyLock};
17
18use itertools::Itertools;
19use parking_lot::Mutex;
20use prometheus::Registry;
21use prometheus::core::{Collector, Desc};
22use prometheus::proto::{Gauge, LabelPair, Metric, MetricFamily};
23use rw_iter_util::ZipEqFast;
24
25use crate::monitor::GLOBAL_METRICS_REGISTRY;
26
27pub struct ErrorMetric<const N: usize> {
28 payload: Arc<Mutex<HashMap<[String; N], u32>>>,
29 desc: Desc,
30}
31
32impl<const N: usize> ErrorMetric<N> {
33 pub fn new(name: &str, help: &str, label_names: &[&str; N]) -> Self {
34 Self {
35 payload: Default::default(),
36 desc: Desc::new(
37 name.to_owned(),
38 help.to_owned(),
39 label_names.iter().map(|l| l.to_string()).collect_vec(),
40 Default::default(),
41 )
42 .unwrap(),
43 }
44 }
45
46 pub fn report(&self, labels: [String; N]) {
47 let mut m = self.payload.lock();
48 let v = m.entry(labels).or_default();
49 *v += 1;
50 }
51
52 fn collect(&self) -> MetricFamily {
53 let mut m = MetricFamily::default();
54 m.set_name(self.desc.fq_name.clone());
55 m.set_help(self.desc.help.clone());
56 m.set_field_type(prometheus::proto::MetricType::GAUGE);
57
58 let payload = self.payload.lock().drain().collect_vec();
59 let mut metrics = Vec::with_capacity(payload.len());
60 for (labels, count) in payload {
61 let mut label_pairs = Vec::with_capacity(self.desc.variable_labels.len());
62 for (name, label) in self.desc.variable_labels.iter().zip_eq_fast(labels) {
63 let mut label_pair = LabelPair::default();
64 label_pair.set_name(name.clone());
65 label_pair.set_value(label);
66 label_pairs.push(label_pair);
67 }
68
69 let mut metric = Metric::new();
70 metric.set_label(label_pairs.into());
71 let mut gauge = Gauge::default();
72 gauge.set_value(count as f64);
73 metric.set_gauge(gauge);
74 metrics.push(metric);
75 }
76 m.set_metric(metrics.into());
77 m
78 }
79}
80
81pub type ErrorMetricRef<const N: usize> = Arc<ErrorMetric<N>>;
82
83#[derive(Clone)]
88pub struct ErrorMetrics {
89 pub user_sink_error: ErrorMetricRef<4>,
90 pub user_compute_error: ErrorMetricRef<3>,
91 pub user_source_error: ErrorMetricRef<4>,
92}
93
94impl ErrorMetrics {
95 pub fn new() -> Self {
96 Self {
97 user_sink_error: Arc::new(ErrorMetric::new(
98 "user_sink_error",
99 "Sink errors in the system, queryable by tags",
100 &["error_type", "sink_id", "sink_name", "fragment_id"],
101 )),
102 user_compute_error: Arc::new(ErrorMetric::new(
103 "user_compute_error",
104 "Compute errors in the system, queryable by tags",
105 &["error_type", "executor_name", "fragment_id"],
106 )),
107 user_source_error: Arc::new(ErrorMetric::new(
108 "user_source_error",
109 "Source errors in the system, queryable by tags",
110 &["error_type", "source_id", "source_name", "fragment_id"],
111 )),
112 }
113 }
114
115 fn desc(&self) -> Vec<&Desc> {
116 vec![
117 &self.user_sink_error.desc,
118 &self.user_compute_error.desc,
119 &self.user_source_error.desc,
120 ]
121 }
122
123 fn collect(&self) -> Vec<prometheus::proto::MetricFamily> {
124 vec![
125 self.user_sink_error.collect(),
126 self.user_compute_error.collect(),
127 self.user_source_error.collect(),
128 ]
129 }
130}
131
132impl Default for ErrorMetrics {
133 fn default() -> Self {
134 ErrorMetrics::new()
135 }
136}
137
138pub struct ErrorMetricsCollector {
139 metrics: ErrorMetrics,
140}
141
142impl Collector for ErrorMetricsCollector {
143 fn desc(&self) -> Vec<&Desc> {
144 self.metrics.desc()
145 }
146
147 fn collect(&self) -> Vec<prometheus::proto::MetricFamily> {
148 self.metrics.collect()
149 }
150}
151
152pub fn monitor_errors(registry: &Registry, metrics: ErrorMetrics) {
153 let ec = ErrorMetricsCollector { metrics };
154 registry.register(Box::new(ec)).unwrap()
155}
156
157pub static GLOBAL_ERROR_METRICS: LazyLock<ErrorMetrics> = LazyLock::new(|| {
158 let e = ErrorMetrics::new();
159 monitor_errors(&GLOBAL_METRICS_REGISTRY, e.clone());
160 e
161});