risingwave_stream/common/state_cache/
ordered.rs1use risingwave_common::array::Op;
16use risingwave_common_estimate_size::EstimateSize;
17use risingwave_common_estimate_size::collections::EstimatedBTreeMap;
18
19use super::{StateCache, StateCacheFiller};
20
21#[derive(Clone, EstimateSize)]
23pub struct OrderedStateCache<K: Ord + EstimateSize, V: EstimateSize> {
24 cache: EstimatedBTreeMap<K, V>,
25 synced: bool,
26}
27
28impl<K: Ord + EstimateSize, V: EstimateSize> OrderedStateCache<K, V> {
29 pub fn new() -> Self {
30 Self {
31 cache: Default::default(),
32 synced: false,
33 }
34 }
35}
36
37impl<K: Ord + EstimateSize, V: EstimateSize> Default for OrderedStateCache<K, V> {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43impl<K: Ord + EstimateSize, V: EstimateSize> StateCache for OrderedStateCache<K, V> {
44 type Filler<'a>
45 = &'a mut Self
46 where
47 Self: 'a;
48 type Key = K;
49 type Value = V;
50
51 fn is_synced(&self) -> bool {
52 self.synced
53 }
54
55 fn begin_syncing(&mut self) -> Self::Filler<'_> {
56 self.synced = false;
57 self.cache.clear();
58 self
59 }
60
61 fn insert(&mut self, key: Self::Key, value: Self::Value) -> Option<Self::Value> {
62 if self.synced {
63 self.cache.insert(key, value)
64 } else {
65 None
66 }
67 }
68
69 fn delete(&mut self, key: &Self::Key) -> Option<Self::Value> {
70 if self.synced {
71 self.cache.remove(key)
72 } else {
73 None
74 }
75 }
76
77 fn apply_batch(&mut self, batch: impl IntoIterator<Item = (Op, Self::Key, Self::Value)>) {
78 if self.synced {
79 for (op, key, value) in batch {
80 match op {
81 Op::Insert | Op::UpdateInsert => {
82 self.cache.insert(key, value);
83 }
84 Op::Delete | Op::UpdateDelete => {
85 self.cache.remove(&key);
86 }
87 }
88 }
89 }
90 }
91
92 fn clear(&mut self) {
93 self.cache.clear();
94 self.synced = false;
95 }
96
97 fn values(&self) -> impl Iterator<Item = &Self::Value> {
98 assert!(self.synced);
99 self.cache.values()
100 }
101
102 fn first_key_value(&self) -> Option<(&Self::Key, &Self::Value)> {
103 assert!(self.synced);
104 self.cache.first_key_value()
105 }
106}
107
108impl<K: Ord + EstimateSize, V: EstimateSize> StateCacheFiller for &mut OrderedStateCache<K, V> {
109 type Key = K;
110 type Value = V;
111
112 fn capacity(&self) -> Option<usize> {
113 None
114 }
115
116 fn insert_unchecked(&mut self, key: Self::Key, value: Self::Value) {
117 self.cache.insert(key, value);
118 }
119
120 fn finish(self) {
121 self.synced = true;
122 }
123}