// risingwave_stream/executor/dedup/cache.rs

use std::hash::Hash;

use risingwave_common_estimate_size::EstimateSize;

use crate::cache::ManagedLruCache;
use crate::common::metrics::MetricsInfo;
use crate::executor::prelude::*;
/// [`DedupCache`] is an in-memory LRU-based cache for deduplication: a key is
/// considered "seen" iff it is currently present in the cache.
pub struct DedupCache<K: Hash + Eq + EstimateSize> {
    // The value type is `()` because only key presence matters; the
    // `ManagedLruCache` wrapper tracks memory usage and handles eviction.
    inner: ManagedLruCache<K, ()>,
}
28
impl<K: Hash + Eq + EstimateSize> DedupCache<K> {
    /// Create an unbounded [`DedupCache`]. `watermark_sequence` is shared with the
    /// inner `ManagedLruCache` (presumably to drive watermark-based eviction —
    /// see `ManagedLruCache::unbounded`); `metrics_info` labels its metrics.
    pub fn new(watermark_sequence: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
        let cache = ManagedLruCache::unbounded(watermark_sequence, metrics_info);
        Self { inner: cache }
    }

    /// Insert `key` into the cache only if it was not already present.
    /// Returns `true` iff the key was newly inserted, i.e. it was NOT a duplicate
    /// (`put` returns the previous value, `None` when the key was absent).
    pub fn dedup_insert(&mut self, key: K) -> bool {
        self.inner.put(key, ()).is_none()
    }

    /// Insert `key` unconditionally, without reporting whether it was a duplicate.
    pub fn insert(&mut self, key: K) {
        self.inner.push(key, ());
    }

    /// Check whether `key` is currently in the cache.
    pub fn contains(&self, key: &K) -> bool {
        self.inner.contains(key)
    }

    /// Evict entries from the inner LRU cache — presumably up to the current
    /// watermark sequence passed at construction; see `ManagedLruCache::evict`.
    pub fn evict(&mut self) {
        self.inner.evict()
    }

    /// Remove all entries from the cache.
    pub fn clear(&mut self) {
        self.inner.clear()
    }
}
61
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::AtomicU64;

    use super::DedupCache;
    use crate::common::metrics::MetricsInfo;

    #[test]
    fn test_dedup_cache() {
        let mut cache = DedupCache::new(Arc::new(AtomicU64::new(10000)), MetricsInfo::for_test());

        // `insert` adds the key unconditionally; a later `dedup_insert` of the
        // same key must then be rejected as a duplicate (returns `false`).
        cache.insert(10);
        assert!(cache.contains(&10));
        assert!(!cache.dedup_insert(10));

        // First `dedup_insert` of a fresh key succeeds (returns `true`);
        // the second attempt on the same key is a duplicate.
        assert!(cache.dedup_insert(20));
        assert!(cache.contains(&20));
        assert!(!cache.dedup_insert(20));

        // `clear` drops all entries, so previously-seen keys are gone.
        cache.clear();
        assert!(!cache.contains(&10));
        assert!(!cache.contains(&20));
    }
}