risingwave_common_service/
metrics_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::ops::Deref;
17use std::sync::OnceLock;
18
19use axum::Extension;
20use axum::body::Body;
21use axum::handler::{Handler, HandlerWithoutStateExt};
22use axum::response::{IntoResponse, Response};
23use axum_extra::extract::Query as ExtraQuery;
24use prometheus::{Encoder, Registry, TextEncoder};
25use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
26use serde::Deserialize;
27use thiserror_ext::AsReport;
28use tokio::net::TcpListener;
29use tower_http::add_extension::AddExtensionLayer;
30use tower_http::compression::CompressionLayer;
31use tracing::{error, info, warn};
32
33/// The filter for metrics scrape handler. See [`MetricsManager::metrics`] for more details.
34#[derive(Debug, Deserialize)]
35struct Filter {
36    #[serde(default)]
37    include: HashSet<String>,
38    #[serde(default)]
39    exclude: HashSet<String>,
40}
41
42pub struct MetricsManager;
43
44impl MetricsManager {
45    pub fn boot_metrics_service(listen_addr: String) {
46        static METRICS_SERVICE_LISTEN_ADDR: OnceLock<String> = OnceLock::new();
47        let new_listen_addr = listen_addr.clone();
48        let current_listen_addr = METRICS_SERVICE_LISTEN_ADDR.get_or_init(|| {
49            let listen_addr_clone = listen_addr.clone();
50            #[cfg(not(madsim))] // no need in simulation test
51            tokio::spawn(async move {
52                info!(
53                    "Prometheus listener for Prometheus is set up on http://{}",
54                    listen_addr
55                );
56
57                let service = Self::metrics
58                    .layer(AddExtensionLayer::new(
59                        GLOBAL_METRICS_REGISTRY.deref().clone(),
60                    ))
61                    .layer(CompressionLayer::new())
62                    .into_make_service();
63
64                let serve_future =
65                    axum::serve(TcpListener::bind(&listen_addr).await.unwrap(), service);
66                if let Err(err) = serve_future.await {
67                    error!(error = %err.as_report(), "metrics service exited with error");
68                }
69            });
70            listen_addr_clone
71        });
72        if new_listen_addr != *current_listen_addr {
73            warn!(
74                "unable to listen port {} for metrics service. Currently listening on {}",
75                new_listen_addr, current_listen_addr
76            );
77        }
78    }
79
80    /// Gather metrics from the global registry and encode them in the Prometheus text format.
81    ///
82    /// The handler accepts the following query parameters to filter metrics. Note that `include`
83    /// and `exclude` should not be used together.
84    ///
85    /// - `/metrics`                            (without filter)
86    /// - `/metrics?include=foo`                (include one metric)
87    /// - `/metrics?include=foo&include=bar`    (include multiple metrics)
88    /// - `/metrics?exclude=foo&exclude=bar`    (include all but foo and bar)
89    ///
90    /// One can specify parameters by configuring Prometheus scrape config like below:
91    /// ```yaml
92    /// - job_name: compute-node
93    ///   params:
94    ///     include: ["foo", "bar"]
95    /// ```
96    #[expect(clippy::unused_async, reason = "required by service_fn")]
97    async fn metrics(
98        ExtraQuery(Filter { include, exclude }): ExtraQuery<Filter>,
99        Extension(registry): Extension<Registry>,
100    ) -> impl IntoResponse {
101        let mut mf = registry.gather();
102
103        // Filter metrics by name.
104        // TODO: can we avoid gathering them all?
105        if !include.is_empty() && !exclude.is_empty() {
106            return Response::builder()
107                .status(400)
108                .body("should not specify both include and exclude".into())
109                .unwrap();
110        } else if !include.is_empty() {
111            mf.retain(|fam| include.contains(fam.get_name()));
112        } else if !exclude.is_empty() {
113            mf.retain(|fam| !exclude.contains(fam.get_name()));
114        }
115
116        let encoder = TextEncoder::new();
117        let mut buffer = vec![];
118        encoder.encode(&mf, &mut buffer).unwrap();
119
120        Response::builder()
121            .header(axum::http::header::CONTENT_TYPE, encoder.format_type())
122            .body(Body::from(buffer))
123            .unwrap()
124    }
125}