// risingwave_stream/cache/managed_lru.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::alloc::{Allocator, Global};
16use std::borrow::Borrow;
17use std::hash::{BuildHasher, Hash};
18use std::ops::{Deref, DerefMut};
19use std::sync::Arc;
20use std::sync::atomic::Ordering;
21
22use risingwave_common::lru::{LruCache, RandomState};
23use risingwave_common::metrics::LabelGuardedIntGauge;
24use risingwave_common::sequence::AtomicSequence;
25use risingwave_common_estimate_size::EstimateSize;
26
27use crate::common::metrics::MetricsInfo;
28
/// Only push the tracked heap size to the metrics gauge after it has drifted
/// by at least this many KiB from the last reported value (the use site
/// shifts by 10 to convert to bytes, i.e. a 4 MiB delta). Avoids updating the
/// gauge on every cache operation.
const REPORT_SIZE_EVERY_N_KB_CHANGE: usize = 4096;
30
/// The managed cache is a lru cache that bounds the memory usage by epoch.
/// Should be used with `MemoryManager`.
pub struct ManagedLruCache<K, V, S = RandomState, A = Global>
where
    K: Hash + Eq,
    S: BuildHasher + Send + Sync + 'static,
    A: Clone + Allocator,
{
    /// Underlying sequence-aware LRU cache that stores the entries.
    inner: LruCache<K, V, S, A>,

    /// The entry with sequence less than `watermark_sequence` should be evicted.
    /// `watermark_sequence` should only be updated by `MemoryManager`.
    watermark_sequence: Arc<AtomicSequence>,

    // Metrics info. Held only to keep the labels alive; the gauge itself is
    // updated through `reporter`.
    _metrics_info: MetricsInfo,

    /// Tracks the estimated heap size of all entries and lazily publishes it
    /// to the memory-usage gauge.
    reporter: HeapSizeReporter,
}
50
51impl<K, V, S, A> ManagedLruCache<K, V, S, A>
52where
53    K: Hash + Eq + EstimateSize,
54    V: EstimateSize,
55    S: BuildHasher + Send + Sync + 'static,
56    A: Clone + Allocator,
57{
58    pub fn unbounded_with_hasher_in(
59        watermark_sequence: Arc<AtomicSequence>,
60        metrics_info: MetricsInfo,
61        hash_builder: S,
62        alloc: A,
63    ) -> Self {
64        let inner = LruCache::unbounded_with_hasher_in(hash_builder, alloc);
65
66        let memory_usage_metrics = metrics_info
67            .metrics
68            .stream_memory_usage
69            .with_guarded_label_values(&[
70                &metrics_info.actor_id,
71                &metrics_info.table_id,
72                &metrics_info.desc,
73            ]);
74        memory_usage_metrics.set(0.into());
75
76        let reporter = HeapSizeReporter::new(memory_usage_metrics, 0, 0);
77
78        Self {
79            inner,
80            watermark_sequence,
81            _metrics_info: metrics_info,
82            reporter,
83        }
84    }
85
86    /// Evict epochs lower than the watermark
87    pub fn evict(&mut self) {
88        let sequence = self.watermark_sequence.load(Ordering::Relaxed);
89        while let Some((key, value, _)) = self.inner.pop_with_sequence(sequence) {
90            let charge = key.estimated_size() + value.estimated_size();
91            self.reporter.dec(charge);
92        }
93    }
94
95    pub fn put(&mut self, k: K, v: V) -> Option<V> {
96        let key_size = k.estimated_size();
97        self.reporter.inc(key_size + v.estimated_size());
98        let old_val = self.inner.put(k, v);
99        if let Some(old_val) = &old_val {
100            self.reporter.dec(key_size + old_val.estimated_size());
101        }
102        old_val
103    }
104
105    // TODO(MrCroxx): REMOVE ME!!!
106    pub fn push(&mut self, k: K, v: V) -> Option<V> {
107        self.put(k, v)
108    }
109
110    pub fn get_mut(&mut self, k: &K) -> Option<MutGuard<'_, V>> {
111        let v = self.inner.get_mut(k);
112        v.map(|inner| MutGuard::new(inner, &mut self.reporter))
113    }
114
115    pub fn get<Q>(&mut self, k: &Q) -> Option<&V>
116    where
117        K: Borrow<Q>,
118        Q: Hash + Eq + ?Sized,
119    {
120        self.inner.get(k)
121    }
122
123    pub fn peek<Q>(&self, k: &Q) -> Option<&V>
124    where
125        K: Borrow<Q>,
126        Q: Hash + Eq + ?Sized,
127    {
128        self.inner.peek(k)
129    }
130
131    pub fn peek_mut(&mut self, k: &K) -> Option<MutGuard<'_, V>> {
132        let v = self.inner.peek_mut(k);
133        v.map(|inner| MutGuard::new(inner, &mut self.reporter))
134    }
135
136    pub fn contains<Q>(&self, k: &Q) -> bool
137    where
138        K: Borrow<Q>,
139        Q: Hash + Eq + ?Sized,
140    {
141        self.inner.contains(k)
142    }
143
144    pub fn len(&self) -> usize {
145        self.inner.len()
146    }
147
148    pub fn is_empty(&self) -> bool {
149        self.inner.is_empty()
150    }
151
152    pub fn clear(&mut self) {
153        self.inner.clear();
154    }
155}
156
157impl<K, V> ManagedLruCache<K, V>
158where
159    K: Hash + Eq + EstimateSize,
160    V: EstimateSize,
161{
162    pub fn unbounded(watermark_sequence: Arc<AtomicSequence>, metrics_info: MetricsInfo) -> Self {
163        Self::unbounded_with_hasher(watermark_sequence, metrics_info, RandomState::default())
164    }
165}
166
167impl<K, V, S> ManagedLruCache<K, V, S>
168where
169    K: Hash + Eq + EstimateSize,
170    V: EstimateSize,
171    S: BuildHasher + Send + Sync + 'static,
172{
173    pub fn unbounded_with_hasher(
174        watermark_sequence: Arc<AtomicSequence>,
175        metrics_info: MetricsInfo,
176        hash_builder: S,
177    ) -> Self {
178        Self::unbounded_with_hasher_in(watermark_sequence, metrics_info, hash_builder, Global)
179    }
180}
181
/// A write guard over a cached value that keeps the cache's heap-size
/// accounting in sync: on drop, any change in the value's estimated size is
/// folded into the reporter.
pub struct MutGuard<'a, V: EstimateSize> {
    /// The borrowed value being mutated.
    inner: &'a mut V,
    /// The owning cache's heap-size reporter; adjusted when the guard drops.
    reporter: &'a mut HeapSizeReporter,
    /// Estimated size of the value at the time the guard was created.
    old_value_size: usize,
}
187
188impl<'a, V: EstimateSize> MutGuard<'a, V> {
189    fn new(inner: &'a mut V, reporter: &'a mut HeapSizeReporter) -> Self {
190        let old_value_size = inner.estimated_size();
191        Self {
192            inner,
193            reporter,
194            old_value_size,
195        }
196    }
197}
198
199impl<V: EstimateSize> Drop for MutGuard<'_, V> {
200    fn drop(&mut self) {
201        let new_value_size = self.inner.estimated_size();
202        if new_value_size != self.old_value_size {
203            self.reporter.apply(|size| {
204                *size = size
205                    .saturating_sub(self.old_value_size)
206                    .saturating_add(new_value_size)
207            })
208        }
209    }
210}
211
212impl<V: EstimateSize> Deref for MutGuard<'_, V> {
213    type Target = V;
214
215    fn deref(&self) -> &Self::Target {
216        self.inner
217    }
218}
219
220impl<V: EstimateSize> DerefMut for MutGuard<'_, V> {
221    fn deref_mut(&mut self) -> &mut Self::Target {
222        self.inner
223    }
224}
225
/// Tracks the estimated heap size of a cache and lazily publishes it to a
/// metrics gauge (throttled by `REPORT_SIZE_EVERY_N_KB_CHANGE`).
struct HeapSizeReporter {
    /// Memory-usage gauge (3 labels: actor id, table id, desc).
    metrics: LabelGuardedIntGauge<3>,
    /// Currently accounted heap size, in bytes.
    heap_size: usize,
    /// Heap size at the time of the last gauge update.
    last_reported: usize,
}
231
232impl HeapSizeReporter {
233    fn new(
234        heap_size_metrics: LabelGuardedIntGauge<3>,
235        heap_size: usize,
236        last_reported: usize,
237    ) -> Self {
238        Self {
239            metrics: heap_size_metrics,
240            heap_size,
241            last_reported,
242        }
243    }
244
245    fn inc(&mut self, size: usize) {
246        self.heap_size = self.heap_size.saturating_add(size);
247        self.try_report();
248    }
249
250    fn dec(&mut self, size: usize) {
251        self.heap_size = self.heap_size.saturating_sub(size);
252        self.try_report();
253    }
254
255    fn apply<F>(&mut self, f: F)
256    where
257        F: FnOnce(&mut usize),
258    {
259        f(&mut self.heap_size);
260        self.try_report();
261    }
262
263    fn try_report(&mut self) -> bool {
264        if self.heap_size.abs_diff(self.last_reported) >= REPORT_SIZE_EVERY_N_KB_CHANGE << 10 {
265            self.metrics.set(self.heap_size as _);
266            self.last_reported = self.heap_size;
267            true
268        } else {
269            false
270        }
271    }
272}
273
impl Drop for HeapSizeReporter {
    fn drop(&mut self) {
        // The owning cache is going away, so its memory contribution is zero;
        // reset the gauge rather than leaving the last reported value behind.
        self.metrics.set(0);
    }
}