risingwave_common_service/
metrics_manager.rs1use std::collections::HashSet;
16use std::ops::Deref;
17use std::sync::OnceLock;
18
19use axum::Extension;
20use axum::body::Body;
21use axum::handler::{Handler, HandlerWithoutStateExt};
22use axum::response::{IntoResponse, Response};
23use axum_extra::extract::Query as ExtraQuery;
24use prometheus::{Encoder, Registry, TextEncoder};
25use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
26use serde::Deserialize;
27use thiserror_ext::AsReport;
28use tokio::net::TcpListener;
29use tower_http::add_extension::AddExtensionLayer;
30use tower_http::compression::CompressionLayer;
31use tracing::{error, info, warn};
32
/// Query parameters accepted by the metrics endpoint.
///
/// Both fields fall back to an empty set when they are absent from the query string.
#[derive(Debug, Deserialize)]
struct Filter {
    /// Names of the metric families to keep. When non-empty, every other family is dropped.
    #[serde(default)]
    include: HashSet<String>,
    /// Names of the metric families to drop from the response.
    #[serde(default)]
    exclude: HashSet<String>,
}
41
/// Stateless entry point for the metrics HTTP service; all functionality lives in its
/// associated functions.
pub struct MetricsManager;
43
44impl MetricsManager {
45 pub fn boot_metrics_service(listen_addr: String) {
46 static METRICS_SERVICE_LISTEN_ADDR: OnceLock<String> = OnceLock::new();
47 let new_listen_addr = listen_addr.clone();
48 let current_listen_addr = METRICS_SERVICE_LISTEN_ADDR.get_or_init(|| {
49 let listen_addr_clone = listen_addr.clone();
50 #[cfg(not(madsim))] tokio::spawn(async move {
52 info!(
53 "Prometheus listener for Prometheus is set up on http://{}",
54 listen_addr
55 );
56
57 let service = Self::metrics
58 .layer(AddExtensionLayer::new(
59 GLOBAL_METRICS_REGISTRY.deref().clone(),
60 ))
61 .layer(CompressionLayer::new())
62 .into_make_service();
63
64 let serve_future =
65 axum::serve(TcpListener::bind(&listen_addr).await.unwrap(), service);
66 if let Err(err) = serve_future.await {
67 error!(error = %err.as_report(), "metrics service exited with error");
68 }
69 });
70 listen_addr_clone
71 });
72 if new_listen_addr != *current_listen_addr {
73 warn!(
74 "unable to listen port {} for metrics service. Currently listening on {}",
75 new_listen_addr, current_listen_addr
76 );
77 }
78 }
79
80 #[expect(clippy::unused_async, reason = "required by service_fn")]
97 async fn metrics(
98 ExtraQuery(Filter { include, exclude }): ExtraQuery<Filter>,
99 Extension(registry): Extension<Registry>,
100 ) -> impl IntoResponse {
101 let mut mf = registry.gather();
102
103 if !include.is_empty() && !exclude.is_empty() {
106 return Response::builder()
107 .status(400)
108 .body("should not specify both include and exclude".into())
109 .unwrap();
110 } else if !include.is_empty() {
111 mf.retain(|fam| include.contains(fam.get_name()));
112 } else if !exclude.is_empty() {
113 mf.retain(|fam| !exclude.contains(fam.get_name()));
114 }
115
116 let encoder = TextEncoder::new();
117 let mut buffer = vec![];
118 encoder.encode(&mf, &mut buffer).unwrap();
119
120 Response::builder()
121 .header(axum::http::header::CONTENT_TYPE, encoder.format_type())
122 .body(Body::from(buffer))
123 .unwrap()
124 }
125}