risingwave_stream/executor/dedup/
cache.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::hash::Hash;
16
17use risingwave_common_estimate_size::EstimateSize;
18
19use crate::cache::ManagedLruCache;
20use crate::common::metrics::MetricsInfo;
21use crate::executor::prelude::*;
22
23/// [`DedupCache`] is used for key deduplication. Currently, the cache behaves like a set that only
24/// accepts a key without a value. This could be refined in the future to support k-v pairs.
25pub struct DedupCache<K: Hash + Eq + EstimateSize> {
26    inner: ManagedLruCache<K, ()>,
27}
28
29impl<K: Hash + Eq + EstimateSize> DedupCache<K> {
30    pub fn new(watermark_sequence: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
31        let cache = ManagedLruCache::unbounded(watermark_sequence, metrics_info);
32        Self { inner: cache }
33    }
34
35    /// Insert a `key` into the cache only if the `key` doesn't exist in the cache before. Return
36    /// whether the `key` is successfully inserted.
37    pub fn dedup_insert(&mut self, key: K) -> bool {
38        self.inner.put(key, ()).is_none()
39    }
40
41    /// Insert a `key` into the cache without checking for duplication.
42    pub fn insert(&mut self, key: K) {
43        self.inner.push(key, ());
44    }
45
46    /// Check whether the given key is in the cache.
47    pub fn contains(&self, key: &K) -> bool {
48        self.inner.contains(key)
49    }
50
51    /// Evict the inner LRU cache according to the watermark epoch.
52    pub fn evict(&mut self) {
53        self.inner.evict()
54    }
55
56    /// Clear everything in the cache.
57    pub fn clear(&mut self) {
58        self.inner.clear()
59    }
60}
61
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::AtomicU64;

    use super::DedupCache;
    use crate::common::metrics::MetricsInfo;

    /// Exercises insertion, duplicate detection, membership checks and clearing of
    /// [`DedupCache`].
    #[test]
    fn test_dedup_cache() {
        let mut cache = DedupCache::new(Arc::new(AtomicU64::new(10000)), MetricsInfo::for_test());

        // A freshly created cache holds no keys.
        assert!(!cache.contains(&10));

        // `insert` adds without checking; a later `dedup_insert` must treat it as a duplicate.
        cache.insert(10);
        assert!(cache.contains(&10));
        assert!(!cache.dedup_insert(10));

        // `dedup_insert` of a new key succeeds exactly once.
        assert!(cache.dedup_insert(20));
        assert!(cache.contains(&20));
        assert!(!cache.dedup_insert(20));

        // Keys that were never inserted are not reported as present.
        assert!(!cache.contains(&30));

        // After `clear`, previously seen keys are gone and are accepted again as new.
        cache.clear();
        assert!(!cache.contains(&10));
        assert!(!cache.contains(&20));
        assert!(cache.dedup_insert(10));
        assert!(cache.contains(&10));
    }
}