risingwave_frontend/monitor/
stats.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::mem;
16use std::collections::HashMap;
17use std::sync::{Arc, LazyLock};
18use std::time::Duration;
19
20use prometheus::core::{AtomicU64, GenericCounter};
21use prometheus::{
22    Histogram, HistogramVec, IntGauge, Registry, exponential_buckets, histogram_opts,
23    register_histogram_vec_with_registry, register_histogram_with_registry,
24    register_int_counter_with_registry, register_int_gauge_with_registry,
25};
26use risingwave_common::metrics::TrAdderGauge;
27use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
28use tokio::task::JoinHandle;
29
30#[cfg(feature = "datafusion")]
31use crate::datafusion::DataFusionMetrics;
32use crate::session::SessionMapRef;
33
34#[derive(Clone)]
35pub struct FrontendMetrics {
36    pub query_counter_local_execution: GenericCounter<AtomicU64>,
37    pub latency_local_execution: Histogram,
38    #[cfg(feature = "datafusion")]
39    pub datafusion: DataFusionMetrics,
40    pub active_sessions: IntGauge,
41    pub batch_total_mem: TrAdderGauge,
42}
43
44pub static GLOBAL_FRONTEND_METRICS: LazyLock<FrontendMetrics> =
45    LazyLock::new(|| FrontendMetrics::new(&GLOBAL_METRICS_REGISTRY));
46
47impl FrontendMetrics {
48    fn new(registry: &Registry) -> Self {
49        let query_counter_local_execution = register_int_counter_with_registry!(
50            "frontend_query_counter_local_execution",
51            "Total query number of local execution mode for RisingWave batch engine",
52            registry
53        )
54        .unwrap();
55
56        let opts = histogram_opts!(
57            "frontend_latency_local_execution",
58            "latency of local execution mode for RisingWave batch engine",
59            exponential_buckets(0.01, 2.0, 23).unwrap()
60        );
61        let latency_local_execution = register_histogram_with_registry!(opts, registry).unwrap();
62
63        #[cfg(feature = "datafusion")]
64        let datafusion = DataFusionMetrics::new(registry);
65
66        let active_sessions = register_int_gauge_with_registry!(
67            "frontend_active_sessions",
68            "Total number of active sessions in frontend",
69            registry
70        )
71        .unwrap();
72
73        let batch_total_mem = TrAdderGauge::new(
74            "frontend_batch_total_mem",
75            "All memory usage of batch executors in bytes",
76        )
77        .unwrap();
78
79        registry
80            .register(Box::new(batch_total_mem.clone()))
81            .unwrap();
82
83        Self {
84            query_counter_local_execution,
85            latency_local_execution,
86            #[cfg(feature = "datafusion")]
87            datafusion,
88            active_sessions,
89            batch_total_mem,
90        }
91    }
92
93    /// Create a new `FrontendMetrics` instance used in tests or other places.
94    pub fn for_test() -> Self {
95        GLOBAL_FRONTEND_METRICS.clone()
96    }
97}
98
99pub static GLOBAL_CURSOR_METRICS: LazyLock<CursorMetrics> =
100    LazyLock::new(|| CursorMetrics::new(&GLOBAL_METRICS_REGISTRY));
101
102#[derive(Clone)]
103pub struct CursorMetrics {
104    pub subscription_cursor_error_count: GenericCounter<AtomicU64>,
105    pub subscription_cursor_query_duration: HistogramVec,
106    pub subscription_cursor_declare_duration: HistogramVec,
107    pub subscription_cursor_fetch_duration: HistogramVec,
108    subscription_cursor_nums: IntGauge,
109    invalid_subscription_cursor_nums: IntGauge,
110    subscription_cursor_last_fetch_duration: HistogramVec,
111    _cursor_metrics_collector: Option<Arc<CursorMetricsCollector>>,
112}
113
114impl CursorMetrics {
115    pub fn new(registry: &Registry) -> Self {
116        let subscription_cursor_error_count = register_int_counter_with_registry!(
117            "subscription_cursor_error_count",
118            "The subscription error num of cursor",
119            registry
120        )
121        .unwrap();
122        let opts = histogram_opts!(
123            "subscription_cursor_query_duration",
124            "The amount of time a query exists inside the cursor",
125            exponential_buckets(1.0, 5.0, 11).unwrap(),
126        );
127        let subscription_cursor_query_duration =
128            register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();
129
130        let opts = histogram_opts!(
131            "subscription_cursor_declare_duration",
132            "Subscription cursor duration of declare",
133            exponential_buckets(1.0, 5.0, 11).unwrap(),
134        );
135        let subscription_cursor_declare_duration =
136            register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();
137
138        let opts = histogram_opts!(
139            "subscription_cursor_fetch_duration",
140            "Subscription cursor duration of fetch",
141            exponential_buckets(1.0, 5.0, 11).unwrap(),
142        );
143        let subscription_cursor_fetch_duration =
144            register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();
145
146        let subscription_cursor_nums = register_int_gauge_with_registry!(
147            "subscription_cursor_nums",
148            "The number of subscription cursor",
149            registry
150        )
151        .unwrap();
152        let invalid_subscription_cursor_nums = register_int_gauge_with_registry!(
153            "invalid_subscription_cursor_nums",
154            "The number of invalid subscription cursor",
155            registry
156        )
157        .unwrap();
158
159        let opts = histogram_opts!(
160            "subscription_cursor_last_fetch_duration",
161            "Since the last fetch, the time up to now",
162            exponential_buckets(1.0, 5.0, 11).unwrap(),
163        );
164        let subscription_cursor_last_fetch_duration =
165            register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();
166        Self {
167            _cursor_metrics_collector: None,
168            subscription_cursor_error_count,
169            subscription_cursor_query_duration,
170            subscription_cursor_declare_duration,
171            subscription_cursor_fetch_duration,
172            subscription_cursor_nums,
173            invalid_subscription_cursor_nums,
174            subscription_cursor_last_fetch_duration,
175        }
176    }
177
178    pub fn for_test() -> Self {
179        GLOBAL_CURSOR_METRICS.clone()
180    }
181
182    pub fn start_with_session_map(&mut self, session_map: SessionMapRef) {
183        self._cursor_metrics_collector = Some(Arc::new(CursorMetricsCollector::new(
184            session_map,
185            self.subscription_cursor_nums.clone(),
186            self.invalid_subscription_cursor_nums.clone(),
187            self.subscription_cursor_last_fetch_duration.clone(),
188        )));
189    }
190
191    pub fn init(session_map: SessionMapRef) -> Self {
192        let mut cursor_metrics = GLOBAL_CURSOR_METRICS.clone();
193        cursor_metrics.start_with_session_map(session_map);
194        cursor_metrics
195    }
196}
197
/// Snapshot of cursor statistics reported by one session's cursor manager.
///
/// The background collector sums the two counts over all sessions and records
/// every entry of the duration map into a histogram.
pub struct PeriodicCursorMetrics {
    /// Number of subscription cursors in the session.
    pub subscription_cursor_nums: i64,

