risingwave_frontend/monitor/
stats.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::mem;
16use std::collections::HashMap;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use prometheus::core::{AtomicU64, GenericCounter};
21use prometheus::{
22    Histogram, HistogramVec, IntGauge, Registry, exponential_buckets, histogram_opts,
23    register_histogram_vec_with_registry, register_histogram_with_registry,
24    register_int_counter_with_registry, register_int_gauge_with_registry,
25};
26use risingwave_common::metrics::TrAdderGauge;
27use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
28use tokio::task::JoinHandle;
29
30use crate::session::SessionMapRef;
31
/// Handles to the frontend metrics created and registered by `FrontendMetrics::new`.
#[derive(Clone)]
pub struct FrontendMetrics {
    /// Counter registered as `frontend_query_counter_local_execution`
    /// ("Total query number of local execution mode").
    pub query_counter_local_execution: GenericCounter<AtomicU64>,
    /// Histogram registered as `frontend_latency_local_execution`
    /// ("latency of local execution mode"), using 23 exponential buckets
    /// starting at 0.01 with a growth factor of 2.
    pub latency_local_execution: Histogram,
    /// Gauge registered as `frontend_active_sessions`
    /// ("Total number of active sessions in frontend").
    pub active_sessions: IntGauge,
    /// Gauge registered as `frontend_batch_total_mem`
    /// ("All memory usage of batch executors in bytes").
    pub batch_total_mem: TrAdderGauge,
}
39
/// Shared [`FrontendMetrics`] instance, built lazily on first access and registered in
/// `GLOBAL_METRICS_REGISTRY`.
pub static GLOBAL_FRONTEND_METRICS: LazyLock<FrontendMetrics> =
    LazyLock::new(|| FrontendMetrics::new(&GLOBAL_METRICS_REGISTRY));
42
43impl FrontendMetrics {
44    fn new(registry: &Registry) -> Self {
45        let query_counter_local_execution = register_int_counter_with_registry!(
46            "frontend_query_counter_local_execution",
47            "Total query number of local execution mode",
48            registry
49        )
50        .unwrap();
51
52        let opts = histogram_opts!(
53            "frontend_latency_local_execution",
54            "latency of local execution mode",
55            exponential_buckets(0.01, 2.0, 23).unwrap()
56        );
57        let latency_local_execution = register_histogram_with_registry!(opts, registry).unwrap();
58
59        let active_sessions = register_int_gauge_with_registry!(
60            "frontend_active_sessions",
61            "Total number of active sessions in frontend",
62            registry
63        )
64        .unwrap();
65
66        let batch_total_mem = TrAdderGauge::new(
67            "frontend_batch_total_mem",
68            "All memory usage of batch executors in bytes",
69        )
70        .unwrap();
71
72        registry
73            .register(Box::new(batch_total_mem.clone()))
74            .unwrap();
75
76        Self {
77            query_counter_local_execution,
78            latency_local_execution,
79            active_sessions,
80            batch_total_mem,
81        }
82    }
83
84    /// Create a new `FrontendMetrics` instance used in tests or other places.
85    pub fn for_test() -> Self {
86        GLOBAL_FRONTEND_METRICS.clone()
87    }
88}
89
/// Shared [`CursorMetrics`] instance, built lazily on first access and registered in
/// `GLOBAL_METRICS_REGISTRY`. It is created without a background collector; see
/// `CursorMetrics::init` for a clone that has one attached.
pub static GLOBAL_CURSOR_METRICS: LazyLock<CursorMetrics> =
    LazyLock::new(|| CursorMetrics::new(&GLOBAL_METRICS_REGISTRY));
