risingwave_common/
lru.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::alloc::{Allocator, Global};
16use std::borrow::Borrow;
17use std::cell::RefCell;
18use std::hash::{BuildHasher, Hash};
19use std::mem::MaybeUninit;
20use std::ptr::NonNull;
21use std::sync::atomic::Ordering;
22
23pub use ahash::RandomState;
24use hashbrown::HashTable;
25use hashbrown::hash_table::Entry;
26use hashbrown0_14 as _;
27
28use crate::sequence::{AtomicSequence, Sequence, Sequencer};
29
30thread_local! {
31    pub static SEQUENCER: RefCell<Sequencer> = const { RefCell::new(Sequencer::new(Sequencer::DEFAULT_STEP, Sequencer::DEFAULT_LAG)) };
32}
33
34static SEQUENCER_DEFAULT_STEP: AtomicSequence = AtomicSequence::new(Sequencer::DEFAULT_STEP);
35static SEQUENCER_DEFAULT_LAG: AtomicSequence = AtomicSequence::new(Sequencer::DEFAULT_LAG);
36
37pub fn init_global_sequencer_args(step: Sequence, lag: Sequence) {
38    SEQUENCER_DEFAULT_STEP.store(step, Ordering::Relaxed);
39    SEQUENCER_DEFAULT_LAG.store(lag, Ordering::Relaxed);
40}
41
42struct LruEntry<K, V>
43where
44    K: Hash + Eq,
45{
46    prev: Option<NonNull<LruEntry<K, V>>>,
47    next: Option<NonNull<LruEntry<K, V>>>,
48    key: MaybeUninit<K>,
49    value: MaybeUninit<V>,
50    hash: u64,
51    sequence: Sequence,
52}
53
54impl<K, V> LruEntry<K, V>
55where
56    K: Hash + Eq,
57{
58    fn key(&self) -> &K {
59        unsafe { self.key.assume_init_ref() }
60    }
61
62    fn value(&self) -> &V {
63        unsafe { self.value.assume_init_ref() }
64    }
65
66    fn value_mut(&mut self) -> &mut V {
67        unsafe { self.value.assume_init_mut() }
68    }
69}
70
71unsafe impl<K, V> Send for LruEntry<K, V> where K: Hash + Eq {}
72unsafe impl<K, V> Sync for LruEntry<K, V> where K: Hash + Eq {}
73
74pub struct LruCache<K, V, S = RandomState, A = Global>
75where
76    K: Hash + Eq,
77    S: BuildHasher + Send + Sync + 'static,
78    A: Clone + Allocator,
79{
80    map: HashTable<NonNull<LruEntry<K, V>>, A>,
81    /// dummy node of the lru linked list
82    dummy: Box<LruEntry<K, V>, A>,
83
84    alloc: A,
85    hash_builder: S,
86}
87
88unsafe impl<K, V, S, A> Send for LruCache<K, V, S, A>
89where
90    K: Hash + Eq,
91    S: BuildHasher + Send + Sync + 'static,
92    A: Clone + Allocator,
93{
94}
95unsafe impl<K, V, S, A> Sync for LruCache<K, V, S, A>
96where
97    K: Hash + Eq,
98    S: BuildHasher + Send + Sync + 'static,
99    A: Clone + Allocator,
100{
101}
102
103impl<K, V> LruCache<K, V>
104where
105    K: Hash + Eq,
106{
107    pub fn unbounded() -> Self {
108        Self::unbounded_with_hasher_in(RandomState::default(), Global)
109    }
110}
111
112impl<K, V, S, A> LruCache<K, V, S, A>
113where
114    K: Hash + Eq,
115    S: BuildHasher + Send + Sync + 'static,
116    A: Clone + Allocator,
117{
118    pub fn unbounded_with_hasher_in(hash_builder: S, alloc: A) -> Self {
119        let map = HashTable::new_in(alloc.clone());
120        let mut dummy = Box::new_in(
121            LruEntry {
122                prev: None,
123                next: None,
124                key: MaybeUninit::uninit(),
125                value: MaybeUninit::uninit(),
126                hash: 0,
127                sequence: Sequence::default(),
128            },
129            alloc.clone(),
130        );
131        let ptr = unsafe { NonNull::new_unchecked(dummy.as_mut() as *mut _) };
132        dummy.next = Some(ptr);
133        dummy.prev = Some(ptr);
134        Self {
135            map,
136            dummy,
137            alloc,
138            hash_builder,
139        }
140    }
141
142    pub fn put(&mut self, key: K, mut value: V) -> Option<V> {
143        unsafe {
144            let hash = self.hash_builder.hash_one(&key);
145
146            match self
147                .map
148                .entry(hash, |p| p.as_ref().key() == &key, |p| p.as_ref().hash)
149            {
150                Entry::Occupied(o) => {
151                    let mut ptr = *o.get();
152                    let entry = ptr.as_mut();
153                    std::mem::swap(&mut value, entry.value_mut());
154                    Self::detach(ptr);
155                    self.attach(ptr);
156                    Some(value)
157                }
158                Entry::Vacant(v) => {
159                    let entry = Box::new_in(
160                        LruEntry {
161                            prev: None,
162                            next: None,
163                            key: MaybeUninit::new(key),
164                            value: MaybeUninit::new(value),
165                            hash,
166                            // sequence will be updated by `attach`
167                            sequence: 0,
168                        },
169                        self.alloc.clone(),
170                    );
171                    let ptr = NonNull::new_unchecked(Box::into_raw_with_allocator(entry).0);
172                    v.insert(ptr);
173                    self.attach(ptr);
174                    None
175                }
176            }
177        }
178    }
179
180    pub fn remove(&mut self, key: &K) -> Option<V> {
181        unsafe {
182            let hash = self.hash_builder.hash_one(key);
183
184            match self
185                .map
186                .entry(hash, |p| p.as_ref().key() == key, |p| p.as_ref().hash)
187            {
188                Entry::Occupied(o) => {
189                    let ptr = *o.get();
190
191                    // Detach the entry from the LRU list
192                    Self::detach(ptr);
193
194                    // Extract entry from the box and get its value
195                    let mut entry = Box::from_raw_in(ptr.as_ptr(), self.alloc.clone());
196                    entry.key.assume_init_drop();
197                    let value = entry.value.assume_init();
198
199                    // Remove entry from the hash table
200                    o.remove();
201
202                    Some(value)
203                }
204                Entry::Vacant(_) => None,
205            }
206        }
207    }
208
209    pub fn get<'a, Q>(&'a mut self, key: &Q) -> Option<&'a V>
210    where
211        K: Borrow<Q>,
212        Q: Hash + Eq + ?Sized,
213    {
214        unsafe {
215            let key = key.borrow();
216            let hash = self.hash_builder.hash_one(key);
217            if let Some(ptr) = self.map.find(hash, |p| p.as_ref().key().borrow() == key) {
218                let ptr = *ptr;
219                Self::detach(ptr);
220                self.attach(ptr);
221                Some(ptr.as_ref().value())
222            } else {
223                None
224            }
225        }
226    }
227
228    pub fn get_mut<'a, Q>(&'a mut self, key: &Q) -> Option<&'a mut V>
229    where
230        K: Borrow<Q>,
231        Q: Hash + Eq + ?Sized,
232    {
233        unsafe {
234            let key = key.borrow();
235            let hash = self.hash_builder.hash_one(key);
236            if let Some(ptr) = self
237                .map
238                .find_mut(hash, |p| p.as_ref().key().borrow() == key)
239            {
240                let mut ptr = *ptr;
241                Self::detach(ptr);
242                self.attach(ptr);
243                Some(ptr.as_mut().value_mut())
244            } else {
245                None
246            }
247        }
248    }
249
250    pub fn peek<'a, Q>(&'a self, key: &Q) -> Option<&'a V>
251    where
252        K: Borrow<Q>,
253        Q: Hash + Eq + ?Sized,
254    {
255        unsafe {
256            let key = key.borrow();
257            let hash = self.hash_builder.hash_one(key);
258            self.map
259                .find(hash, |p| p.as_ref().key().borrow() == key)
260                .map(|ptr| ptr.as_ref().value())
261        }
262    }
263
264    pub fn peek_mut<'a, Q>(&'a mut self, key: &Q) -> Option<&'a mut V>
265    where
266        K: Borrow<Q>,
267        Q: Hash + Eq + ?Sized,
268    {
269        unsafe {
270            let key = key.borrow();
271            let hash = self.hash_builder.hash_one(key);
272            self.map
273                .find(hash, |p| p.as_ref().key().borrow() == key)
274                .map(|ptr| ptr.clone().as_mut().value_mut())
275        }
276    }
277
278    pub fn contains<Q>(&self, key: &Q) -> bool
279    where
280        K: Borrow<Q>,
281        Q: Hash + Eq + ?Sized,
282    {
283        unsafe {
284            let key = key.borrow();
285            let hash = self.hash_builder.hash_one(key);
286            self.map
287                .find(hash, |p| p.as_ref().key().borrow() == key)
288                .is_some()
289        }
290    }
291
292    pub fn len(&self) -> usize {
293        self.map.len()
294    }
295
296    pub fn is_empty(&self) -> bool {
297        self.len() == 0
298    }
299
300    /// Pop first entry if its sequence is less than the given sequence.
301    pub fn pop_with_sequence(&mut self, sequence: Sequence) -> Option<(K, V, Sequence)> {
302        unsafe {
303            if self.is_empty() {
304                return None;
305            }
306
307            let ptr = self.dummy.next.unwrap_unchecked();
308            if ptr.as_ref().sequence >= sequence {
309                return None;
310            }
311
312            Self::detach(ptr);
313
314            let entry = Box::from_raw_in(ptr.as_ptr(), self.alloc.clone());
315
316            let key = entry.key.assume_init();
317            let value = entry.value.assume_init();
318            let sequence = entry.sequence;
319
320            let hash = self.hash_builder.hash_one(&key);
321
322            match self
323                .map
324                .entry(hash, |p| p.as_ref().key() == &key, |p| p.as_ref().hash)
325            {
326                Entry::Occupied(o) => {
327                    o.remove();
328                }
329                Entry::Vacant(_) => {}
330            }
331
332            Some((key, value, sequence))
333        }
334    }
335
336    pub fn clear(&mut self) {
337        unsafe {
338            let mut map = HashTable::new_in(self.alloc.clone());
339            std::mem::swap(&mut map, &mut self.map);
340
341            for ptr in map.drain() {
342                Self::detach(ptr);
343                let mut entry = Box::from_raw_in(ptr.as_ptr(), self.alloc.clone());
344                entry.key.assume_init_drop();
345                entry.value.assume_init_drop();
346            }
347
348            debug_assert!(self.is_empty());
349            debug_assert_eq!(
350                self.dummy.as_mut() as *mut _,
351                self.dummy.next.unwrap_unchecked().as_ptr()
352            )
353        }
354    }
355
356    fn detach(mut ptr: NonNull<LruEntry<K, V>>) {
357        unsafe {
358            let entry = ptr.as_mut();
359
360            debug_assert!(entry.prev.is_some() && entry.next.is_some());
361
362            entry.prev.unwrap_unchecked().as_mut().next = entry.next;
363            entry.next.unwrap_unchecked().as_mut().prev = entry.prev;
364
365            entry.next = None;
366            entry.prev = None;
367        }
368    }
369
370    fn attach(&mut self, mut ptr: NonNull<LruEntry<K, V>>) {
371        unsafe {
372            let entry = ptr.as_mut();
373
374            debug_assert!(entry.prev.is_none() && entry.next.is_none());
375
376            entry.next = Some(NonNull::new_unchecked(self.dummy.as_mut() as *mut _));
377            entry.prev = self.dummy.prev;
378
379            self.dummy.prev.unwrap_unchecked().as_mut().next = Some(ptr);
380            self.dummy.prev = Some(ptr);
381
382            entry.sequence = SEQUENCER.with(|s| s.borrow_mut().alloc());
383        }
384    }
385}
386
387impl<K, V, S, A> Drop for LruCache<K, V, S, A>
388where
389    K: Hash + Eq,
390    S: BuildHasher + Send + Sync + 'static,
391    A: Clone + Allocator,
392{
393    fn drop(&mut self) {
394        self.clear()
395    }
396}
397
398#[cfg(test)]
399mod tests {
400    use super::*;
401
402    #[test]
403    fn test_unbounded() {
404        let cache: LruCache<i32, &str> = LruCache::unbounded();
405        assert!(cache.is_empty());
406        assert_eq!(cache.len(), 0);
407    }
408
409    #[test]
410    fn test_unbounded_with_hasher_in() {
411        let cache: LruCache<i32, &str, RandomState, Global> =
412            LruCache::unbounded_with_hasher_in(RandomState::default(), Global);
413        assert!(cache.is_empty());
414        assert_eq!(cache.len(), 0);
415    }
416
417    #[test]
418    fn test_put() {
419        let mut cache = LruCache::unbounded();
420
421        // Put new entry
422        assert_eq!(cache.put(1, "one"), None);
423        assert_eq!(cache.len(), 1);
424
425        // Update existing entry
426        assert_eq!(cache.put(1, "ONE"), Some("one"));
427        assert_eq!(cache.len(), 1);
428
429        // Multiple entries
430        assert_eq!(cache.put(2, "two"), None);
431        assert_eq!(cache.len(), 2);
432    }
433
434    #[test]
435    fn test_remove() {
436        let mut cache = LruCache::unbounded();
437
438        // Remove non-existent key
439        assert_eq!(cache.remove(&1), None);
440
441        // Remove existing key
442        cache.put(1, "one");
443        assert_eq!(cache.remove(&1), Some("one"));
444        assert!(cache.is_empty());
445
446        // Remove already removed key
447        assert_eq!(cache.remove(&1), None);
448
449        // Multiple entries
450        cache.put(1, "one");
451        cache.put(2, "two");
452        cache.put(3, "three");
453        assert_eq!(cache.remove(&2), Some("two"));
454        assert_eq!(cache.len(), 2);
455        assert!(!cache.contains(&2));
456    }
457
458    #[test]
459    fn test_get() {
460        let mut cache = LruCache::unbounded();
461
462        // Get non-existent key
463        assert_eq!(cache.get(&1), None);
464
465        // Get existing key
466        cache.put(1, "one");
467        assert_eq!(cache.get(&1), Some(&"one"));
468
469        // Check LRU order updated after get
470        cache.put(2, "two");
471        let _ = cache.get(&1); // Moves 1 to most recently used
472
473        // Verify LRU order by using pop_with_sequence
474        let (key, _, _) = cache.pop_with_sequence(u64::MAX).unwrap();
475        assert_eq!(key, 2); // key 2 should be least recently used
476    }
477
478    #[test]
479    fn test_get_mut() {
480        let mut cache = LruCache::unbounded();
481
482        // Get_mut non-existent key
483        assert_eq!(cache.get_mut(&1), None);
484
485        // Get_mut and modify existing key
486        cache.put(1, String::from("one"));
487        {
488            let val = cache.get_mut(&1).unwrap();
489            *val = String::from("ONE");
490        }
491        assert_eq!(cache.get(&1), Some(&String::from("ONE")));
492
493        // Check LRU order updated after get_mut
494        cache.put(2, String::from("two"));
495        let _ = cache.get_mut(&1); // Moves 1 to most recently used
496
497        // Verify LRU order by using pop_with_sequence
498        let (key, _, _) = cache.pop_with_sequence(u64::MAX).unwrap();
499        assert_eq!(key, 2); // key 2 should be least recently used
500    }
501
502    #[test]
503    fn test_peek() {
504        let mut cache = LruCache::unbounded();
505
506        // Peek non-existent key
507        assert_eq!(cache.peek(&1), None);
508
509        // Peek existing key
510        cache.put(1, "one");
511        cache.put(2, "two");
512        assert_eq!(cache.peek(&1), Some(&"one"));
513
514        // Verify LRU order NOT updated after peek
515        let (key, _, _) = cache.pop_with_sequence(u64::MAX).unwrap();
516        assert_eq!(key, 1); // key 1 should still be least recently used
517    }
518
519    #[test]
520    fn test_peek_mut() {
521        let mut cache = LruCache::unbounded();
522
523        // Peek_mut non-existent key
524        assert_eq!(cache.peek_mut(&1), None);
525
526        // Peek_mut and modify existing key
527        cache.put(1, String::from("one"));
528        cache.put(2, String::from("two"));
529        {
530            let val = cache.peek_mut(&1).unwrap();
531            *val = String::from("ONE");
532        }
533        assert_eq!(cache.peek(&1), Some(&String::from("ONE")));
534
535        // Verify LRU order NOT updated after peek_mut
536        let (key, _, _) = cache.pop_with_sequence(u64::MAX).unwrap();
537        assert_eq!(key, 1); // key 1 should still be least recently used
538    }
539
540    #[test]
541    fn test_contains() {
542        let mut cache = LruCache::unbounded();
543
544        // Contains on empty cache
545        assert!(!cache.contains(&1));
546
547        // Contains after put
548        cache.put(1, "one");
549        assert!(cache.contains(&1));
550
551        // Contains after remove
552        cache.remove(&1);
553        assert!(!cache.contains(&1));
554    }
555
556    #[test]
557    fn test_len_and_is_empty() {
558        let mut cache = LruCache::unbounded();
559
560        // Empty cache
561        assert!(cache.is_empty());
562        assert_eq!(cache.len(), 0);
563
564        // Non-empty cache
565        cache.put(1, "one");
566        assert!(!cache.is_empty());
567        assert_eq!(cache.len(), 1);
568
569        // After multiple operations
570        cache.put(2, "two");
571        assert_eq!(cache.len(), 2);
572        cache.remove(&1);
573        assert_eq!(cache.len(), 1);
574        cache.clear();
575        assert!(cache.is_empty());
576        assert_eq!(cache.len(), 0);
577    }
578
579    #[test]
580    fn test_clear() {
581        let mut cache = LruCache::unbounded();
582
583        // Clear empty cache
584        cache.clear();
585        assert!(cache.is_empty());
586
587        // Clear non-empty cache
588        cache.put(1, "one");
589        cache.put(2, "two");
590        cache.put(3, "three");
591        assert_eq!(cache.len(), 3);
592        cache.clear();
593        assert!(cache.is_empty());
594        assert_eq!(cache.len(), 0);
595        assert!(!cache.contains(&1));
596        assert!(!cache.contains(&2));
597        assert!(!cache.contains(&3));
598    }
599
600    #[test]
601    fn test_lru_behavior() {
602        let mut cache = LruCache::unbounded();
603
604        // Insert in order
605        cache.put(1, "one");
606        cache.put(2, "two");
607        cache.put(3, "three");
608
609        // Manipulate LRU order
610        let _ = cache.get(&1); // Moves 1 to most recently used
611
612        // Check order: 2->3->1
613        let (key, _, _) = cache.pop_with_sequence(u64::MAX).unwrap();
614        assert_eq!(key, 2);
615        let (key, _, _) = cache.pop_with_sequence(u64::MAX).unwrap();
616        assert_eq!(key, 3);
617        let (key, _, _) = cache.pop_with_sequence(u64::MAX).unwrap();
618        assert_eq!(key, 1);
619        assert!(cache.is_empty());
620    }
621}