    /// Number of invalid subscription cursors in the session.
    pub invalid_subscription_cursor_nums: i64,

    /// Time elapsed since the last fetch, keyed by subscription name.
    ///
    /// NOTE(review): the unit is not visible in this file; presumably it
    /// matches the other cursor duration histograms. Confirm against the
    /// cursor manager.
    pub subscription_cursor_last_fetch_duration: HashMap<String, f64>,
}
203
204struct CursorMetricsCollector {
205    _join_handle: JoinHandle<()>,
206    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
207}
208impl CursorMetricsCollector {
209    fn new(
210        session_map: SessionMapRef,
211        subscription_cursor_nums: IntGauge,
212        invalid_subscription_cursor_nums: IntGauge,
213        subscription_cursor_last_fetch_duration: HistogramVec,
214    ) -> Self {
215        const COLLECT_INTERVAL_SECONDS: u64 = 60;
216
217        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
218        let join_handle = tokio::spawn(async move {
219            let mut monitor_interval =
220                tokio::time::interval(Duration::from_secs(COLLECT_INTERVAL_SECONDS));
221            monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
222            loop {
223                tokio::select! {
224                    // Wait for interval
225                    _ = monitor_interval.tick() => {},
226                    // Shutdown monitor
227                    _ = &mut shutdown_rx => {
228                        tracing::info!("Fragment info monitor is stopped");
229                        return;
230                    }
231                }
232
233                let session_vec = { session_map.read().values().cloned().collect::<Vec<_>>() };
234                let mut subscription_cursor_nums_value = 0;
235                let mut invalid_subscription_cursor_nums_value = 0;
236                for session in &session_vec {
237                    let periodic_cursor_metrics = session
238                        .get_cursor_manager()
239                        .get_periodic_cursor_metrics()
240                        .await;
241                    subscription_cursor_nums_value +=
242                        periodic_cursor_metrics.subscription_cursor_nums;
243                    invalid_subscription_cursor_nums_value +=
244                        periodic_cursor_metrics.invalid_subscription_cursor_nums;
245                    for (subscription_name, duration) in
246                        &periodic_cursor_metrics.subscription_cursor_last_fetch_duration
247                    {
248                        subscription_cursor_last_fetch_duration
249                            .with_label_values(&[subscription_name])
250                            .observe(*duration);
251                    }
252                }
253                subscription_cursor_nums.set(subscription_cursor_nums_value);
254                invalid_subscription_cursor_nums.set(invalid_subscription_cursor_nums_value);
255            }
256        });
257        Self {
258            _join_handle: join_handle,
259            shutdown_tx: Some(shutdown_tx),
260        }
261    }
262}
263impl Drop for CursorMetricsCollector {
264    fn drop(&mut self) {
265        if let Some(shutdown_tx) = mem::take(&mut self.shutdown_tx) {
266            shutdown_tx.send(()).ok();
267        }
268    }
269}