92
/// Handles to the subscription-cursor metrics created and registered by `CursorMetrics::new`.
///
/// The public fields are meant to be updated by callers; the private ones are only written by
/// the periodic `CursorMetricsCollector` task.
#[derive(Clone)]
pub struct CursorMetrics {
    /// Counter registered as `subscription_cursor_error_count`
    /// ("The subscription error num of cursor").
    pub subscription_cursor_error_count: GenericCounter<AtomicU64>,
    /// Histogram registered as `subscription_cursor_query_duration`
    /// ("The amount of time a query exists inside the cursor"), labelled by `subscription_name`.
    pub subscription_cursor_query_duration: HistogramVec,
    /// Histogram registered as `subscription_cursor_declare_duration`
    /// ("Subscription cursor duration of declare"), labelled by `subscription_name`.
    pub subscription_cursor_declare_duration: HistogramVec,
    /// Histogram registered as `subscription_cursor_fetch_duration`
    /// ("Subscription cursor duration of fetch"), labelled by `subscription_name`.
    pub subscription_cursor_fetch_duration: HistogramVec,
    /// Gauge registered as `subsription_cursor_nums` (the spelling is the registered metric
    /// name); set by the periodic collector to the sum over all sessions.
    subsription_cursor_nums: IntGauge,
    /// Gauge registered as `invalid_subsription_cursor_nums`; set by the periodic collector to
    /// the sum over all sessions.
    invalid_subsription_cursor_nums: IntGauge,
    /// Histogram registered as `subscription_cursor_last_fetch_duration`
    /// ("Since the last fetch, the time up to now"), labelled by `subscription_name`; observed
    /// by the periodic collector.
    subscription_cursor_last_fetch_duration: HistogramVec,
    /// Background collector. `None` until `start_with_session_map` is called; the collector
    /// signals its task to stop when the last `Arc` referencing it is dropped.
    _cursor_metrics_collector: Option<Arc<CursorMetricsCollector>>,
}
104
105impl CursorMetrics {
106    pub fn new(registry: &Registry) -> Self {
107        let subscription_cursor_error_count = register_int_counter_with_registry!(
108            "subscription_cursor_error_count",
109            "The subscription error num of cursor",
110            registry
111        )
112        .unwrap();
113        let opts = histogram_opts!(
114            "subscription_cursor_query_duration",
115            "The amount of time a query exists inside the cursor",
116            exponential_buckets(1.0, 5.0, 11).unwrap(),
117        );
118        let subscription_cursor_query_duration =
119            register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();
120
121        let opts = histogram_opts!(
122            "subscription_cursor_declare_duration",
123            "Subscription cursor duration of declare",
124            exponential_buckets(1.0, 5.0, 11).unwrap(),
125        );
126        let subscription_cursor_declare_duration =
127            register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();
128
129        let opts = histogram_opts!(
130            "subscription_cursor_fetch_duration",
131            "Subscription cursor duration of fetch",
132            exponential_buckets(1.0, 5.0, 11).unwrap(),
133        );
134        let subscription_cursor_fetch_duration =
135            register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();
136
137        let subsription_cursor_nums = register_int_gauge_with_registry!(
138            "subsription_cursor_nums",
139            "The number of subscription cursor",
140            registry
141        )
142        .unwrap();
143        let invalid_subsription_cursor_nums = register_int_gauge_with_registry!(
144            "invalid_subsription_cursor_nums",
145            "The number of invalid subscription cursor",
146            registry
147        )
148        .unwrap();
149
150        let opts = histogram_opts!(
151            "subscription_cursor_last_fetch_duration",
152            "Since the last fetch, the time up to now",
153            exponential_buckets(1.0, 5.0, 11).unwrap(),
154        );
155        let subscription_cursor_last_fetch_duration =
156            register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();
157        Self {
158            _cursor_metrics_collector: None,
159            subscription_cursor_error_count,
160            subscription_cursor_query_duration,
161            subscription_cursor_declare_duration,
162            subscription_cursor_fetch_duration,
163            subsription_cursor_nums,
164            invalid_subsription_cursor_nums,
165            subscription_cursor_last_fetch_duration,
166        }
167    }
168
169    pub fn for_test() -> Self {
170        GLOBAL_CURSOR_METRICS.clone()
171    }
172
173    pub fn start_with_session_map(&mut self, session_map: SessionMapRef) {
174        self._cursor_metrics_collector = Some(Arc::new(CursorMetricsCollector::new(
175            session_map,
176            self.subsription_cursor_nums.clone(),
177            self.invalid_subsription_cursor_nums.clone(),
178            self.subscription_cursor_last_fetch_duration.clone(),
179        )));
180    }
181
182    pub fn init(session_map: SessionMapRef) -> Self {
183        let mut cursor_metrics = GLOBAL_CURSOR_METRICS.clone();
184        cursor_metrics.start_with_session_map(session_map);
185        cursor_metrics
186    }
187}
188
/// Per-session cursor figures consumed by the periodic collector task in this file.
pub struct PeriodicCursorMetrics {
    /// Number of subscription cursors; summed across sessions into the
    /// `subsription_cursor_nums` gauge.
    pub subsription_cursor_nums: i64,
    /// Number of invalid subscription cursors; summed across sessions into the
    /// `invalid_subsription_cursor_nums` gauge.
    pub invalid_subsription_cursor_nums: i64,
    /// Values observed into the `subscription_cursor_last_fetch_duration` histogram. Each key is
    /// used as the `subscription_name` label value.
    // NOTE(review): the time unit of the values is not visible here; confirm against the producer.
    pub subscription_cursor_last_fetch_duration: HashMap<String, f64>,
}
194
/// Owner of the background task that periodically refreshes the session-derived cursor metrics.
struct CursorMetricsCollector {
    /// Handle of the spawned collection task; stored but never read in this file.
    _join_handle: JoinHandle<()>,
    /// Sender used to tell the task to stop; taken and fired in `Drop`.
    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
}
199impl CursorMetricsCollector {
200    fn new(
201        session_map: SessionMapRef,
202        subsription_cursor_nums: IntGauge,
203        invalid_subsription_cursor_nums: IntGauge,
204        subscription_cursor_last_fetch_duration: HistogramVec,
205    ) -> Self {
206        const COLLECT_INTERVAL_SECONDS: u64 = 60;
207
208        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
209        let join_handle = tokio::spawn(async move {
210            let mut monitor_interval =
211                tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
212            monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
213            loop {
214                tokio::select! {
215                    // Wait for interval
216                    _ = monitor_interval.tick() => {},
217                    // Shutdown monitor
218                    _ = &mut shutdown_rx => {
219                        tracing::info!("Fragment info monitor is stopped");
220                        return;
221                    }
222                }
223
224                let session_vec = { session_map.read().values().cloned().collect::<Vec<_>>() };
225                let mut subsription_cursor_nums_value = 0;
226                let mut invalid_subsription_cursor_nums_value = 0;
227                for session in &session_vec {
228                    let periodic_cursor_metrics = session
229                        .get_cursor_manager()
230                        .get_periodic_cursor_metrics()
231                        .await;
232                    subsription_cursor_nums_value +=
233                        periodic_cursor_metrics.subsription_cursor_nums;
234                    invalid_subsription_cursor_nums_value +=
235                        periodic_cursor_metrics.invalid_subsription_cursor_nums;
236                    for (subscription_name, duration) in
237                        &periodic_cursor_metrics.subscription_cursor_last_fetch_duration
238                    {
239                        subscription_cursor_last_fetch_duration
240                            .with_label_values(&[subscription_name])
241                            .observe(*duration);
242                    }
243                }
244                subsription_cursor_nums.set(subsription_cursor_nums_value);
245                invalid_subsription_cursor_nums.set(invalid_subsription_cursor_nums_value);
246            }
247        });
248        Self {
249            _join_handle: join_handle,
250            shutdown_tx: Some(shutdown_tx),
251        }
252    }
253}
254impl Drop for CursorMetricsCollector {
255    fn drop(&mut self) {
256        if let Some(shutdown_tx) = mem::take(&mut self.shutdown_tx) {
257            shutdown_tx.send(()).ok();
258        }
259    }
260}