risingwave_common_metrics/monitor/
in_mem.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module contains data structures for in-memory monitoring.
16//! It is intentionally decoupled from Prometheus.
17
18use std::collections::HashMap;
19use std::sync::atomic::AtomicU64;
20use std::sync::{Arc, Weak};
21
22use parking_lot::Mutex;
23
24pub type Count = Arc<AtomicU64>;
25
26pub trait CountMapIdTrait = Copy + std::hash::Hash + Eq;
27
28pub struct GuardedCount<I: CountMapIdTrait> {
29    id: I,
30    pub count: Count,
31    parent: Weak<Mutex<InnerCountMap<I>>>,
32}
33
34impl<I: CountMapIdTrait> GuardedCount<I> {
35    pub fn new(id: I, parent: &Arc<Mutex<InnerCountMap<I>>>) -> (Count, Self) {
36        let guard = GuardedCount {
37            id,
38            count: Arc::new(AtomicU64::new(0)),
39            parent: Arc::downgrade(parent),
40        };
41        (guard.count.clone(), guard)
42    }
43}
44
45impl<I: CountMapIdTrait> Drop for GuardedCount<I> {
46    fn drop(&mut self) {
47        if let Some(parent) = self.parent.upgrade() {
48            let mut map = parent.lock();
49            map.inner.remove(&self.id);
50        }
51    }
52}
53
54pub struct InnerCountMap<I: CountMapIdTrait> {
55    inner: HashMap<I, Count>,
56}
57
58#[derive(Clone)]
59pub struct CountMap<I: CountMapIdTrait>(Arc<Mutex<InnerCountMap<I>>>);
60
61impl<I: CountMapIdTrait> CountMap<I> {
62    pub fn new() -> Self {
63        let inner = Arc::new(Mutex::new(InnerCountMap {
64            inner: HashMap::new(),
65        }));
66        CountMap(inner)
67    }
68
69    pub fn new_count(&self, id: I) -> GuardedCount<I> {
70        let inner = &self.0;
71        let (count, guarded_count) = GuardedCount::new(id, inner);
72        let mut map = inner.lock();
73        map.inner.insert(id, count);
74        guarded_count
75    }
76
77    pub fn collect(&self, ids: &[I]) -> HashMap<I, u64> {
78        let map = self.0.lock();
79        ids.iter()
80            .filter_map(|id| {
81                map.inner
82                    .get(id)
83                    .map(|v| (*id, v.load(std::sync::atomic::Ordering::Relaxed)))
84            })
85            .collect()
86    }
87}
88
89#[cfg(test)]
90mod tests {
91    use super::*;
92
93    #[test]
94    fn test_count_map() {
95        let count_map = CountMap::<u64>::new();
96        let count1 = count_map.new_count(1);
97        let count2 = count_map.new_count(2);
98        count1
99            .count
100            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
101        count2
102            .count
103            .fetch_add(2, std::sync::atomic::Ordering::Relaxed);
104        let counts = count_map.collect(&[1, 2]);
105        assert_eq!(counts[&1], 1);
106        assert_eq!(counts[&2], 2);
107    }
108
109    #[test]
110    fn test_count_map_drop() {
111        let count_map = CountMap::<u64>::new();
112        let count1 = count_map.new_count(1);
113        let count2 = count_map.new_count(2);
114        count1
115            .count
116            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
117        count2
118            .count
119            .fetch_add(2, std::sync::atomic::Ordering::Relaxed);
120        let counts = count_map.collect(&[1, 2]);
121        assert_eq!(counts[&1], 1);
122        assert_eq!(counts[&2], 2);
123        drop(count1);
124        let counts = count_map.collect(&[1, 2]);
125        assert_eq!(counts.get(&1), None);
126        assert_eq!(counts.get(&2), Some(2).as_ref());
127    }
128}