// risingwave_frontend/metrics_reader.rs

use std::collections::HashMap;

use anyhow::{Result, anyhow};
use prometheus_http_query::Client as PrometheusClient;
use risingwave_common::metrics_reader::{ChannelDeltaStats, ChannelKey, MetricsReader};

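/// Default lookback window, in seconds, for the `rate()` queries when the caller
/// does not provide an explicit `time_offset`.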
const DEFAULT_TIME_OFFSET_SECONDS: i64 = 60;

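/// Nanoseconds per second, used to convert the nanosecond-based blocking-duration
/// rate into a dimensionless backpressure ratio.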
const NANOSECONDS_TO_SECONDS: f64 = 1_000_000_000.0;

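/// A `MetricsReader` implementation that derives channel-level delta statistics
/// from Prometheus queries over the streaming actor metrics, filtered by
/// `prometheus_selector`.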
pub struct MetricsReaderImpl {
    prometheus_client: Option<PrometheusClient>,
    prometheus_selector: String,
}

impl MetricsReaderImpl {
    pub fn new(prometheus_client: Option<PrometheusClient>, prometheus_selector: String) -> Self {
        Self {
            prometheus_client,
            prometheus_selector,
        }
    }
}

#[async_trait::async_trait]
impl MetricsReader for MetricsReaderImpl {
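    /// Computes per-channel delta statistics (receive/send throughput in records per
    /// second and backpressure rate) over the last `time_offset` seconds, keyed by
    /// `(upstream_fragment_id, downstream_fragment_id)`. If `at_time` is given, the
    /// queries are evaluated at that instant instead of the current time.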
    async fn get_channel_delta_stats(
        &self,
        at_time: Option<i64>,
        time_offset: Option<i64>,
    ) -> Result<HashMap<ChannelKey, ChannelDeltaStats>> {
        let time_offset = time_offset.unwrap_or(DEFAULT_TIME_OFFSET_SECONDS);

        let prometheus_client = self
            .prometheus_client
            .as_ref()
            .ok_or_else(|| anyhow!("Prometheus endpoint is not set"))?;

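        // Build the three PromQL instant queries: records received per second and
        // records sent per second for each channel, plus the output-buffer blocking
        // time per second averaged over the actors of the upstream fragment.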
        let channel_input_throughput_query = format!(
            "sum(rate(stream_actor_in_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
            self.prometheus_selector, time_offset
        );
        let channel_output_throughput_query = format!(
            "sum(rate(stream_actor_out_record_cnt{{{}}}[{}s])) by (fragment_id, upstream_fragment_id)",
            self.prometheus_selector, time_offset
        );
        let channel_backpressure_query = format!(
            "sum(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[{}s])) by (fragment_id, downstream_fragment_id) \
             / ignoring (downstream_fragment_id) group_left sum(stream_actor_count) by (fragment_id)",
            self.prometheus_selector, time_offset
        );

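        // Issue the three queries concurrently. When `at_time` is set, each query is
        // evaluated at that timestamp rather than at the current time.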
        let (
            channel_input_throughput_result,
            channel_output_throughput_result,
            channel_backpressure_result,
        ) = {
            let mut input_query = prometheus_client.query(channel_input_throughput_query);
            let mut output_query = prometheus_client.query(channel_output_throughput_query);
            let mut backpressure_query = prometheus_client.query(channel_backpressure_query);

            if let Some(at_time) = at_time {
                input_query = input_query.at(at_time);
                output_query = output_query.at(at_time);
                backpressure_query = backpressure_query.at(at_time);
            }

            tokio::try_join!(
                input_query.get(),
                output_query.get(),
                backpressure_query.get(),
            )
            .map_err(|e| anyhow!(e).context("failed to query Prometheus"))?
        };

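        // Merge the three result vectors into a single map keyed by channel.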
        let mut channel_data: HashMap<ChannelKey, ChannelDeltaStats> = HashMap::new();

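        // Receive throughput: `fragment_id` identifies the receiving (downstream)
        // fragment and `upstream_fragment_id` the sender.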
        if let Some(channel_input_throughput_data) =
            channel_input_throughput_result.data().as_vector()
        {
            for sample in channel_input_throughput_data {
                if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                    && let Some(upstream_fragment_id_str) =
                        sample.metric().get("upstream_fragment_id")
                    && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
                        fragment_id_str.parse::<u32>(),
                        upstream_fragment_id_str.parse::<u32>(),
                    )
                {
                    let key = ChannelKey {
                        upstream_fragment_id,
                        downstream_fragment_id: fragment_id,
                    };
                    channel_data
                        .entry(key)
                        .or_insert_with(|| ChannelDeltaStats {
                            backpressure_rate: 0.0,
                            recv_throughput: 0.0,
                            send_throughput: 0.0,
                        })
                        .recv_throughput = sample.sample().value();
                }
            }
        }

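        // Send throughput: the labels follow the same layout as the receive-side metric.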
        if let Some(channel_output_throughput_data) =
            channel_output_throughput_result.data().as_vector()
        {
            for sample in channel_output_throughput_data {
                if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                    && let Some(upstream_fragment_id_str) =
                        sample.metric().get("upstream_fragment_id")
                    && let (Ok(fragment_id), Ok(upstream_fragment_id)) = (
                        fragment_id_str.parse::<u32>(),
                        upstream_fragment_id_str.parse::<u32>(),
                    )
                {
                    let key = ChannelKey {
                        upstream_fragment_id,
                        downstream_fragment_id: fragment_id,
                    };
                    channel_data
                        .entry(key)
                        .or_insert_with(|| ChannelDeltaStats {
                            backpressure_rate: 0.0,
                            recv_throughput: 0.0,
                            send_throughput: 0.0,
                        })
                        .send_throughput = sample.sample().value();
                }
            }
        }

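        // Backpressure: here `fragment_id` identifies the upstream fragment and
        // `downstream_fragment_id` the receiver. Dividing the nanoseconds-per-second
        // blocking rate by `NANOSECONDS_TO_SECONDS` yields the fraction of time the
        // channel spent blocked.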
        if let Some(channel_backpressure_data) = channel_backpressure_result.data().as_vector() {
            for sample in channel_backpressure_data {
                if let Some(fragment_id_str) = sample.metric().get("fragment_id")
                    && let Some(downstream_fragment_id_str) =
                        sample.metric().get("downstream_fragment_id")
                    && let (Ok(fragment_id), Ok(downstream_fragment_id)) = (
                        fragment_id_str.parse::<u32>(),
                        downstream_fragment_id_str.parse::<u32>(),
                    )
                {
                    let key = ChannelKey {
                        upstream_fragment_id: fragment_id,
                        downstream_fragment_id,
                    };
                    channel_data
                        .entry(key)
                        .or_insert_with(|| ChannelDeltaStats {
                            backpressure_rate: 0.0,
                            recv_throughput: 0.0,
                            send_throughput: 0.0,
                        })
                        .backpressure_rate = sample.sample().value() / NANOSECONDS_TO_SECONDS;
                }
            }
        }

        Ok(channel_data)
    }
}