risingwave_common_metrics/monitor/
rwlock.rs1use std::ops::{Deref, DerefMut};
16use std::time::Instant;
17
18use prometheus::{Histogram, HistogramVec};
19use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
20
21const UNNAMED_PROCESS: &str = "unnamed";
22
23pub struct MonitoredRwLock<T> {
24 metrics: HistogramVec,
26 process_metrics: HistogramVec,
28 inner: RwLock<T>,
29 lock_name: &'static str,
30}
31
32impl<T> MonitoredRwLock<T> {
33 pub fn new(
34 metrics: HistogramVec,
35 process_metrics: HistogramVec,
36 val: T,
37 lock_name: &'static str,
38 ) -> Self {
39 Self {
40 metrics,
41 process_metrics,
42 inner: RwLock::new(val),
43 lock_name,
44 }
45 }
46
47 pub async fn read(&self) -> MonitoredRwLockGuard<RwLockReadGuard<'_, T>> {
48 self.read_with_process_name(UNNAMED_PROCESS).await
49 }
50
51 pub async fn write(&self) -> MonitoredRwLockGuard<RwLockWriteGuard<'_, T>> {
52 self.write_with_process_name(UNNAMED_PROCESS).await
53 }
54
55 pub async fn read_with_process_name(
56 &self,
57 func_name: &'static str,
58 ) -> MonitoredRwLockGuard<RwLockReadGuard<'_, T>> {
59 let _timer = self
60 .metrics
61 .with_label_values(&[self.lock_name, "read"])
62 .start_timer();
63 let guard = self.inner.read().await;
64 MonitoredRwLockGuard::new(guard, self.process_metric(func_name))
65 }
66
67 pub async fn write_with_process_name(
68 &self,
69 func_name: &'static str,
70 ) -> MonitoredRwLockGuard<RwLockWriteGuard<'_, T>> {
71 let _timer = self
72 .metrics
73 .with_label_values(&[self.lock_name, "write"])
74 .start_timer();
75 let guard = self.inner.write().await;
76 MonitoredRwLockGuard::new(guard, self.process_metric(func_name))
77 }
78
79 fn process_metric(&self, func_name: &'static str) -> Histogram {
80 self.process_metrics
81 .with_label_values(&[func_name, self.lock_name])
82 }
83}
84
85pub struct MonitoredRwLockGuard<G> {
86 guard: G,
87 process_metric: Histogram,
88 process_start: Instant,
89}
90
91impl<G> MonitoredRwLockGuard<G> {
92 fn new(guard: G, process_metric: Histogram) -> Self {
93 Self {
94 guard,
95 process_metric,
96 process_start: Instant::now(),
97 }
98 }
99}
100
101impl<G> Deref for MonitoredRwLockGuard<G>
102where
103 G: Deref,
104{
105 type Target = G::Target;
106
107 fn deref(&self) -> &Self::Target {
108 &self.guard
109 }
110}
111
112impl<G> DerefMut for MonitoredRwLockGuard<G>
113where
114 G: DerefMut,
115{
116 fn deref_mut(&mut self) -> &mut Self::Target {
117 &mut self.guard
118 }
119}
120
121impl<G> Drop for MonitoredRwLockGuard<G> {
122 fn drop(&mut self) {
123 self.process_metric
124 .observe(self.process_start.elapsed().as_secs_f64());
125 }
126}