Skip to main content

risingwave_common_metrics/monitor/
rwlock.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::{Deref, DerefMut};
16use std::time::Instant;
17
18use prometheus::{Histogram, HistogramVec};
19use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
20
21const UNNAMED_PROCESS: &str = "unnamed";
22
23pub struct MonitoredRwLock<T> {
24    // labels: [lock_name, lock_type]
25    metrics: HistogramVec,
26    // labels: [method, lock_name]
27    process_metrics: HistogramVec,
28    inner: RwLock<T>,
29    lock_name: &'static str,
30}
31
32impl<T> MonitoredRwLock<T> {
33    pub fn new(
34        metrics: HistogramVec,
35        process_metrics: HistogramVec,
36        val: T,
37        lock_name: &'static str,
38    ) -> Self {
39        Self {
40            metrics,
41            process_metrics,
42            inner: RwLock::new(val),
43            lock_name,
44        }
45    }
46
47    pub async fn read(&self) -> MonitoredRwLockGuard<RwLockReadGuard<'_, T>> {
48        self.read_with_process_name(UNNAMED_PROCESS).await
49    }
50
51    pub async fn write(&self) -> MonitoredRwLockGuard<RwLockWriteGuard<'_, T>> {
52        self.write_with_process_name(UNNAMED_PROCESS).await
53    }
54
55    pub async fn read_with_process_name(
56        &self,
57        func_name: &'static str,
58    ) -> MonitoredRwLockGuard<RwLockReadGuard<'_, T>> {
59        let _timer = self
60            .metrics
61            .with_label_values(&[self.lock_name, "read"])
62            .start_timer();
63        let guard = self.inner.read().await;
64        MonitoredRwLockGuard::new(guard, self.process_metric(func_name))
65    }
66
67    pub async fn write_with_process_name(
68        &self,
69        func_name: &'static str,
70    ) -> MonitoredRwLockGuard<RwLockWriteGuard<'_, T>> {
71        let _timer = self
72            .metrics
73            .with_label_values(&[self.lock_name, "write"])
74            .start_timer();
75        let guard = self.inner.write().await;
76        MonitoredRwLockGuard::new(guard, self.process_metric(func_name))
77    }
78
79    fn process_metric(&self, func_name: &'static str) -> Histogram {
80        self.process_metrics
81            .with_label_values(&[func_name, self.lock_name])
82    }
83}
84
85pub struct MonitoredRwLockGuard<G> {
86    guard: G,
87    process_metric: Histogram,
88    process_start: Instant,
89}
90
91impl<G> MonitoredRwLockGuard<G> {
92    fn new(guard: G, process_metric: Histogram) -> Self {
93        Self {
94            guard,
95            process_metric,
96            process_start: Instant::now(),
97        }
98    }
99}
100
101impl<G> Deref for MonitoredRwLockGuard<G>
102where
103    G: Deref,
104{
105    type Target = G::Target;
106
107    fn deref(&self) -> &Self::Target {
108        &self.guard
109    }
110}
111
112impl<G> DerefMut for MonitoredRwLockGuard<G>
113where
114    G: DerefMut,
115{
116    fn deref_mut(&mut self) -> &mut Self::Target {
117        &mut self.guard
118    }
119}
120
121impl<G> Drop for MonitoredRwLockGuard<G> {
122    fn drop(&mut self) {
123        self.process_metric
124            .observe(self.process_start.elapsed().as_secs_f64());
125    }
126}