risingwave_meta/dashboard/
prometheus.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::time::SystemTime;
17
18use anyhow::anyhow;
19use axum::{Extension, Json};
20use prometheus_http_query::response::{InstantVector, RangeVector, Sample};
21use serde::Serialize;
22
23use super::Service;
24use super::handlers::{DashboardError, err};
25
26#[derive(Serialize, Debug)]
27pub struct PrometheusSample {
28    pub timestamp: f64,
29    pub value: f64,
30}
31
32impl From<&Sample> for PrometheusSample {
33    fn from(value: &Sample) -> Self {
34        PrometheusSample {
35            timestamp: value.timestamp(),
36            value: value.value(),
37        }
38    }
39}
40
41#[derive(Serialize, Debug)]
42pub struct PrometheusVector {
43    metric: HashMap<String, String>,
44    // Multiple samples from `RangeVector` or single sample from `InstantVector`.
45    sample: Vec<PrometheusSample>,
46}
47
48impl From<&RangeVector> for PrometheusVector {
49    fn from(value: &RangeVector) -> Self {
50        PrometheusVector {
51            metric: value.metric().clone(),
52            sample: value.samples().iter().map(Into::into).collect(),
53        }
54    }
55}
56
57// Note(eric): For backward compatibility, we store the `InstantVector` as a single sample,
58// instead of defining a new struct.
59impl From<&InstantVector> for PrometheusVector {
60    fn from(value: &InstantVector) -> Self {
61        PrometheusVector {
62            metric: value.metric().clone(),
63            sample: vec![value.sample().into()],
64        }
65    }
66}
67
68#[derive(Serialize, Debug)]
69#[serde(rename_all = "camelCase")]
70pub struct ClusterMetrics {
71    cpu_data: Vec<PrometheusVector>,
72    memory_data: Vec<PrometheusVector>,
73}
74
75pub type Result<T> = std::result::Result<T, DashboardError>;
76
77pub async fn list_prometheus_cluster(
78    Extension(srv): Extension<Service>,
79) -> Result<Json<ClusterMetrics>> {
80    if let Some(ref client) = srv.prometheus_client {
81        // assume job_name is one of compute, meta, frontend
82        let now = SystemTime::now();
83        let cpu_query = format!(
84            "sum(rate(process_cpu_seconds_total{{job=~\"standalone|compute|meta|frontend\", {}}}[60s]) or label_replace(rate(process_cpu_seconds_total{{component=~\"standalone|compute|meta|frontend\", {}}}[60s]), \"job\", \"$1\", \"component\", \"(.*)\")) by (job,instance)",
85            srv.prometheus_selector, srv.prometheus_selector
86        );
87        let result = client
88            .query_range(
89                cpu_query,
90                now.duration_since(SystemTime::UNIX_EPOCH)
91                    .unwrap()
92                    .as_secs() as i64
93                    - 3600,
94                now.duration_since(SystemTime::UNIX_EPOCH)
95                    .unwrap()
96                    .as_secs() as i64,
97                15.0,
98            )
99            .get()
100            .await
101            .map_err(err)?;
102        let cpu_data = result
103            .data()
104            .as_matrix()
105            .unwrap()
106            .iter()
107            .map(PrometheusVector::from)
108            .collect();
109        let memory_query = format!(
110            "avg(process_resident_memory_bytes{{job=~\"standalone|compute|meta|frontend\", {}}} or label_replace(process_resident_memory_bytes{{component=~\"standalone|compute|meta|frontend\", {}}}, \"job\", \"$1\", \"component\", \"(.*)\")) by (job,instance)",
111            srv.prometheus_selector, srv.prometheus_selector
112        );
113        let result = client
114            .query_range(
115                memory_query,
116                now.duration_since(SystemTime::UNIX_EPOCH)
117                    .unwrap()
118                    .as_secs() as i64
119                    - 3600,
120                now.duration_since(SystemTime::UNIX_EPOCH)
121                    .unwrap()
122                    .as_secs() as i64,
123                60.0,
124            )
125            .get()
126            .await
127            .map_err(err)?;
128        let memory_data = result
129            .data()
130            .as_matrix()
131            .unwrap()
132            .iter()
133            .map(PrometheusVector::from)
134            .collect();
135        Ok(Json(ClusterMetrics {
136            cpu_data,
137            memory_data,
138        }))
139    } else {
140        Err(err(anyhow!("Prometheus endpoint is not set")))
141    }
142}