risingwave_frontend/metrics_reader.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use anyhow::{Result, anyhow};
use prometheus_http_query::Client as PrometheusClient;
use risingwave_common::metrics_reader::{
    ChannelDeltaStats, ChannelKey, KafkaPartitionMetrics, MetricsReader,
};

/// Default time offset in seconds for metrics queries
const DEFAULT_TIME_OFFSET_SECONDS: i64 = 60;

/// Conversion factor from nanoseconds to seconds
const NANOSECONDS_TO_SECONDS: f64 = 1_000_000_000.0;

/// Implementation of `MetricsReader` that queries Prometheus directly.
pub struct MetricsReaderImpl {
    prometheus_client: Option<PrometheusClient>,
    prometheus_selector: String,
}

impl MetricsReaderImpl {
    /// Creates a new `MetricsReaderImpl` with the given Prometheus client and selector.
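    ///
    /// # Example
    ///
    /// A minimal sketch; the endpoint and selector below are illustrative,
    /// not defaults of this crate.
    ///
    /// ```ignore
    /// let client = PrometheusClient::try_from("http://localhost:9090").ok();
    /// let reader = MetricsReaderImpl::new(client, r#"namespace="risingwave""#.to_owned());
    /// ```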
    pub fn new(prometheus_client: Option<PrometheusClient>, prometheus_selector: String) -> Self {
        Self {
            prometheus_client,
            prometheus_selector,
        }
    }
}

#[async_trait::async_trait]
impl MetricsReader for MetricsReaderImpl {
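    /// Returns per-channel throughput and backpressure statistics, keyed by
    /// the (upstream, downstream) fragment pair. `at_time` is forwarded to
    /// Prometheus as the instant-query evaluation timestamp; `time_offset`
    /// is the PromQL lookback window in seconds, defaulting to
    /// [`DEFAULT_TIME_OFFSET_SECONDS`].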
    async fn get_channel_delta_stats(
        &self,
        at_time: Option<i64>,
        time_offset: Option<i64>,
    ) -> Result<HashMap<ChannelKey, ChannelDeltaStats>> {
        let time_offset = time_offset.unwrap_or(DEFAULT_TIME_OFFSET_SECONDS);

        // Return empty result if Prometheus client is not configured
        let Some(prometheus_client) = self.prometheus_client.as_ref() else {
            return Ok(HashMap::new());
        };

        // Query channel delta stats: throughput and backpressure rate
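        // `rate(...[Ns])` yields per-second record counts over the lookback
        // window; `sum by (...)` collapses the per-actor series into one
        // series per fragment edge.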
        let channel_input_throughput_query = format!(
            "sum(rate(stream_actor_in_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
            self.prometheus_selector, time_offset
        );
        let channel_output_throughput_query = format!(
            "sum(rate(stream_actor_out_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
            self.prometheus_selector, time_offset
        );
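        // Blocking duration is accumulated per actor in nanoseconds, so the
        // fragment-level sum is normalized by the fragment's actor count via
        // a one-to-many (`group_left`) vector match on `fragment_id`.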
        let channel_backpressure_query = format!(
            "sum(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[{}s])) by (fragment_id, downstream_fragment_id) \
             / ignoring (downstream_fragment_id) group_left sum(stream_actor_count) by (fragment_id)",
            self.prometheus_selector, time_offset
        );

        // Execute all queries concurrently with optional time parameter
        let (
            channel_input_throughput_result,
            channel_output_throughput_result,
            channel_backpressure_result,
        ) = {
            let mut input_query = prometheus_client.query(channel_input_throughput_query);
            let mut output_query = prometheus_client.query(channel_output_throughput_query);
            let mut backpressure_query = prometheus_client.query(channel_backpressure_query);

            // Set the evaluation time if provided
            if let Some(at_time) = at_time {
                input_query = input_query.at(at_time);
                output_query = output_query.at(at_time);
                backpressure_query = backpressure_query.at(at_time);
            }

            tokio::try_join!(
                input_query.get(),
                output_query.get(),
                backpressure_query.get(),
            )
            .map_err(|e| anyhow!(e).context("failed to query Prometheus"))?
        };

        // Process channel delta stats
        let mut channel_data: HashMap<ChannelKey, ChannelDeltaStats> = HashMap::new();
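        // Each of the three result sets creates an entry on first sight with
        // zeroed stats and fills in only its own field, so a channel missing
        // one series keeps defaults for the others.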

        // Collect input throughput
        if let Some(channel_input_throughput_data) =
            channel_input_throughput_result.data().as_vector()
        {
            for sample in channel_input_throughput_data {
                if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                    && let Some(upstream_fragment_id_str) =
                        sample.metric().get("upstream_fragment_id")
                    && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
                        fragment_id_str.parse::<u32>(),
                        upstream_fragment_id_str.parse::<u32>(),
                    )
                {
                    let key = ChannelKey {
                        upstream_fragment_id,
                        downstream_fragment_id: fragment_id,
                    };
                    channel_data
                        .entry(key)
                        .or_insert_with(|| ChannelDeltaStats {
                            backpressure_rate: 0.0,
                            recv_throughput: 0.0,
                            send_throughput: 0.0,
                        })
                        .recv_throughput = sample.sample().value();
                }
            }
        }

        // Collect output throughput
        if let Some(channel_output_throughput_data) =
            channel_output_throughput_result.data().as_vector()
        {
            for sample in channel_output_throughput_data {
                if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                    && let Some(upstream_fragment_id_str) =
                        sample.metric().get("upstream_fragment_id")
                    && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
                        fragment_id_str.parse::<u32>(),
                        upstream_fragment_id_str.parse::<u32>(),
                    )
                {
                    let key = ChannelKey {
                        upstream_fragment_id,
                        downstream_fragment_id: fragment_id,
                    };
                    channel_data
                        .entry(key)
                        .or_insert_with(|| ChannelDeltaStats {
                            backpressure_rate: 0.0,
                            recv_throughput: 0.0,
                            send_throughput: 0.0,
                        })
                        .send_throughput = sample.sample().value();
                }
            }
        }

        // Collect backpressure rate
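        // `rate` of a nanosecond counter gives blocked nanoseconds per
        // second; dividing by `NANOSECONDS_TO_SECONDS` turns that into the
        // fraction of time the channel spent blocked (1.0 = fully blocked).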
        if let Some(channel_backpressure_data) = channel_backpressure_result.data().as_vector() {
            for sample in channel_backpressure_data {
                if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                    && let Some(downstream_fragment_id_str) =
                        sample.metric().get("downstream_fragment_id")
                    && let (Ok(fragment_id), Ok(downstream_fragment_id)) = (
                        fragment_id_str.parse::<u32>(),
                        downstream_fragment_id_str.parse::<u32>(),
                    )
                {
                    let key = ChannelKey {
                        upstream_fragment_id: fragment_id,
                        downstream_fragment_id,
                    };
                    channel_data
                        .entry(key)
                        .or_insert_with(|| ChannelDeltaStats {
                            backpressure_rate: 0.0,
                            recv_throughput: 0.0,
                            send_throughput: 0.0,
                        })
                        .backpressure_rate = sample.sample().value() / NANOSECONDS_TO_SECONDS;
                }
            }
        }

        Ok(channel_data)
    }

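    /// Fetches the latest per-partition Kafka offset metrics known to
    /// Prometheus: `high_watermark` from `source_kafka_high_watermark` and
    /// `latest_offset` from `source_latest_message_id`. Either field is
    /// `None` when its series is missing for a partition.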
    async fn get_kafka_source_metrics(&self) -> Result<Vec<KafkaPartitionMetrics>> {
        // Return empty result if Prometheus client is not configured
        let Some(prometheus_client) = self.prometheus_client.as_ref() else {
            return Ok(vec![]);
        };

        let high_watermark_query = format!(
            "max(source_kafka_high_watermark{{{}}}) by (source_id, partition)",
            self.prometheus_selector
        );
        let latest_offset_query = format!(
            "max(source_latest_message_id{{{}}}) by (source_id, partition)",
            self.prometheus_selector
        );

        let (high_watermark_result, latest_offset_result) = tokio::try_join!(
            prometheus_client.query(high_watermark_query).get(),
            prometheus_client.query(latest_offset_query).get(),
        )
        .map_err(|e| anyhow!(e).context("failed to query Prometheus"))?;

        let mut metrics: HashMap<(u32, String), KafkaPartitionMetrics> = HashMap::new();
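        // Merge the two result sets keyed by (source_id, partition); sample
        // values are rounded because Prometheus stores them as `f64`.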

        if let Some(samples) = high_watermark_result.data().as_vector() {
            for sample in samples {
                if let (Some(source_id_str), Some(partition_id)) = (
                    sample.metric().get("source_id"),
                    sample.metric().get("partition"),
                ) && let Ok(source_id) = source_id_str.parse::<u32>()
                {
                    let entry = metrics
                        .entry((source_id, partition_id.to_owned()))
                        .or_insert_with(|| KafkaPartitionMetrics {
                            source_id,
                            partition_id: partition_id.to_owned(),
                            high_watermark: None,
                            latest_offset: None,
                        });
                    entry.high_watermark = Some(sample.sample().value().round() as i64);
                }
            }
        }

        if let Some(samples) = latest_offset_result.data().as_vector() {
            for sample in samples {
                if let (Some(source_id_str), Some(partition_id)) = (
                    sample.metric().get("source_id"),
                    sample.metric().get("partition"),
                ) && let Ok(source_id) = source_id_str.parse::<u32>()
                {
                    let entry = metrics
                        .entry((source_id, partition_id.to_owned()))
                        .or_insert_with(|| KafkaPartitionMetrics {
                            source_id,
                            partition_id: partition_id.to_owned(),
                            high_watermark: None,
                            latest_offset: None,
                        });
                    entry.latest_offset = Some(sample.sample().value().round() as i64);
                }
            }
        }

        Ok(metrics.into_values().collect())
    }
}