use std::alloc::{Allocator, Global};
use std::borrow::Borrow;
use std::hash::{BuildHasher, Hash};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::sync::atomic::Ordering;

use risingwave_common::lru::{LruCache, RandomState};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::sequence::AtomicSequence;
use risingwave_common_estimate_size::EstimateSize;

use crate::common::metrics::MetricsInfo;

const REPORT_SIZE_EVERY_N_KB_CHANGE: usize = 4096;

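/// An LRU cache that evicts entries according to an externally updated
/// watermark sequence and reports its estimated heap usage to the streaming
/// memory metrics.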
pub struct ManagedLruCache<K, V, S = RandomState, A = Global>
where
    K: Hash + Eq,
    S: BuildHasher + Send + Sync + 'static,
    A: Clone + Allocator,
{
    inner: LruCache<K, V, S, A>,

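    /// Entries with a sequence less than `watermark_sequence` are evicted on
    /// the next call to `evict`. The sequence is expected to be advanced by an
    /// external component that manages memory pressure.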
    watermark_sequence: Arc<AtomicSequence>,

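    /// Retained for the cache's lifetime but not read after construction, as
    /// the leading underscore indicates.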
    _metrics_info: MetricsInfo,

    reporter: HeapSizeReporter,
}

impl<K, V, S, A> ManagedLruCache<K, V, S, A>
where
    K: Hash + Eq + EstimateSize,
    V: EstimateSize,
    S: BuildHasher + Send + Sync + 'static,
    A: Clone + Allocator,
{
    pub fn unbounded_with_hasher_in(
        watermark_sequence: Arc<AtomicSequence>,
        metrics_info: MetricsInfo,
        hash_builder: S,
        alloc: A,
    ) -> Self {
        let inner = LruCache::unbounded_with_hasher_in(hash_builder, alloc);

        let memory_usage_metrics = metrics_info
            .metrics
            .stream_memory_usage
            .with_guarded_label_values(&[
                &metrics_info.actor_id,
                &metrics_info.table_id,
                &metrics_info.desc,
            ]);
        memory_usage_metrics.set(0.into());

        let reporter = HeapSizeReporter::new(memory_usage_metrics, 0, 0);

        Self {
            inner,
            watermark_sequence,
            _metrics_info: metrics_info,
            reporter,
        }
    }

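    /// Evicts all entries whose sequence is below the current watermark,
    /// refunding their estimated sizes from the reported heap usage.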
    pub fn evict(&mut self) {
        let sequence = self.watermark_sequence.load(Ordering::Relaxed);
        while let Some((key, value, _)) = self.inner.pop_with_sequence(sequence) {
            let charge = key.estimated_size() + value.estimated_size();
            self.reporter.dec(charge);
        }
    }

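    /// Inserts a key-value pair, charging the estimated size of the new entry
    /// and refunding the size of any value it replaces.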
    pub fn put(&mut self, k: K, v: V) -> Option<V> {
        let key_size = k.estimated_size();
        self.reporter.inc(key_size + v.estimated_size());
        let old_val = self.inner.put(k, v);
        if let Some(old_val) = &old_val {
            self.reporter.dec(key_size + old_val.estimated_size());
        }
        old_val
    }

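    /// An alias of [`Self::put`].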
    pub fn push(&mut self, k: K, v: V) -> Option<V> {
        self.put(k, v)
    }

    pub fn get_mut(&mut self, k: &K) -> Option<MutGuard<'_, V>> {
        let v = self.inner.get_mut(k);
        v.map(|inner| MutGuard::new(inner, &mut self.reporter))
    }

    pub fn get<Q>(&mut self, k: &Q) -> Option<&V>
    where
        K: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        self.inner.get(k)
    }

    pub fn peek<Q>(&self, k: &Q) -> Option<&V>
    where
        K: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        self.inner.peek(k)
    }

    pub fn peek_mut(&mut self, k: &K) -> Option<MutGuard<'_, V>> {
        let v = self.inner.peek_mut(k);
        v.map(|inner| MutGuard::new(inner, &mut self.reporter))
    }

    pub fn contains<Q>(&self, k: &Q) -> bool
    where
        K: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        self.inner.contains(k)
    }

    pub fn len(&self) -> usize {
        self.inner.len()
    }

    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    pub fn clear(&mut self) {
        self.inner.clear();
    }
}

impl<K, V> ManagedLruCache<K, V>
where
    K: Hash + Eq + EstimateSize,
    V: EstimateSize,
{
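    /// Creates an unbounded cache with the default hasher and allocator.
    ///
    /// A minimal usage sketch (the `watermark` and `metrics_info` handles here
    /// are assumed to come from the surrounding executor context):
    ///
    /// ```ignore
    /// let mut cache = ManagedLruCache::unbounded(watermark, metrics_info);
    /// cache.put("key".to_owned(), 42u64);
    /// assert_eq!(cache.get("key"), Some(&42));
    /// cache.evict(); // drop entries older than the current watermark
    /// ```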
    pub fn unbounded(watermark_sequence: Arc<AtomicSequence>, metrics_info: MetricsInfo) -> Self {
        Self::unbounded_with_hasher(watermark_sequence, metrics_info, RandomState::default())
    }
}

impl<K, V, S> ManagedLruCache<K, V, S>
where
    K: Hash + Eq + EstimateSize,
    V: EstimateSize,
    S: BuildHasher + Send + Sync + 'static,
{
    pub fn unbounded_with_hasher(
        watermark_sequence: Arc<AtomicSequence>,
        metrics_info: MetricsInfo,
        hash_builder: S,
    ) -> Self {
        Self::unbounded_with_hasher_in(watermark_sequence, metrics_info, hash_builder, Global)
    }
}

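/// A write guard over a cached value. On drop, it re-estimates the value's
/// size and applies the delta to the heap size reporter, so in-place mutations
/// stay accounted for.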
pub struct MutGuard<'a, V: EstimateSize> {
    inner: &'a mut V,
    reporter: &'a mut HeapSizeReporter,
    old_value_size: usize,
}

impl<'a, V: EstimateSize> MutGuard<'a, V> {
    fn new(inner: &'a mut V, reporter: &'a mut HeapSizeReporter) -> Self {
        let old_value_size = inner.estimated_size();
        Self {
            inner,
            reporter,
            old_value_size,
        }
    }
}

impl<V: EstimateSize> Drop for MutGuard<'_, V> {
    fn drop(&mut self) {
        let new_value_size = self.inner.estimated_size();
        if new_value_size != self.old_value_size {
            self.reporter.apply(|size| {
                *size = size
                    .saturating_sub(self.old_value_size)
                    .saturating_add(new_value_size)
            })
        }
    }
}

impl<V: EstimateSize> Deref for MutGuard<'_, V> {
    type Target = V;

    fn deref(&self) -> &Self::Target {
        self.inner
    }
}

impl<V: EstimateSize> DerefMut for MutGuard<'_, V> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.inner
    }
}

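/// Tracks the cache's estimated heap size and mirrors it into a gauge,
/// reporting only when the size has drifted far enough from the last reported
/// value to keep metric updates cheap.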
struct HeapSizeReporter {
    metrics: LabelGuardedIntGauge<3>,
    heap_size: usize,
    last_reported: usize,
}

impl HeapSizeReporter {
    fn new(
        heap_size_metrics: LabelGuardedIntGauge<3>,
        heap_size: usize,
        last_reported: usize,
    ) -> Self {
        Self {
            metrics: heap_size_metrics,
            heap_size,
            last_reported,
        }
    }

    fn inc(&mut self, size: usize) {
        self.heap_size = self.heap_size.saturating_add(size);
        self.try_report();
    }

    fn dec(&mut self, size: usize) {
        self.heap_size = self.heap_size.saturating_sub(size);
        self.try_report();
    }

    fn apply<F>(&mut self, f: F)
    where
        F: FnOnce(&mut usize),
    {
        f(&mut self.heap_size);
        self.try_report();
    }

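    /// Sets the gauge and returns `true` only if the heap size has changed by
    /// at least `REPORT_SIZE_EVERY_N_KB_CHANGE` KiB since the last report
    /// (the `<< 10` converts KiB to bytes).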
    fn try_report(&mut self) -> bool {
        if self.heap_size.abs_diff(self.last_reported) >= REPORT_SIZE_EVERY_N_KB_CHANGE << 10 {
            self.metrics.set(self.heap_size as _);
            self.last_reported = self.heap_size;
            true
        } else {
            false
        }
    }
}

impl Drop for HeapSizeReporter {
    fn drop(&mut self) {
        self.metrics.set(0);
    }
}