// risingwave_meta/dashboard/prometheus.rs
use std::collections::HashMap;
use std::time::SystemTime;
use anyhow::anyhow;
use axum::{Extension, Json};
use prometheus_http_query::response::{InstantVector, RangeVector, Sample};
use serde::Serialize;
use super::handlers::{err, DashboardError};
use super::Service;
/// A single Prometheus data point: a `(timestamp, value)` pair, flattened
/// into named fields so it serializes as a JSON object for the dashboard.
#[derive(Serialize, Debug)]
pub struct PrometheusSample {
    // Unix timestamp in seconds (fractional), as reported by Prometheus.
    pub timestamp: f64,
    // The metric's value at `timestamp`.
    pub value: f64,
}
impl From<&Sample> for PrometheusSample {
fn from(value: &Sample) -> Self {
PrometheusSample {
timestamp: value.timestamp(),
value: value.value(),
}
}
}
/// One Prometheus time series: its label set plus its samples.
/// Used as a common JSON shape for both range (many samples) and
/// instant (single sample) query results.
#[derive(Serialize, Debug)]
pub struct PrometheusVector {
    // Label name -> label value for this series (e.g. job, instance).
    metric: HashMap<String, String>,
    // Samples of this series; length 1 for instant-vector results.
    sample: Vec<PrometheusSample>,
}
impl From<&RangeVector> for PrometheusVector {
fn from(value: &RangeVector) -> Self {
PrometheusVector {
metric: value.metric().clone(),
sample: value.samples().iter().map(Into::into).collect(),
}
}
}
impl From<&InstantVector> for PrometheusVector {
fn from(value: &InstantVector) -> Self {
PrometheusVector {
metric: value.metric().clone(),
sample: vec![value.sample().into()],
}
}
}
/// Response body for the cluster-metrics endpoint: CPU and memory series
/// for every RisingWave node, serialized with camelCase keys for the
/// TypeScript dashboard frontend.
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ClusterMetrics {
    // Per-(job, instance) CPU usage rate series.
    cpu_data: Vec<PrometheusVector>,
    // Per-(job, instance) resident-memory series.
    memory_data: Vec<PrometheusVector>,
}
/// Convenience alias: all handlers in this module fail with `DashboardError`.
pub type Result<T> = std::result::Result<T, DashboardError>;
/// Handler: queries Prometheus for the last hour of per-node CPU usage
/// (15s resolution) and resident memory (60s resolution) and returns them
/// as [`ClusterMetrics`].
///
/// # Errors
/// Returns a [`DashboardError`] if no Prometheus endpoint is configured,
/// if either query fails, or if Prometheus returns a non-matrix payload.
pub async fn list_prometheus_cluster(
    Extension(srv): Extension<Service>,
) -> Result<Json<ClusterMetrics>> {
    if let Some(ref client) = srv.prometheus_client {
        // Compute the epoch timestamp once instead of four times; a clock
        // before the UNIX epoch is a broken-invariant, so `expect` is fine.
        let end = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("system clock is before the UNIX epoch")
            .as_secs() as i64;
        let start = end - 3600; // look back one hour

        let cpu_query =
            format!("sum(rate(process_cpu_seconds_total{{job=~\"compute|meta|frontend\",{}}}[60s])) by (job,instance)", srv.prometheus_selector);
        let result = client
            .query_range(cpu_query, start, end, 15.0)
            .get()
            .await
            .map_err(err)?;
        // A range query should yield a matrix; surface anything else as an
        // error instead of panicking the handler task (was `unwrap()`).
        let cpu_data = result
            .data()
            .as_matrix()
            .ok_or_else(|| err(anyhow!("expected matrix result for CPU query")))?
            .iter()
            .map(PrometheusVector::from)
            .collect();

        let memory_query =
            format!("avg(process_resident_memory_bytes{{job=~\"compute|meta|frontend\",{}}}) by (job,instance)", srv.prometheus_selector);
        let result = client
            .query_range(memory_query, start, end, 60.0)
            .get()
            .await
            .map_err(err)?;
        let memory_data = result
            .data()
            .as_matrix()
            .ok_or_else(|| err(anyhow!("expected matrix result for memory query")))?
            .iter()
            .map(PrometheusVector::from)
            .collect();

        Ok(Json(ClusterMetrics {
            cpu_data,
            memory_data,
        }))
    } else {
        Err(err(anyhow!("Prometheus endpoint is not set")))
    }
}
/// Response body for the fragment back-pressure endpoint, serialized with
/// camelCase keys for the dashboard frontend.
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct FragmentBackPressure {
    // Per-(fragment_id, downstream_fragment_id) average output-buffer
    // blocking duration series.
    output_buffer_blocking_duration: Vec<PrometheusVector>,
}
/// Handler: runs an instant Prometheus query for per-fragment back-pressure
/// (output-buffer blocking duration, averaged over the actors of each
/// fragment and converted from nanoseconds to seconds).
///
/// # Errors
/// Returns a [`DashboardError`] if no Prometheus endpoint is configured,
/// if the query fails, or if Prometheus returns a non-vector payload.
pub async fn list_prometheus_fragment_back_pressure(
    Extension(srv): Extension<Service>,
) -> Result<Json<FragmentBackPressure>> {
    if let Some(ref client) = srv.prometheus_client {
        // `{0}` reuses the single selector argument in both label matchers.
        let back_pressure_query = format!(
            "sum(rate(stream_actor_output_buffer_blocking_duration_ns{{{0}}}[60s])) by (fragment_id, downstream_fragment_id) \
            / ignoring (downstream_fragment_id) group_left sum(stream_actor_count{{{0}}}) by (fragment_id) \
            / 1000000000",
            srv.prometheus_selector,
        );
        let result = client.query(back_pressure_query).get().await.map_err(err)?;
        // An instant query should yield a vector; surface anything else as an
        // error instead of panicking the handler task (was `unwrap()`).
        let back_pressure_data = result
            .data()
            .as_vector()
            .ok_or_else(|| err(anyhow!("expected vector result for back-pressure query")))?
            .iter()
            .map(PrometheusVector::from)
            .collect();
        Ok(Json(FragmentBackPressure {
            output_buffer_blocking_duration: back_pressure_data,
        }))
    } else {
        Err(err(anyhow!("Prometheus endpoint is not set")))
    }
}