risingwave_common_metrics/monitor/
in_mem.rs1use std::collections::HashMap;
19use std::sync::atomic::AtomicU64;
20use std::sync::{Arc, Weak};
21
22use parking_lot::Mutex;
23
24pub type Count = Arc<AtomicU64>;
25
26pub struct GuardedCount {
27 id: u64,
28 pub count: Count,
29 parent: Weak<Mutex<InnerCountMap>>,
30}
31
32impl GuardedCount {
33 pub fn new(id: u64, parent: &Arc<Mutex<InnerCountMap>>) -> (Count, Self) {
34 let guard = GuardedCount {
35 id,
36 count: Arc::new(AtomicU64::new(0)),
37 parent: Arc::downgrade(parent),
38 };
39 (guard.count.clone(), guard)
40 }
41}
42
43impl Drop for GuardedCount {
44 fn drop(&mut self) {
45 if let Some(parent) = self.parent.upgrade() {
46 let mut map = parent.lock();
47 map.inner.remove(&self.id);
48 }
49 }
50}
51
52pub struct InnerCountMap {
53 inner: HashMap<u64, Count>,
54}
55
56#[derive(Clone)]
57pub struct CountMap(Arc<Mutex<InnerCountMap>>);
58
59impl CountMap {
60 pub fn new() -> Self {
61 let inner = Arc::new(Mutex::new(InnerCountMap {
62 inner: HashMap::new(),
63 }));
64 CountMap(inner)
65 }
66
67 pub fn new_count(&self, id: u64) -> GuardedCount {
68 let inner = &self.0;
69 let (count, guarded_count) = GuardedCount::new(id, inner);
70 let mut map = inner.lock();
71 map.inner.insert(id, count);
72 guarded_count
73 }
74
75 pub fn collect(&self, ids: &[u64]) -> HashMap<u64, u64> {
76 let map = self.0.lock();
77 ids.iter()
78 .filter_map(|id| {
79 map.inner
80 .get(id)
81 .map(|v| (*id, v.load(std::sync::atomic::Ordering::Relaxed)))
82 })
83 .collect()
84 }
85}
86
87#[cfg(test)]
88mod tests {
89 use super::*;
90
91 #[test]
92 fn test_count_map() {
93 let count_map = CountMap::new();
94 let count1 = count_map.new_count(1);
95 let count2 = count_map.new_count(2);
96 count1
97 .count
98 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
99 count2
100 .count
101 .fetch_add(2, std::sync::atomic::Ordering::Relaxed);
102 let counts = count_map.collect(&[1, 2]);
103 assert_eq!(counts[&1], 1);
104 assert_eq!(counts[&2], 2);
105 }
106
107 #[test]
108 fn test_count_map_drop() {
109 let count_map = CountMap::new();
110 let count1 = count_map.new_count(1);
111 let count2 = count_map.new_count(2);
112 count1
113 .count
114 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
115 count2
116 .count
117 .fetch_add(2, std::sync::atomic::Ordering::Relaxed);
118 let counts = count_map.collect(&[1, 2]);
119 assert_eq!(counts[&1], 1);
120 assert_eq!(counts[&2], 2);
121 drop(count1);
122 let counts = count_map.collect(&[1, 2]);
123 assert_eq!(counts.get(&1), None);
124 assert_eq!(counts.get(&2), Some(2).as_ref());
125 }
126}