risingwave_frontend/monitor/
stats.rs1use core::mem;
16use std::collections::HashMap;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use prometheus::core::{AtomicU64, GenericCounter};
21use prometheus::{
22 Histogram, HistogramVec, IntGauge, Registry, exponential_buckets, histogram_opts,
23 register_histogram_vec_with_registry, register_histogram_with_registry,
24 register_int_counter_with_registry, register_int_gauge_with_registry,
25};
26use risingwave_common::metrics::TrAdderGauge;
27use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
28use tokio::task::JoinHandle;
29
30use crate::session::SessionMapRef;
31
32#[derive(Clone)]
33pub struct FrontendMetrics {
34 pub query_counter_local_execution: GenericCounter<AtomicU64>,
35 pub latency_local_execution: Histogram,
36 pub active_sessions: IntGauge,
37 pub batch_total_mem: TrAdderGauge,
38}
39
40pub static GLOBAL_FRONTEND_METRICS: LazyLock<FrontendMetrics> =
41 LazyLock::new(|| FrontendMetrics::new(&GLOBAL_METRICS_REGISTRY));
42
43impl FrontendMetrics {
44 fn new(registry: &Registry) -> Self {
45 let query_counter_local_execution = register_int_counter_with_registry!(
46 "frontend_query_counter_local_execution",
47 "Total query number of local execution mode",
48 registry
49 )
50 .unwrap();
51
52 let opts = histogram_opts!(
53 "frontend_latency_local_execution",
54 "latency of local execution mode",
55 exponential_buckets(0.01, 2.0, 23).unwrap()
56 );
57 let latency_local_execution = register_histogram_with_registry!(opts, registry).unwrap();
58
59 let active_sessions = register_int_gauge_with_registry!(
60 "frontend_active_sessions",
61 "Total number of active sessions in frontend",
62 registry
63 )
64 .unwrap();
65
66 let batch_total_mem = TrAdderGauge::new(
67 "frontend_batch_total_mem",
68 "All memory usage of batch executors in bytes",
69 )
70 .unwrap();
71
72 registry
73 .register(Box::new(batch_total_mem.clone()))
74 .unwrap();
75
76 Self {
77 query_counter_local_execution,
78 latency_local_execution,
79 active_sessions,
80 batch_total_mem,
81 }
82 }
83
84 pub fn for_test() -> Self {
86 GLOBAL_FRONTEND_METRICS.clone()
87 }
88}
89
90pub static GLOBAL_CURSOR_METRICS: LazyLock<CursorMetrics> =
91 LazyLock::new(|| CursorMetrics::new(&GLOBAL_METRICS_REGISTRY));
92
93#[derive(Clone)]
94pub struct CursorMetrics {
95 pub subscription_cursor_error_count: GenericCounter<AtomicU64>,
96 pub subscription_cursor_query_duration: HistogramVec,
97 pub subscription_cursor_declare_duration: HistogramVec,
98 pub subscription_cursor_fetch_duration: HistogramVec,
99 subsription_cursor_nums: IntGauge,
100 invalid_subsription_cursor_nums: IntGauge,
101 subscription_cursor_last_fetch_duration: HistogramVec,
102 _cursor_metrics_collector: Option<Arc<CursorMetricsCollector>>,
103}
104
105impl CursorMetrics {
106 pub fn new(registry: &Registry) -> Self {
107 let subscription_cursor_error_count = register_int_counter_with_registry!(
108 "subscription_cursor_error_count",
109 "The subscription error num of cursor",
110 registry
111 )
112 .unwrap();
113 let opts = histogram_opts!(
114 "subscription_cursor_query_duration",
115 "The amount of time a query exists inside the cursor",
116 exponential_buckets(1.0, 5.0, 11).unwrap(),
117 );
118 let subscription_cursor_query_duration =
119 register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();
120
121 let opts = histogram_opts!(
122 "subscription_cursor_declare_duration",
123 "Subscription cursor duration of declare",
124 exponential_buckets(1.0, 5.0, 11).unwrap(),
125 );
126 let subscription_cursor_declare_duration =
127 register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();
128
129 let opts = histogram_opts!(
130 "subscription_cursor_fetch_duration",
131 "Subscription cursor duration of fetch",
132 exponential_buckets(1.0, 5.0, 11).unwrap(),
133 );
134 let subscription_cursor_fetch_duration =
135 register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();
136
137 let subsription_cursor_nums = register_int_gauge_with_registry!(
138 "subsription_cursor_nums",
139 "The number of subscription cursor",
140 registry
141 )
142 .unwrap();
143 let invalid_subsription_cursor_nums = register_int_gauge_with_registry!(
144 "invalid_subsription_cursor_nums",
145 "The number of invalid subscription cursor",
146 registry
147 )
148 .unwrap();
149
150 let opts = histogram_opts!(
151 "subscription_cursor_last_fetch_duration",
152 "Since the last fetch, the time up to now",
153 exponential_buckets(1.0, 5.0, 11).unwrap(),
154 );
155 let subscription_cursor_last_fetch_duration =
156 register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();
157 Self {
158 _cursor_metrics_collector: None,
159 subscription_cursor_error_count,
160 subscription_cursor_query_duration,
161 subscription_cursor_declare_duration,
162 subscription_cursor_fetch_duration,
163 subsription_cursor_nums,
164 invalid_subsription_cursor_nums,
165 subscription_cursor_last_fetch_duration,
166 }
167 }
168
169 pub fn for_test() -> Self {
170 GLOBAL_CURSOR_METRICS.clone()
171 }
172
173 pub fn start_with_session_map(&mut self, session_map: SessionMapRef) {
174 self._cursor_metrics_collector = Some(Arc::new(CursorMetricsCollector::new(
175 session_map,
176 self.subsription_cursor_nums.clone(),
177 self.invalid_subsription_cursor_nums.clone(),
178 self.subscription_cursor_last_fetch_duration.clone(),
179 )));
180 }
181
182 pub fn init(session_map: SessionMapRef) -> Self {
183 let mut cursor_metrics = GLOBAL_CURSOR_METRICS.clone();
184 cursor_metrics.start_with_session_map(session_map);
185 cursor_metrics
186 }
187}
188
/// Point-in-time cursor statistics that the periodic collector folds into the cursor
/// gauges and the last-fetch histogram.
pub struct PeriodicCursorMetrics {
    /// Number of subscription cursors counted in this snapshot.
    pub subsription_cursor_nums: i64,
    /// Number of invalid subscription cursors counted in this snapshot.
    pub invalid_subsription_cursor_nums: i64,
    /// Last-fetch duration values; each key is used as the `subscription_name` label when
    /// the value is observed.
    pub subscription_cursor_last_fetch_duration: HashMap<String, f64>,
}
194
195struct CursorMetricsCollector {
196 _join_handle: JoinHandle<()>,
197 shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
198}
199impl CursorMetricsCollector {
200 fn new(
201 session_map: SessionMapRef,
202 subsription_cursor_nums: IntGauge,
203 invalid_subsription_cursor_nums: IntGauge,
204 subscription_cursor_last_fetch_duration: HistogramVec,
205 ) -> Self {
206 const COLLECT_INTERVAL_SECONDS: u64 = 60;
207
208 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
209 let join_handle = tokio::spawn(async move {
210 let mut monitor_interval =
211 tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
212 monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
213 loop {
214 tokio::select! {
215 _ = monitor_interval.tick() => {},
217 _ = &mut shutdown_rx => {
219 tracing::info!("Fragment info monitor is stopped");
220 return;
221 }
222 }
223
224 let session_vec = { session_map.read().values().cloned().collect::<Vec<_>>() };
225 let mut subsription_cursor_nums_value = 0;
226 let mut invalid_subsription_cursor_nums_value = 0;
227 for session in &session_vec {
228 let periodic_cursor_metrics = session
229 .get_cursor_manager()
230 .get_periodic_cursor_metrics()
231 .await;
232 subsription_cursor_nums_value +=
233 periodic_cursor_metrics.subsription_cursor_nums;
234 invalid_subsription_cursor_nums_value +=
235 periodic_cursor_metrics.invalid_subsription_cursor_nums;
236 for (subscription_name, duration) in
237 &periodic_cursor_metrics.subscription_cursor_last_fetch_duration
238 {
239 subscription_cursor_last_fetch_duration
240 .with_label_values(&[subscription_name])
241 .observe(*duration);
242 }
243 }
244 subsription_cursor_nums.set(subsription_cursor_nums_value);
245 invalid_subsription_cursor_nums.set(invalid_subsription_cursor_nums_value);
246 }
247 });
248 Self {
249 _join_handle: join_handle,
250 shutdown_tx: Some(shutdown_tx),
251 }
252 }
253}
254impl Drop for CursorMetricsCollector {
255 fn drop(&mut self) {
256 if let Some(shutdown_tx) = mem::take(&mut self.shutdown_tx) {
257 shutdown_tx.send(()).ok();
258 }
259 }
260}