risingwave_common_metrics/monitor/
in_mem.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module contains data structures for in-memory monitoring.
16//! It is intentionally decoupled from Prometheus.
17
18use std::collections::HashMap;
19use std::sync::atomic::AtomicU64;
20use std::sync::{Arc, Weak};
21
22use parking_lot::Mutex;
23
/// A reference-counted, atomically updated 64-bit counter.
pub type Count = Arc<AtomicU64>;
25
/// A counter handle whose `Drop` implementation removes the entry stored under `id`
/// from the parent map.
pub struct GuardedCount {
    /// Key used for this counter in the parent map.
    id: u64,
    /// The shared counter. `GuardedCount::new` also returns a clone of this `Arc`.
    pub count: Count,
    /// Weak reference to the parent map, so holding a guard does not keep the map alive.
    parent: Weak<Mutex<InnerCountMap>>,
}
31
32impl GuardedCount {
33    pub fn new(id: u64, parent: &Arc<Mutex<InnerCountMap>>) -> (Count, Self) {
34        let guard = GuardedCount {
35            id,
36            count: Arc::new(AtomicU64::new(0)),
37            parent: Arc::downgrade(parent),
38        };
39        (guard.count.clone(), guard)
40    }
41}
42
43impl Drop for GuardedCount {
44    fn drop(&mut self) {
45        if let Some(parent) = self.parent.upgrade() {
46            let mut map = parent.lock();
47            map.inner.remove(&self.id);
48        }
49    }
50}
51
/// The counter table that `CountMap` keeps behind a mutex.
pub struct InnerCountMap {
    /// Registered counters, keyed by id.
    inner: HashMap<u64, Count>,
}
55
/// A handle to a mutex-protected table of counters.
///
/// The table lives behind an `Arc`, so cloning this handle yields another handle to the
/// same table.
#[derive(Clone)]
pub struct CountMap(Arc<Mutex<InnerCountMap>>);
58
59impl CountMap {
60    pub fn new() -> Self {
61        let inner = Arc::new(Mutex::new(InnerCountMap {
62            inner: HashMap::new(),
63        }));
64        CountMap(inner)
65    }
66
67    pub fn new_count(&self, id: u64) -> GuardedCount {
68        let inner = &self.0;
69        let (count, guarded_count) = GuardedCount::new(id, inner);
70        let mut map = inner.lock();
71        map.inner.insert(id, count);
72        guarded_count
73    }
74
75    pub fn collect(&self, ids: &[u64]) -> HashMap<u64, u64> {
76        let map = self.0.lock();
77        ids.iter()
78            .filter_map(|id| {
79                map.inner
80                    .get(id)
81                    .map(|v| (*id, v.load(std::sync::atomic::Ordering::Relaxed)))
82            })
83            .collect()
84    }
85}
86
#[cfg(test)]
mod tests {
    use std::sync::atomic::Ordering;

    use super::*;

    /// Registers counters 1 and 2 in a fresh map and bumps them to 1 and 2 respectively.
    fn populated() -> (CountMap, GuardedCount, GuardedCount) {
        let count_map = CountMap::new();
        let first = count_map.new_count(1);
        let second = count_map.new_count(2);
        first.count.fetch_add(1, Ordering::Relaxed);
        second.count.fetch_add(2, Ordering::Relaxed);
        (count_map, first, second)
    }

    /// Collected values reflect the increments applied through the guards.
    #[test]
    fn test_count_map() {
        let (count_map, _first, _second) = populated();
        let counts = count_map.collect(&[1, 2]);
        assert_eq!(counts[&1], 1);
        assert_eq!(counts[&2], 2);
    }

    /// Dropping a guard removes its counter from later collections and leaves others intact.
    #[test]
    fn test_count_map_drop() {
        let (count_map, first, _second) = populated();
        let before = count_map.collect(&[1, 2]);
        assert_eq!(before[&1], 1);
        assert_eq!(before[&2], 2);

        drop(first);

        let after = count_map.collect(&[1, 2]);
        assert_eq!(after.get(&1), None);
        assert_eq!(after.get(&2), Some(&2));
    }
}