risingwave_frontend/monitor/
stats.rs1use core::mem;
16use std::collections::HashMap;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use prometheus::core::{AtomicU64, GenericCounter};
21use prometheus::{
22 Histogram, HistogramVec, IntGauge, Registry, exponential_buckets, histogram_opts,
23 register_histogram_vec_with_registry, register_histogram_with_registry,
24 register_int_counter_with_registry, register_int_gauge_with_registry,
25};
26use risingwave_common::metrics::TrAdderGauge;
27use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
28use tokio::task::JoinHandle;
29
30#[cfg(feature = "datafusion")]
31use crate::datafusion::DataFusionMetrics;
32use crate::session::SessionMapRef;
33
34#[derive(Clone)]
35pub struct FrontendMetrics {
36 pub query_counter_local_execution: GenericCounter<AtomicU64>,
37 pub latency_local_execution: Histogram,
38 #[cfg(feature = "datafusion")]
39 pub datafusion: DataFusionMetrics,
40 pub active_sessions: IntGauge,
41 pub batch_total_mem: TrAdderGauge,
42}
43
44pub static GLOBAL_FRONTEND_METRICS: LazyLock<FrontendMetrics> =
45 LazyLock::new(|| FrontendMetrics::new(&GLOBAL_METRICS_REGISTRY));
46
47impl FrontendMetrics {
48 fn new(registry: &Registry) -> Self {
49 let query_counter_local_execution = register_int_counter_with_registry!(
50 "frontend_query_counter_local_execution",
51 "Total query number of local execution mode for RisingWave batch engine",
52 registry
53 )
54 .unwrap();
55
56 let opts = histogram_opts!(
57 "frontend_latency_local_execution",
58 "latency of local execution mode for RisingWave batch engine",
59 exponential_buckets(0.01, 2.0, 23).unwrap()
60 );
61 let latency_local_execution = register_histogram_with_registry!(opts, registry).unwrap();
62
63 #[cfg(feature = "datafusion")]
64 let datafusion = DataFusionMetrics::new(registry);
65
66 let active_sessions = register_int_gauge_with_registry!(
67 "frontend_active_sessions",
68 "Total number of active sessions in frontend",
69 registry
70 )
71 .unwrap();
72
73 let batch_total_mem = TrAdderGauge::new(
74 "frontend_batch_total_mem",
75 "All memory usage of batch executors in bytes",
76 )
77 .unwrap();
78
79 registry
80 .register(Box::new(batch_total_mem.clone()))
81 .unwrap();
82
83 Self {
84 query_counter_local_execution,
85 latency_local_execution,
86 #[cfg(feature = "datafusion")]
87 datafusion,
88 active_sessions,
89 batch_total_mem,
90 }
91 }
92
93 pub fn for_test() -> Self {
95 GLOBAL_FRONTEND_METRICS.clone()
96 }
97}
98
99pub static GLOBAL_CURSOR_METRICS: LazyLock<CursorMetrics> =
100 LazyLock::new(|| CursorMetrics::new(&GLOBAL_METRICS_REGISTRY));
101
102#[derive(Clone)]
103pub struct CursorMetrics {
104 pub subscription_cursor_error_count: GenericCounter<AtomicU64>,
105 pub subscription_cursor_query_duration: HistogramVec,
106 pub subscription_cursor_declare_duration: HistogramVec,
107 pub subscription_cursor_fetch_duration: HistogramVec,
108 subscription_cursor_nums: IntGauge,
109 invalid_subscription_cursor_nums: IntGauge,
110 subscription_cursor_last_fetch_duration: HistogramVec,
111 _cursor_metrics_collector: Option<Arc<CursorMetricsCollector>>,
112}
113
114impl CursorMetrics {
115 pub fn new(registry: &Registry) -> Self {
116 let subscription_cursor_error_count = register_int_counter_with_registry!(
117 "subscription_cursor_error_count",
118 "The subscription error num of cursor",
119 registry
120 )
121 .unwrap();
122 let opts = histogram_opts!(
123 "subscription_cursor_query_duration",
124 "The amount of time a query exists inside the cursor",
125 exponential_buckets(1.0, 5.0, 11).unwrap(),
126 );
127 let subscription_cursor_query_duration =
128 register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();
129
130 let opts = histogram_opts!(
131 "subscription_cursor_declare_duration",
132 "Subscription cursor duration of declare",
133 exponential_buckets(1.0, 5.0, 11).unwrap(),
134 );
135 let subscription_cursor_declare_duration =
136 register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();
137
138 let opts = histogram_opts!(
139 "subscription_cursor_fetch_duration",
140 "Subscription cursor duration of fetch",
141 exponential_buckets(1.0, 5.0, 11).unwrap(),
142 );
143 let subscription_cursor_fetch_duration =
144 register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();
145
146 let subscription_cursor_nums = register_int_gauge_with_registry!(
147 "subscription_cursor_nums",
148 "The number of subscription cursor",
149 registry
150 )
151 .unwrap();
152 let invalid_subscription_cursor_nums = register_int_gauge_with_registry!(
153 "invalid_subscription_cursor_nums",
154 "The number of invalid subscription cursor",
155 registry
156 )
157 .unwrap();
158
159 let opts = histogram_opts!(
160 "subscription_cursor_last_fetch_duration",
161 "Since the last fetch, the time up to now",
162 exponential_buckets(1.0, 5.0, 11).unwrap(),
163 );
164 let subscription_cursor_last_fetch_duration =
165 register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();
166 Self {
167 _cursor_metrics_collector: None,
168 subscription_cursor_error_count,
169 subscription_cursor_query_duration,
170 subscription_cursor_declare_duration,
171 subscription_cursor_fetch_duration,
172 subscription_cursor_nums,
173 invalid_subscription_cursor_nums,
174 subscription_cursor_last_fetch_duration,
175 }
176 }
177
178 pub fn for_test() -> Self {
179 GLOBAL_CURSOR_METRICS.clone()
180 }
181
182 pub fn start_with_session_map(&mut self, session_map: SessionMapRef) {
183 self._cursor_metrics_collector = Some(Arc::new(CursorMetricsCollector::new(
184 session_map,
185 self.subscription_cursor_nums.clone(),
186 self.invalid_subscription_cursor_nums.clone(),
187 self.subscription_cursor_last_fetch_duration.clone(),
188 )));
189 }
190
191 pub fn init(session_map: SessionMapRef) -> Self {
192 let mut cursor_metrics = GLOBAL_CURSOR_METRICS.clone();
193 cursor_metrics.start_with_session_map(session_map);
194 cursor_metrics
195 }
196}
197
/// Cursor statistics reported by a single session's cursor manager.
///
/// `CursorMetricsCollector` sums the two counts across all sessions on every tick. It also
/// observes each last-fetch sample into the `subscription_cursor_last_fetch_duration`
/// histogram.
#[derive(Debug, Clone, Default)]
pub struct PeriodicCursorMetrics {
    /// Number of subscription cursors in the session.
    pub subscription_cursor_nums: i64,
    /// Number of subscription cursors the session reports as invalid.
    pub invalid_subscription_cursor_nums: i64,
    /// "Time since the last fetch" samples, keyed by subscription name (used as the
    /// `subscription_name` histogram label).
    pub subscription_cursor_last_fetch_duration: HashMap<String, f64>,
}
203
204struct CursorMetricsCollector {
205 _join_handle: JoinHandle<()>,
206 shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
207}
208impl CursorMetricsCollector {
209 fn new(
210 session_map: SessionMapRef,
211 subscription_cursor_nums: IntGauge,
212 invalid_subscription_cursor_nums: IntGauge,
213 subscription_cursor_last_fetch_duration: HistogramVec,
214 ) -> Self {
215 const COLLECT_INTERVAL_SECONDS: u64 = 60;
216
217 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
218 let join_handle = tokio::spawn(async move {
219 let mut monitor_interval =
220 tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
221 monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
222 loop {
223 tokio::select! {
224 _ = monitor_interval.tick() => {},
226 _ = &mut shutdown_rx => {
228 tracing::info!("Fragment info monitor is stopped");
229 return;
230 }
231 }
232
233 let session_vec = { session_map.read().values().cloned().collect::<Vec<_>>() };
234 let mut subscription_cursor_nums_value = 0;
235 let mut invalid_subscription_cursor_nums_value = 0;
236 for session in &session_vec {
237 let periodic_cursor_metrics = session
238 .get_cursor_manager()
239 .get_periodic_cursor_metrics()
240 .await;
241 subscription_cursor_nums_value +=
242 periodic_cursor_metrics.subscription_cursor_nums;
243 invalid_subscription_cursor_nums_value +=
244 periodic_cursor_metrics.invalid_subscription_cursor_nums;
245 for (subscription_name, duration) in
246 &periodic_cursor_metrics.subscription_cursor_last_fetch_duration
247 {
248 subscription_cursor_last_fetch_duration
249 .with_label_values(&[subscription_name])
250 .observe(*duration);
251 }
252 }
253 subscription_cursor_nums.set(subscription_cursor_nums_value);
254 invalid_subscription_cursor_nums.set(invalid_subscription_cursor_nums_value);
255 }
256 });
257 Self {
258 _join_handle: join_handle,
259 shutdown_tx: Some(shutdown_tx),
260 }
261 }
262}
263impl Drop for CursorMetricsCollector {
264 fn drop(&mut self) {
265 if let Some(shutdown_tx) = mem::take(&mut self.shutdown_tx) {
266 shutdown_tx.send(()).ok();
267 }
268 }
269}