risingwave_common_metrics/monitor/
in_mem.rs1use std::collections::HashMap;
19use std::sync::atomic::AtomicU64;
20use std::sync::{Arc, Weak};
21
22use parking_lot::Mutex;
23
24pub type Count = Arc<AtomicU64>;
25
26pub trait CountMapIdTrait = Copy + std::hash::Hash + Eq;
27
28pub struct GuardedCount<I: CountMapIdTrait> {
29 id: I,
30 pub count: Count,
31 parent: Weak<Mutex<InnerCountMap<I>>>,
32}
33
34impl<I: CountMapIdTrait> GuardedCount<I> {
35 pub fn new(id: I, parent: &Arc<Mutex<InnerCountMap<I>>>) -> (Count, Self) {
36 let guard = GuardedCount {
37 id,
38 count: Arc::new(AtomicU64::new(0)),
39 parent: Arc::downgrade(parent),
40 };
41 (guard.count.clone(), guard)
42 }
43}
44
45impl<I: CountMapIdTrait> Drop for GuardedCount<I> {
46 fn drop(&mut self) {
47 if let Some(parent) = self.parent.upgrade() {
48 let mut map = parent.lock();
49 map.inner.remove(&self.id);
50 }
51 }
52}
53
54pub struct InnerCountMap<I: CountMapIdTrait> {
55 inner: HashMap<I, Count>,
56}
57
58#[derive(Clone)]
59pub struct CountMap<I: CountMapIdTrait>(Arc<Mutex<InnerCountMap<I>>>);
60
61impl<I: CountMapIdTrait> CountMap<I> {
62 pub fn new() -> Self {
63 let inner = Arc::new(Mutex::new(InnerCountMap {
64 inner: HashMap::new(),
65 }));
66 CountMap(inner)
67 }
68
69 pub fn new_count(&self, id: I) -> GuardedCount<I> {
70 let inner = &self.0;
71 let (count, guarded_count) = GuardedCount::new(id, inner);
72 let mut map = inner.lock();
73 map.inner.insert(id, count);
74 guarded_count
75 }
76
77 pub fn collect(&self, ids: &[I]) -> HashMap<I, u64> {
78 let map = self.0.lock();
79 ids.iter()
80 .filter_map(|id| {
81 map.inner
82 .get(id)
83 .map(|v| (*id, v.load(std::sync::atomic::Ordering::Relaxed)))
84 })
85 .collect()
86 }
87}
88
89#[cfg(test)]
90mod tests {
91 use super::*;
92
93 #[test]
94 fn test_count_map() {
95 let count_map = CountMap::<u64>::new();
96 let count1 = count_map.new_count(1);
97 let count2 = count_map.new_count(2);
98 count1
99 .count
100 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
101 count2
102 .count
103 .fetch_add(2, std::sync::atomic::Ordering::Relaxed);
104 let counts = count_map.collect(&[1, 2]);
105 assert_eq!(counts[&1], 1);
106 assert_eq!(counts[&2], 2);
107 }
108
109 #[test]
110 fn test_count_map_drop() {
111 let count_map = CountMap::<u64>::new();
112 let count1 = count_map.new_count(1);
113 let count2 = count_map.new_count(2);
114 count1
115 .count
116 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
117 count2
118 .count
119 .fetch_add(2, std::sync::atomic::Ordering::Relaxed);
120 let counts = count_map.collect(&[1, 2]);
121 assert_eq!(counts[&1], 1);
122 assert_eq!(counts[&2], 2);
123 drop(count1);
124 let counts = count_map.collect(&[1, 2]);
125 assert_eq!(counts.get(&1), None);
126 assert_eq!(counts.get(&2), Some(2).as_ref());
127 }
128}