risingwave_stream/cache/
managed_lru.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::alloc::{Allocator, Global};
16use std::borrow::Borrow;
17use std::hash::{BuildHasher, Hash};
18use std::ops::{Deref, DerefMut};
19use std::sync::Arc;
20use std::sync::atomic::Ordering;
21
22use risingwave_common::lru::{LruCache, RandomState};
23use risingwave_common::metrics::LabelGuardedIntGauge;
24use risingwave_common::sequence::AtomicSequence;
25use risingwave_common_estimate_size::EstimateSize;
26
27use crate::common::metrics::MetricsInfo;
28
/// Only push the heap-size gauge to metrics after it drifts by at least this many KiB
/// (4096 KiB = 4 MiB; `try_report` converts via `<< 10`), to avoid updating the gauge
/// on every single entry mutation.
const REPORT_SIZE_EVERY_N_KB_CHANGE: usize = 4096;
30
/// The managed cache is a lru cache that bounds the memory usage by epoch.
/// Should be used with `MemoryManager`.
pub struct ManagedLruCache<K, V, S = RandomState, A = Global>
where
    K: Hash + Eq,
    S: BuildHasher + Send + Sync + 'static,
    A: Clone + Allocator,
{
    /// The underlying sequence-aware LRU cache.
    inner: LruCache<K, V, S, A>,

    /// The entry with sequence less than `watermark_sequence` should be evicted.
    /// `watermark_sequence` should only be updated by `MemoryManager`.
    watermark_sequence: Arc<AtomicSequence>,

    // Metrics info; kept alive so the labels stay registered, but only read at construction.
    _metrics_info: MetricsInfo,

    /// Tracks the estimated heap size of all entries and reports it to a gauge.
    reporter: HeapSizeReporter,
}
50
51impl<K, V, S, A> ManagedLruCache<K, V, S, A>
52where
53    K: Hash + Eq + EstimateSize,
54    V: EstimateSize,
55    S: BuildHasher + Send + Sync + 'static,
56    A: Clone + Allocator,
57{
58    pub fn unbounded_with_hasher_in(
59        watermark_sequence: Arc<AtomicSequence>,
60        metrics_info: MetricsInfo,
61        hash_builder: S,
62        alloc: A,
63    ) -> Self {
64        let inner = LruCache::unbounded_with_hasher_in(hash_builder, alloc);
65
66        let memory_usage_metrics = metrics_info
67            .metrics
68            .stream_memory_usage
69            .with_guarded_label_values(&[
70                &metrics_info.actor_id,
71                &metrics_info.table_id,
72                &metrics_info.desc,
73            ]);
74        memory_usage_metrics.set(0.into());
75
76        let reporter = HeapSizeReporter::new(memory_usage_metrics, 0, 0);
77
78        Self {
79            inner,
80            watermark_sequence,
81            _metrics_info: metrics_info,
82            reporter,
83        }
84    }
85
86    /// Evict epochs lower than the watermark
87    pub fn evict(&mut self) {
88        let sequence = self.watermark_sequence.load(Ordering::Relaxed);
89        while let Some((key, value, _)) = self.inner.pop_with_sequence(sequence) {
90            let charge = key.estimated_size() + value.estimated_size();
91            self.reporter.dec(charge);
92        }
93    }
94
95    pub fn put(&mut self, k: K, v: V) -> Option<V> {
96        let key_size = k.estimated_size();
97        self.reporter.inc(key_size + v.estimated_size());
98        let old_val = self.inner.put(k, v);
99        if let Some(old_val) = &old_val {
100            self.reporter.dec(key_size + old_val.estimated_size());
101        }
102        old_val
103    }
104
105    pub fn remove(&mut self, k: &K) -> Option<V> {
106        let key_size = k.estimated_size();
107        let old_val = self.inner.remove(k);
108        if let Some(old_val) = &old_val {
109            self.reporter.dec(key_size + old_val.estimated_size());
110        }
111        old_val
112    }
113
114    pub fn get_mut(&mut self, k: &K) -> Option<MutGuard<'_, V>> {
115        let v = self.inner.get_mut(k);
116        v.map(|inner| MutGuard::new(inner, &mut self.reporter))
117    }
118
119    pub fn get<Q>(&mut self, k: &Q) -> Option<&V>
120    where
121        K: Borrow<Q>,
122        Q: Hash + Eq + ?Sized,
123    {
124        self.inner.get(k)
125    }
126
127    pub fn peek<Q>(&self, k: &Q) -> Option<&V>
128    where
129        K: Borrow<Q>,
130        Q: Hash + Eq + ?Sized,
131    {
132        self.inner.peek(k)
133    }
134
135    pub fn peek_mut(&mut self, k: &K) -> Option<MutGuard<'_, V>> {
136        let v = self.inner.peek_mut(k);
137        v.map(|inner| MutGuard::new(inner, &mut self.reporter))
138    }
139
140    pub fn contains<Q>(&self, k: &Q) -> bool
141    where
142        K: Borrow<Q>,
143        Q: Hash + Eq + ?Sized,
144    {
145        self.inner.contains(k)
146    }
147
148    pub fn len(&self) -> usize {
149        self.inner.len()
150    }
151
152    pub fn is_empty(&self) -> bool {
153        self.inner.is_empty()
154    }
155
156    pub fn clear(&mut self) {
157        self.inner.clear();
158    }
159}
160
161impl<K, V> ManagedLruCache<K, V>
162where
163    K: Hash + Eq + EstimateSize,
164    V: EstimateSize,
165{
166    pub fn unbounded(watermark_sequence: Arc<AtomicSequence>, metrics_info: MetricsInfo) -> Self {
167        Self::unbounded_with_hasher(watermark_sequence, metrics_info, RandomState::default())
168    }
169}
170
171impl<K, V, S> ManagedLruCache<K, V, S>
172where
173    K: Hash + Eq + EstimateSize,
174    V: EstimateSize,
175    S: BuildHasher + Send + Sync + 'static,
176{
177    pub fn unbounded_with_hasher(
178        watermark_sequence: Arc<AtomicSequence>,
179        metrics_info: MetricsInfo,
180        hash_builder: S,
181    ) -> Self {
182        Self::unbounded_with_hasher_in(watermark_sequence, metrics_info, hash_builder, Global)
183    }
184}
185
/// RAII guard for mutable access to a cached value: snapshots the value's
/// estimated size on creation and reconciles any size change with the
/// reporter when dropped.
pub struct MutGuard<'a, V: EstimateSize> {
    // The borrowed value being mutated.
    inner: &'a mut V,
    // Where the size delta is applied on drop.
    reporter: &'a mut HeapSizeReporter,
    // Estimated size of `inner` at guard creation.
    old_value_size: usize,
}
191
192impl<'a, V: EstimateSize> MutGuard<'a, V> {
193    fn new(inner: &'a mut V, reporter: &'a mut HeapSizeReporter) -> Self {
194        let old_value_size = inner.estimated_size();
195        Self {
196            inner,
197            reporter,
198            old_value_size,
199        }
200    }
201}
202
203impl<V: EstimateSize> Drop for MutGuard<'_, V> {
204    fn drop(&mut self) {
205        let new_value_size = self.inner.estimated_size();
206        if new_value_size != self.old_value_size {
207            self.reporter.apply(|size| {
208                *size = size
209                    .saturating_sub(self.old_value_size)
210                    .saturating_add(new_value_size)
211            })
212        }
213    }
214}
215
216impl<V: EstimateSize> Deref for MutGuard<'_, V> {
217    type Target = V;
218
219    fn deref(&self) -> &Self::Target {
220        self.inner
221    }
222}
223
224impl<V: EstimateSize> DerefMut for MutGuard<'_, V> {
225    fn deref_mut(&mut self) -> &mut Self::Target {
226        self.inner
227    }
228}
229
/// Tracks the cache's estimated heap size and lazily reports it to a gauge,
/// only when the size has drifted far enough from the last reported value.
struct HeapSizeReporter {
    // The gauge the size is published to; reset to 0 on drop.
    metrics: LabelGuardedIntGauge,
    // Current estimated heap size in bytes.
    heap_size: usize,
    // The value last pushed to `metrics`.
    last_reported: usize,
}
235
236impl HeapSizeReporter {
237    fn new(
238        heap_size_metrics: LabelGuardedIntGauge,
239        heap_size: usize,
240        last_reported: usize,
241    ) -> Self {
242        Self {
243            metrics: heap_size_metrics,
244            heap_size,
245            last_reported,
246        }
247    }
248
249    fn inc(&mut self, size: usize) {
250        self.heap_size = self.heap_size.saturating_add(size);
251        self.try_report();
252    }
253
254    fn dec(&mut self, size: usize) {
255        self.heap_size = self.heap_size.saturating_sub(size);
256        self.try_report();
257    }
258
259    fn apply<F>(&mut self, f: F)
260    where
261        F: FnOnce(&mut usize),
262    {
263        f(&mut self.heap_size);
264        self.try_report();
265    }
266
267    fn try_report(&mut self) -> bool {
268        if self.heap_size.abs_diff(self.last_reported) >= REPORT_SIZE_EVERY_N_KB_CHANGE << 10 {
269            self.metrics.set(self.heap_size as _);
270            self.last_reported = self.heap_size;
271            true
272        } else {
273            false
274        }
275    }
276}
277
impl Drop for HeapSizeReporter {
    fn drop(&mut self) {
        // Zero the gauge so a destroyed cache doesn't leave a stale reading behind.
        self.metrics.set(0);
    }
}