risingwave_stream/common/state_cache/
ordered.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::Op;
16use risingwave_common_estimate_size::EstimateSize;
17use risingwave_common_estimate_size::collections::EstimatedBTreeMap;
18
19use super::{StateCache, StateCacheFiller};
20
/// An implementation of [`StateCache`] that keeps all entries in an ordered in-memory map.
///
/// The cache tracks a `synced` flag. While the flag is `false`, the mutating
/// methods of the [`StateCache`] impl (`insert`, `delete`, `apply_batch`) leave
/// the map untouched, and the read accessors (`values`, `first_key_value`)
/// panic. The flag becomes `true` only when a filler obtained from
/// `begin_syncing` is finished.
#[derive(Clone, EstimateSize)]
pub struct OrderedStateCache<K: Ord + EstimateSize, V: EstimateSize> {
    /// The cached entries, stored in a size-estimating B-tree map keyed by `K`.
    cache: EstimatedBTreeMap<K, V>,
    /// Whether the cache is currently marked as synced; starts out `false`.
    synced: bool,
}
27
28impl<K: Ord + EstimateSize, V: EstimateSize> OrderedStateCache<K, V> {
29    pub fn new() -> Self {
30        Self {
31            cache: Default::default(),
32            synced: false,
33        }
34    }
35}
36
37impl<K: Ord + EstimateSize, V: EstimateSize> Default for OrderedStateCache<K, V> {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl<K: Ord + EstimateSize, V: EstimateSize> StateCache for OrderedStateCache<K, V> {
44    type Filler<'a>
45        = &'a mut Self
46    where
47        Self: 'a;
48    type Key = K;
49    type Value = V;
50
51    fn is_synced(&self) -> bool {
52        self.synced
53    }
54
55    fn begin_syncing(&mut self) -> Self::Filler<'_> {
56        self.synced = false;
57        self.cache.clear();
58        self
59    }
60
61    fn insert(&mut self, key: Self::Key, value: Self::Value) -> Option<Self::Value> {
62        if self.synced {
63            self.cache.insert(key, value)
64        } else {
65            None
66        }
67    }
68
69    fn delete(&mut self, key: &Self::Key) -> Option<Self::Value> {
70        if self.synced {
71            self.cache.remove(key)
72        } else {
73            None
74        }
75    }
76
77    fn apply_batch(&mut self, batch: impl IntoIterator<Item = (Op, Self::Key, Self::Value)>) {
78        if self.synced {
79            for (op, key, value) in batch {
80                match op {
81                    Op::Insert | Op::UpdateInsert => {
82                        self.cache.insert(key, value);
83                    }
84                    Op::Delete | Op::UpdateDelete => {
85                        self.cache.remove(&key);
86                    }
87                }
88            }
89        }
90    }
91
92    fn clear(&mut self) {
93        self.cache.clear();
94        self.synced = false;
95    }
96
97    fn values(&self) -> impl Iterator<Item = &Self::Value> {
98        assert!(self.synced);
99        self.cache.values()
100    }
101
102    fn first_key_value(&self) -> Option<(&Self::Key, &Self::Value)> {
103        assert!(self.synced);
104        self.cache.first_key_value()
105    }
106}
107
108impl<K: Ord + EstimateSize, V: EstimateSize> StateCacheFiller for &mut OrderedStateCache<K, V> {
109    type Key = K;
110    type Value = V;
111
112    fn capacity(&self) -> Option<usize> {
113        None
114    }
115
116    fn insert_unchecked(&mut self, key: Self::Key, value: Self::Value) {
117        self.cache.insert(key, value);
118    }
119
120    fn finish(self) {
121        self.synced = true;
122    }
123}