// risingwave_frontend/metrics_reader.rs

use std::collections::HashMap;

use anyhow::{Result, anyhow};
use prometheus_http_query::Client as PrometheusClient;
use risingwave_common::metrics_reader::{
    ChannelDeltaStats, ChannelKey, KafkaPartitionMetrics, MetricsReader,
};

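/// Default lookback window, in seconds, for the Prometheus `rate()` queries when the
/// caller does not supply an explicit `time_offset`.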
const DEFAULT_TIME_OFFSET_SECONDS: i64 = 60;

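/// Nanoseconds per second, used to turn the blocking-duration rate (ns/s) into a
/// backpressure fraction.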
const NANOSECONDS_TO_SECONDS: f64 = 1_000_000_000.0;

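/// Frontend implementation of [`MetricsReader`] backed by an optional Prometheus
/// endpoint. When no Prometheus client is configured, every query returns an empty
/// result instead of an error.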
pub struct MetricsReaderImpl {
    prometheus_client: Option<PrometheusClient>,
    prometheus_selector: String,
}

impl MetricsReaderImpl {
    pub fn new(prometheus_client: Option<PrometheusClient>, prometheus_selector: String) -> Self {
        Self {
            prometheus_client,
            prometheus_selector,
        }
    }
}

#[async_trait::async_trait]
impl MetricsReader for MetricsReaderImpl {
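    /// Computes per-channel delta statistics (receive/send throughput and backpressure
    /// rate) for each (upstream, downstream) fragment pair by querying Prometheus.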
    async fn get_channel_delta_stats(
        &self,
        at_time: Option<i64>,
        time_offset: Option<i64>,
    ) -> Result<HashMap<ChannelKey, ChannelDeltaStats>> {
        let time_offset = time_offset.unwrap_or(DEFAULT_TIME_OFFSET_SECONDS);

        let Some(prometheus_client) = self.prometheus_client.as_ref() else {
            return Ok(HashMap::new());
        };

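        // Input/output throughput are per-channel record rates; backpressure is the rate
        // of output-buffer blocking time (ns/s), normalized by the actor count of the
        // upstream fragment.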
        let channel_input_throughput_query = format!(
            "sum(rate(stream_actor_in_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
            self.prometheus_selector, time_offset
        );
        let channel_output_throughput_query = format!(
            "sum(rate(stream_actor_out_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
            self.prometheus_selector, time_offset
        );
        let channel_backpressure_query = format!(
            "sum(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[{}s])) by (fragment_id, downstream_fragment_id) \
             / ignoring (downstream_fragment_id) group_left sum(stream_actor_count) by (fragment_id)",
            self.prometheus_selector, time_offset
        );

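        // Run the three queries concurrently; when an evaluation timestamp is given,
        // pin all of them to the same instant so the results are mutually consistent.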
        let (
            channel_input_throughput_result,
            channel_output_throughput_result,
            channel_backpressure_result,
        ) = {
            let mut input_query = prometheus_client.query(channel_input_throughput_query);
            let mut output_query = prometheus_client.query(channel_output_throughput_query);
            let mut backpressure_query = prometheus_client.query(channel_backpressure_query);

            if let Some(at_time) = at_time {
                input_query = input_query.at(at_time);
                output_query = output_query.at(at_time);
                backpressure_query = backpressure_query.at(at_time);
            }

            tokio::try_join!(
                input_query.get(),
                output_query.get(),
                backpressure_query.get(),
            )
            .map_err(|e| anyhow!(e).context("failed to query Prometheus"))?
        };

        let mut channel_data: HashMap<ChannelKey, ChannelDeltaStats> = HashMap::new();

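        // Fold the input-throughput samples into the map, keyed by
        // (upstream_fragment_id, downstream_fragment_id). Here `fragment_id` is the
        // receiving (downstream) fragment.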
        if let Some(channel_input_throughput_data) =
            channel_input_throughput_result.data().as_vector()
        {
            for sample in channel_input_throughput_data {
                if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                    && let Some(upstream_fragment_id_str) =
                        sample.metric().get("upstream_fragment_id")
                    && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
                        fragment_id_str.parse::<u32>(),
                        upstream_fragment_id_str.parse::<u32>(),
                    )
                {
                    let key = ChannelKey {
                        upstream_fragment_id,
                        downstream_fragment_id: fragment_id,
                    };
                    channel_data
                        .entry(key)
                        .or_insert_with(|| ChannelDeltaStats {
                            backpressure_rate: 0.0,
                            recv_throughput: 0.0,
                            send_throughput: 0.0,
                        })
                        .recv_throughput = sample.sample().value();
                }
            }
        }

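        // Merge the output-throughput samples into the same channel entries.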
        if let Some(channel_output_throughput_data) =
            channel_output_throughput_result.data().as_vector()
        {
            for sample in channel_output_throughput_data {
                if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                    && let Some(upstream_fragment_id_str) =
                        sample.metric().get("upstream_fragment_id")
                    && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
                        fragment_id_str.parse::<u32>(),
                        upstream_fragment_id_str.parse::<u32>(),
                    )
                {
                    let key = ChannelKey {
                        upstream_fragment_id,
                        downstream_fragment_id: fragment_id,
                    };
                    channel_data
                        .entry(key)
                        .or_insert_with(|| ChannelDeltaStats {
                            backpressure_rate: 0.0,
                            recv_throughput: 0.0,
                            send_throughput: 0.0,
                        })
                        .send_throughput = sample.sample().value();
                }
            }
        }

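        // The backpressure samples report blocked nanoseconds per second per actor;
        // dividing by NANOSECONDS_TO_SECONDS yields the fraction of time the channel
        // was blocking. Here `fragment_id` is the sending (upstream) fragment.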
        if let Some(channel_backpressure_data) = channel_backpressure_result.data().as_vector() {
            for sample in channel_backpressure_data {
                if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                    && let Some(downstream_fragment_id_str) =
                        sample.metric().get("downstream_fragment_id")
                    && let (Ok(fragment_id), Ok(downstream_fragment_id)) = (
                        fragment_id_str.parse::<u32>(),
                        downstream_fragment_id_str.parse::<u32>(),
                    )
                {
                    let key = ChannelKey {
                        upstream_fragment_id: fragment_id,
                        downstream_fragment_id,
                    };
                    channel_data
                        .entry(key)
                        .or_insert_with(|| ChannelDeltaStats {
                            backpressure_rate: 0.0,
                            recv_throughput: 0.0,
                            send_throughput: 0.0,
                        })
                        .backpressure_rate = sample.sample().value() / NANOSECONDS_TO_SECONDS;
                }
            }
        }

        Ok(channel_data)
    }

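    /// Fetches the per-partition high watermark and the latest consumed message id for
    /// every Kafka source, as reported to Prometheus.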
    async fn get_kafka_source_metrics(&self) -> Result<Vec<KafkaPartitionMetrics>> {
        let Some(prometheus_client) = self.prometheus_client.as_ref() else {
            return Ok(vec![]);
        };

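        // Both series are grouped by (source_id, partition); `max` collapses duplicate
        // reports for the same partition.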
        let high_watermark_query = format!(
            "max(source_kafka_high_watermark{{{}}}) by (source_id, partition)",
            self.prometheus_selector
        );
        let latest_offset_query = format!(
            "max(source_latest_message_id{{{}}}) by (source_id, partition)",
            self.prometheus_selector
        );

        let (high_watermark_result, latest_offset_result) = tokio::try_join!(
            prometheus_client.query(high_watermark_query).get(),
            prometheus_client.query(latest_offset_query).get(),
        )
        .map_err(|e| anyhow!(e).context("failed to query Prometheus"))?;

        let mut metrics: HashMap<(u32, String), KafkaPartitionMetrics> = HashMap::new();

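        // Populate the high-watermark side first, creating an entry per partition.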
        if let Some(samples) = high_watermark_result.data().as_vector() {
            for sample in samples {
                if let (Some(source_id_str), Some(partition_id)) = (
                    sample.metric().get("source_id"),
                    sample.metric().get("partition"),
                ) && let Ok(source_id) = source_id_str.parse::<u32>()
                {
                    let entry = metrics
                        .entry((source_id, partition_id.to_owned()))
                        .or_insert_with(|| KafkaPartitionMetrics {
                            source_id,
                            partition_id: partition_id.to_owned(),
                            high_watermark: None,
                            latest_offset: None,
                        });
                    entry.high_watermark = Some(sample.sample().value().round() as i64);
                }
            }
        }

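        // Then fill in the latest consumed offset, reusing existing entries where possible.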
        if let Some(samples) = latest_offset_result.data().as_vector() {
            for sample in samples {
                if let (Some(source_id_str), Some(partition_id)) = (
                    sample.metric().get("source_id"),
                    sample.metric().get("partition"),
                ) && let Ok(source_id) = source_id_str.parse::<u32>()
                {
                    let entry = metrics
                        .entry((source_id, partition_id.to_owned()))
                        .or_insert_with(|| KafkaPartitionMetrics {
                            source_id,
                            partition_id: partition_id.to_owned(),
                            high_watermark: None,
                            latest_offset: None,
                        });
                    entry.latest_offset = Some(sample.sample().value().round() as i64);
                }
            }
        }

        Ok(metrics.into_values().collect())
    }
}