use std::alloc::{Allocator, Global};
use std::borrow::Borrow;
use std::hash::{BuildHasher, Hash};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::sync::atomic::Ordering;

use risingwave_common::lru::{LruCache, RandomState};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::sequence::AtomicSequence;
use risingwave_common_estimate_size::EstimateSize;

use crate::common::metrics::MetricsInfo;

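/// Flush the heap-size gauge only after the tracked size has drifted by this
/// many KB (4096 KB = 4 MiB) since the last report, so that individual cache
/// operations do not touch the metric.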
const REPORT_SIZE_EVERY_N_KB_CHANGE: usize = 4096;

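/// An [`LruCache`] wrapper that reports the estimated heap size of its entries
/// to a memory-usage gauge and evicts entries up to a shared watermark
/// sequence.
///
/// A minimal usage sketch (`watermark` and `metrics_info` are assumed to come
/// from the surrounding executor context):
///
/// ```ignore
/// let mut cache: ManagedLruCache<u64, String> =
///     ManagedLruCache::unbounded(watermark.clone(), metrics_info);
/// cache.put(1, "foo".to_owned());
/// if let Some(mut v) = cache.get_mut(&1) {
///     v.push_str("bar"); // the size delta is reported when the guard drops
/// }
/// cache.evict(); // drop entries not accessed since the watermark
/// ```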
pub struct ManagedLruCache<K, V, S = RandomState, A = Global>
where
    K: Hash + Eq,
    S: BuildHasher + Send + Sync + 'static,
    A: Clone + Allocator,
{
    inner: LruCache<K, V, S, A>,

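    /// Shared watermark sequence; [`Self::evict`] pops entries whose last
    /// access predates it.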
    watermark_sequence: Arc<AtomicSequence>,

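    /// Retained for the cache's lifetime; not read after construction.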
    _metrics_info: MetricsInfo,

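    /// Tracks the total estimated heap size of the entries and flushes it to
    /// the memory-usage gauge.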
    reporter: HeapSizeReporter,
}

impl<K, V, S, A> ManagedLruCache<K, V, S, A>
where
    K: Hash + Eq + EstimateSize,
    V: EstimateSize,
    S: BuildHasher + Send + Sync + 'static,
    A: Clone + Allocator,
{
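    /// Creates an unbounded cache with the given hasher and allocator, wiring
    /// its heap-size reporting to the `stream_memory_usage` gauge labeled by
    /// `metrics_info`.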
    pub fn unbounded_with_hasher_in(
        watermark_sequence: Arc<AtomicSequence>,
        metrics_info: MetricsInfo,
        hash_builder: S,
        alloc: A,
    ) -> Self {
        let inner = LruCache::unbounded_with_hasher_in(hash_builder, alloc);

        let memory_usage_metrics = metrics_info
            .metrics
            .stream_memory_usage
            .with_guarded_label_values(&[
                &metrics_info.actor_id,
                &metrics_info.table_id,
                &metrics_info.desc,
            ]);
        memory_usage_metrics.set(0.into());

        let reporter = HeapSizeReporter::new(memory_usage_metrics, 0, 0);

        Self {
            inner,
            watermark_sequence,
            _metrics_info: metrics_info,
            reporter,
        }
    }

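    /// Evicts entries whose last access is older than the current watermark
    /// sequence, subtracting their estimated size from the reporter.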
    pub fn evict(&mut self) {
        let sequence = self.watermark_sequence.load(Ordering::Relaxed);
        while let Some((key, value, _)) = self.inner.pop_with_sequence(sequence) {
            let charge = key.estimated_size() + value.estimated_size();
            self.reporter.dec(charge);
        }
    }

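    /// Inserts a key-value pair, charging the reporter for the new entry and
    /// refunding the size of any entry it replaces.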
    pub fn put(&mut self, k: K, v: V) -> Option<V> {
        let key_size = k.estimated_size();
        self.reporter.inc(key_size + v.estimated_size());
        let old_val = self.inner.put(k, v);
        if let Some(old_val) = &old_val {
            self.reporter.dec(key_size + old_val.estimated_size());
        }
        old_val
    }

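    /// Removes the entry for `k`, if any, refunding its estimated size.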
    pub fn remove(&mut self, k: &K) -> Option<V> {
        let key_size = k.estimated_size();
        let old_val = self.inner.remove(k);
        if let Some(old_val) = &old_val {
            self.reporter.dec(key_size + old_val.estimated_size());
        }
        old_val
    }

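    /// Returns a [`MutGuard`] over the value for `k`; any change to the
    /// value's estimated size is folded into the reporter when the guard is
    /// dropped.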
    pub fn get_mut(&mut self, k: &K) -> Option<MutGuard<'_, V>> {
        let v = self.inner.get_mut(k);
        v.map(|inner| MutGuard::new(inner, &mut self.reporter))
    }

    pub fn get<Q>(&mut self, k: &Q) -> Option<&V>
    where
        K: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        self.inner.get(k)
    }

    pub fn peek<Q>(&self, k: &Q) -> Option<&V>
    where
        K: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        self.inner.peek(k)
    }

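    /// Like [`Self::get_mut`], but without promoting the entry in the LRU
    /// order.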
    pub fn peek_mut(&mut self, k: &K) -> Option<MutGuard<'_, V>> {
        let v = self.inner.peek_mut(k);
        v.map(|inner| MutGuard::new(inner, &mut self.reporter))
    }

    pub fn contains<Q>(&self, k: &Q) -> bool
    where
        K: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        self.inner.contains(k)
    }

    pub fn len(&self) -> usize {
        self.inner.len()
    }

    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    pub fn clear(&mut self) {
        self.inner.clear();
    }
}

impl<K, V> ManagedLruCache<K, V>
where
    K: Hash + Eq + EstimateSize,
    V: EstimateSize,
{
    pub fn unbounded(watermark_sequence: Arc<AtomicSequence>, metrics_info: MetricsInfo) -> Self {
        Self::unbounded_with_hasher(watermark_sequence, metrics_info, RandomState::default())
    }
}

impl<K, V, S> ManagedLruCache<K, V, S>
where
    K: Hash + Eq + EstimateSize,
    V: EstimateSize,
    S: BuildHasher + Send + Sync + 'static,
{
    pub fn unbounded_with_hasher(
        watermark_sequence: Arc<AtomicSequence>,
        metrics_info: MetricsInfo,
        hash_builder: S,
    ) -> Self {
        Self::unbounded_with_hasher_in(watermark_sequence, metrics_info, hash_builder, Global)
    }
}

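/// A mutable reference to a cached value that snapshots the value's estimated
/// size on creation and reports the size delta on drop.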
pub struct MutGuard<'a, V: EstimateSize> {
    inner: &'a mut V,
    reporter: &'a mut HeapSizeReporter,
    old_value_size: usize,
}

impl<'a, V: EstimateSize> MutGuard<'a, V> {
    fn new(inner: &'a mut V, reporter: &'a mut HeapSizeReporter) -> Self {
        let old_value_size = inner.estimated_size();
        Self {
            inner,
            reporter,
            old_value_size,
        }
    }
}

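// On drop, fold the change in the value's estimated size into the reporter.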
impl<V: EstimateSize> Drop for MutGuard<'_, V> {
    fn drop(&mut self) {
        let new_value_size = self.inner.estimated_size();
        if new_value_size != self.old_value_size {
            self.reporter.apply(|size| {
                *size = size
                    .saturating_sub(self.old_value_size)
                    .saturating_add(new_value_size)
            })
        }
    }
}

impl<V: EstimateSize> Deref for MutGuard<'_, V> {
    type Target = V;

    fn deref(&self) -> &Self::Target {
        self.inner
    }
}

impl<V: EstimateSize> DerefMut for MutGuard<'_, V> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.inner
    }
}

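/// Accumulates the cache's estimated heap size and lazily reports it to the
/// gauge; the gauge is reset to 0 when the reporter is dropped.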
struct HeapSizeReporter {
    metrics: LabelGuardedIntGauge,
    heap_size: usize,
    last_reported: usize,
}

impl HeapSizeReporter {
    fn new(
        heap_size_metrics: LabelGuardedIntGauge,
        heap_size: usize,
        last_reported: usize,
    ) -> Self {
        Self {
            metrics: heap_size_metrics,
            heap_size,
            last_reported,
        }
    }

    fn inc(&mut self, size: usize) {
        self.heap_size = self.heap_size.saturating_add(size);
        self.try_report();
    }

    fn dec(&mut self, size: usize) {
        self.heap_size = self.heap_size.saturating_sub(size);
        self.try_report();
    }

    fn apply<F>(&mut self, f: F)
    where
        F: FnOnce(&mut usize),
    {
        f(&mut self.heap_size);
        self.try_report();
    }

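    /// Flushes the current heap size to the gauge if it has drifted at least
    /// `REPORT_SIZE_EVERY_N_KB_CHANGE` KB (shifted by 10 to convert to bytes)
    /// from the last reported value; returns whether a report was made.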
    fn try_report(&mut self) -> bool {
        if self.heap_size.abs_diff(self.last_reported) >= REPORT_SIZE_EVERY_N_KB_CHANGE << 10 {
            self.metrics.set(self.heap_size as _);
            self.last_reported = self.heap_size;
            true
        } else {
            false
        }
    }
}

impl Drop for HeapSizeReporter {
    fn drop(&mut self) {
        self.metrics.set(0);
    }
}