risingwave_frontend/
metrics_reader.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use anyhow::{Result, anyhow};
18use prometheus_http_query::Client as PrometheusClient;
19use risingwave_common::metrics_reader::{ChannelDeltaStats, ChannelKey, MetricsReader};
20
/// Window length, in seconds, used for metrics queries when the caller does not supply one.
const DEFAULT_TIME_OFFSET_SECONDS: i64 = 60;

/// Number of nanoseconds in one second; dividing a nanosecond quantity by it yields seconds.
const NANOSECONDS_TO_SECONDS: f64 = 1e9;
26
27/// Implementation of `MetricsReader` that queries Prometheus directly.
28pub struct MetricsReaderImpl {
29    prometheus_client: Option<PrometheusClient>,
30    prometheus_selector: String,
31}
32
33impl MetricsReaderImpl {
34    /// Creates a new `MetricsReaderImpl` with the given Prometheus client and selector.
35    pub fn new(prometheus_client: Option<PrometheusClient>, prometheus_selector: String) -> Self {
36        Self {
37            prometheus_client,
38            prometheus_selector,
39        }
40    }
41}
42
43#[async_trait::async_trait]
44impl MetricsReader for MetricsReaderImpl {
45    async fn get_channel_delta_stats(
46        &self,
47        at_time: Option<i64>,
48        time_offset: Option<i64>,
49    ) -> Result<HashMap<ChannelKey, ChannelDeltaStats>> {
50        let time_offset = time_offset.unwrap_or(DEFAULT_TIME_OFFSET_SECONDS);
51
52        // Check if Prometheus client is available
53        let prometheus_client = self
54            .prometheus_client
55            .as_ref()
56            .ok_or_else(|| anyhow!("Prometheus endpoint is not set"))?;
57
58        // Query channel delta stats: throughput and backpressure rate
59        let channel_input_throughput_query = format!(
60            "sum(rate(stream_actor_in_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
61            self.prometheus_selector, time_offset
62        );
63        let channel_output_throughput_query = format!(
64            "sum(rate(stream_actor_out_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
65            self.prometheus_selector, time_offset
66        );
67        let channel_backpressure_query = format!(
68            "sum(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[{}s])) by (fragment_id, downstream_fragment_id) \
69             / ignoring (downstream_fragment_id) group_left sum(stream_actor_count) by (fragment_id)",
70            self.prometheus_selector, time_offset
71        );
72
73        // Execute all queries concurrently with optional time parameter
74        let (
75            channel_input_throughput_result,
76            channel_output_throughput_result,
77            channel_backpressure_result,
78        ) = {
79            let mut input_query = prometheus_client.query(channel_input_throughput_query);
80            let mut output_query = prometheus_client.query(channel_output_throughput_query);
81            let mut backpressure_query = prometheus_client.query(channel_backpressure_query);
82
83            // Set the evaluation time if provided
84            if let Some(at_time) = at_time {
85                input_query = input_query.at(at_time);
86                output_query = output_query.at(at_time);
87                backpressure_query = backpressure_query.at(at_time);
88            }
89
90            tokio::try_join!(
91                input_query.get(),
92                output_query.get(),
93                backpressure_query.get(),
94            )
95            .map_err(|e| anyhow!(e).context("failed to query Prometheus"))?
96        };
97
98        // Process channel delta stats
99        let mut channel_data: HashMap<ChannelKey, ChannelDeltaStats> = HashMap::new();
100
101        // Collect input throughput
102        if let Some(channel_input_throughput_data) =
103            channel_input_throughput_result.data().as_vector()
104        {
105            for sample in channel_input_throughput_data {
106                if let Some(fragment_id_str) = sample.metric().get("fragment_id")
107                    && let Some(upstream_fragment_id_str) =
108                        sample.metric().get("upstream_fragment_id")
109                    && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
110                        fragment_id_str.parse::<u32>(),
111                        upstream_fragment_id_str.parse::<u32>(),
112                    )
113                {
114                    let key = ChannelKey {
115                        upstream_fragment_id,
116                        downstream_fragment_id: fragment_id,
117                    };
118                    channel_data
119                        .entry(key)
120                        .or_insert_with(|| ChannelDeltaStats {
121                            backpressure_rate: 0.0,
122                            recv_throughput: 0.0,
123                            send_throughput: 0.0,
124                        })
125                        .recv_throughput = sample.sample().value();
126                }
127            }
128        }
129
130        // Collect output throughput
131        if let Some(channel_output_throughput_data) =
132            channel_output_throughput_result.data().as_vector()
133        {
134            for sample in channel_output_throughput_data {
135                if let Some(fragment_id_str) = sample.metric().get("fragment_id")
136                    && let Some(upstream_fragment_id_str) =
137                        sample.metric().get("upstream_fragment_id")
138                    && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
139                        fragment_id_str.parse::<u32>(),
140                        upstream_fragment_id_str.parse::<u32>(),
141                    )
142                {
143                    let key = ChannelKey {
144                        upstream_fragment_id,
145                        downstream_fragment_id: fragment_id,
146                    };
147                    channel_data
148                        .entry(key)
149                        .or_insert_with(|| ChannelDeltaStats {
150                            backpressure_rate: 0.0,
151                            recv_throughput: 0.0,
152                            send_throughput: 0.0,
153                        })
154                        .send_throughput = sample.sample().value();
155                }
156            }
157        }
158
159        // Collect backpressure rate
160        if let Some(channel_backpressure_data) = channel_backpressure_result.data().as_vector() {
161            for sample in channel_backpressure_data {
162                if let Some(fragment_id_str) = sample.metric().get("fragment_id")
163                    && let Some(downstream_fragment_id_str) =
164                        sample.metric().get("downstream_fragment_id")
165                    && let (Ok(fragment_id), Ok(downstream_fragment_id)) = (
166                        fragment_id_str.parse::<u32>(),
167                        downstream_fragment_id_str.parse::<u32>(),
168                    )
169                {
170                    let key = ChannelKey {
171                        upstream_fragment_id: fragment_id,
172                        downstream_fragment_id,
173                    };
174                    channel_data
175                        .entry(key)
176                        .or_insert_with(|| ChannelDeltaStats {
177                            backpressure_rate: 0.0,
178                            recv_throughput: 0.0,
179                            send_throughput: 0.0,
180                        })
181                        .backpressure_rate = sample.sample().value() / NANOSECONDS_TO_SECONDS;
182                }
183            }
184        }
185
186        Ok(channel_data)
187    }
188}