risingwave_meta/dashboard/
prometheus.rs1use std::collections::HashMap;
16use std::time::SystemTime;
17
18use anyhow::anyhow;
19use axum::{Extension, Json};
20use prometheus_http_query::response::{InstantVector, RangeVector, Sample};
21use serde::Serialize;
22
23use super::Service;
24use super::handlers::{DashboardError, err};
25
26#[derive(Serialize, Debug)]
27pub struct PrometheusSample {
28 pub timestamp: f64,
29 pub value: f64,
30}
31
32impl From<&Sample> for PrometheusSample {
33 fn from(value: &Sample) -> Self {
34 PrometheusSample {
35 timestamp: value.timestamp(),
36 value: value.value(),
37 }
38 }
39}
40
41#[derive(Serialize, Debug)]
42pub struct PrometheusVector {
43 metric: HashMap<String, String>,
44 sample: Vec<PrometheusSample>,
46}
47
48impl From<&RangeVector> for PrometheusVector {
49 fn from(value: &RangeVector) -> Self {
50 PrometheusVector {
51 metric: value.metric().clone(),
52 sample: value.samples().iter().map(Into::into).collect(),
53 }
54 }
55}
56
57impl From<&InstantVector> for PrometheusVector {
60 fn from(value: &InstantVector) -> Self {
61 PrometheusVector {
62 metric: value.metric().clone(),
63 sample: vec![value.sample().into()],
64 }
65 }
66}
67
68#[derive(Serialize, Debug)]
69#[serde(rename_all = "camelCase")]
70pub struct ClusterMetrics {
71 cpu_data: Vec<PrometheusVector>,
72 memory_data: Vec<PrometheusVector>,
73}
74
75pub type Result<T> = std::result::Result<T, DashboardError>;
76
77pub async fn list_prometheus_cluster(
78 Extension(srv): Extension<Service>,
79) -> Result<Json<ClusterMetrics>> {
80 if let Some(ref client) = srv.prometheus_client {
81 let now = SystemTime::now();
83 let cpu_query = format!(
84 "sum(rate(process_cpu_seconds_total{{job=~\"standalone|compute|meta|frontend\", {}}}[60s]) or label_replace(rate(process_cpu_seconds_total{{component=~\"standalone|compute|meta|frontend\", {}}}[60s]), \"job\", \"$1\", \"component\", \"(.*)\")) by (job,instance)",
85 srv.prometheus_selector, srv.prometheus_selector
86 );
87 let result = client
88 .query_range(
89 cpu_query,
90 now.duration_since(SystemTime::UNIX_EPOCH)
91 .unwrap()
92 .as_secs() as i64
93 - 3600,
94 now.duration_since(SystemTime::UNIX_EPOCH)
95 .unwrap()
96 .as_secs() as i64,
97 15.0,
98 )
99 .get()
100 .await
101 .map_err(err)?;
102 let cpu_data = result
103 .data()
104 .as_matrix()
105 .unwrap()
106 .iter()
107 .map(PrometheusVector::from)
108 .collect();
109 let memory_query = format!(
110 "avg(process_resident_memory_bytes{{job=~\"standalone|compute|meta|frontend\", {}}} or label_replace(process_resident_memory_bytes{{component=~\"standalone|compute|meta|frontend\", {}}}, \"job\", \"$1\", \"component\", \"(.*)\")) by (job,instance)",
111 srv.prometheus_selector, srv.prometheus_selector
112 );
113 let result = client
114 .query_range(
115 memory_query,
116 now.duration_since(SystemTime::UNIX_EPOCH)
117 .unwrap()
118 .as_secs() as i64
119 - 3600,
120 now.duration_since(SystemTime::UNIX_EPOCH)
121 .unwrap()
122 .as_secs() as i64,
123 60.0,
124 )
125 .get()
126 .await
127 .map_err(err)?;
128 let memory_data = result
129 .data()
130 .as_matrix()
131 .unwrap()
132 .iter()
133 .map(PrometheusVector::from)
134 .collect();
135 Ok(Json(ClusterMetrics {
136 cpu_data,
137 memory_data,
138 }))
139 } else {
140 Err(err(anyhow!("Prometheus endpoint is not set")))
141 }
142}