// risingwave_stream/common/state_cache/ordered.rs
use risingwave_common::array::Op;
use risingwave_common_estimate_size::collections::EstimatedBTreeMap;
use risingwave_common_estimate_size::EstimateSize;
use super::{StateCache, StateCacheFiller};
/// A state cache that keeps its entries in an [`EstimatedBTreeMap`] together with a flag
/// recording whether the cache is currently synced.
///
/// While `synced` is `false`, the `StateCache` mutators (`insert`, `delete`, `apply_batch`)
/// leave the map untouched, and the read accessors (`values`, `first_key_value`) panic.
#[derive(Clone, EstimateSize)]
pub struct OrderedStateCache<K: Ord + EstimateSize, V: EstimateSize> {
// The cached key/value entries.
cache: EstimatedBTreeMap<K, V>,
// `true` only after a filler obtained from `begin_syncing` has called `finish`;
// reset to `false` by `begin_syncing` and `clear`.
synced: bool,
}
impl<K: Ord + EstimateSize, V: EstimateSize> OrderedStateCache<K, V> {
pub fn new() -> Self {
Self {
cache: Default::default(),
synced: false,
}
}
}
impl<K: Ord + EstimateSize, V: EstimateSize> Default for OrderedStateCache<K, V> {
fn default() -> Self {
Self::new()
}
}
impl<K, V> StateCache for OrderedStateCache<K, V>
where
    K: Ord + EstimateSize,
    V: EstimateSize,
{
    type Key = K;
    type Value = V;
    /// The filler is simply a mutable borrow of the cache itself.
    type Filler<'a>
        = &'a mut Self
    where
        Self: 'a;

    /// Reports whether the cache is currently marked as synced.
    fn is_synced(&self) -> bool {
        self.synced
    }

    /// Starts a fresh sync: drops every cached entry, marks the cache as unsynced, and hands
    /// back a mutable borrow of `self` to be used as the filler.
    fn begin_syncing(&mut self) -> Self::Filler<'_> {
        self.cache.clear();
        self.synced = false;
        self
    }

    /// Inserts `key`/`value` into the map when the cache is synced, returning whatever the
    /// map's `insert` returns. When the cache is not synced the entry is discarded and `None`
    /// is returned.
    fn insert(&mut self, key: Self::Key, value: Self::Value) -> Option<Self::Value> {
        if !self.synced {
            return None;
        }
        self.cache.insert(key, value)
    }

    /// Removes `key` from the map when the cache is synced, returning whatever the map's
    /// `remove` returns. When the cache is not synced nothing is touched and `None` is returned.
    fn delete(&mut self, key: &Self::Key) -> Option<Self::Value> {
        if !self.synced {
            return None;
        }
        self.cache.remove(key)
    }

    /// Applies a batch of changes in iteration order: `Insert`/`UpdateInsert` insert the
    /// entry, `Delete`/`UpdateDelete` remove the key. The batch is ignored entirely (not even
    /// iterated) when the cache is not synced.
    fn apply_batch(&mut self, batch: impl IntoIterator<Item = (Op, Self::Key, Self::Value)>) {
        if !self.synced {
            return;
        }
        for (op, key, value) in batch {
            match op {
                Op::Delete | Op::UpdateDelete => {
                    self.cache.remove(&key);
                }
                Op::Insert | Op::UpdateInsert => {
                    self.cache.insert(key, value);
                }
            }
        }
    }

    /// Empties the map and marks the cache as unsynced.
    fn clear(&mut self) {
        self.synced = false;
        self.cache.clear();
    }

    /// Iterates over the cached values, as yielded by the underlying map.
    ///
    /// # Panics
    ///
    /// Panics if the cache is not synced.
    fn values(&self) -> impl Iterator<Item = &Self::Value> {
        assert!(self.synced);
        let entries = &self.cache;
        entries.values()
    }

    /// Returns the first key/value pair reported by the underlying map, if any.
    ///
    /// # Panics
    ///
    /// Panics if the cache is not synced.
    fn first_key_value(&self) -> Option<(&Self::Key, &Self::Value)> {
        assert!(self.synced);
        let entries = &self.cache;
        entries.first_key_value()
    }
}
impl<K, V> StateCacheFiller for &mut OrderedStateCache<K, V>
where
    K: Ord + EstimateSize,
    V: EstimateSize,
{
    type Key = K;
    type Value = V;

    /// Always `None` for this cache.
    // NOTE(review): presumably `None` signals "no capacity limit" — confirm against the
    // `StateCacheFiller` trait documentation.
    fn capacity(&self) -> Option<usize> {
        None
    }

    /// Writes the entry straight into the map without consulting the `synced` flag; any value
    /// returned by the map's `insert` is discarded.
    fn insert_unchecked(&mut self, key: Self::Key, value: Self::Value) {
        let _ = self.cache.insert(key, value);
    }

    /// Consumes the filler and marks the borrowed cache as synced.
    fn finish(self) {
        let target: &mut OrderedStateCache<K, V> = self;
        target.synced = true;
    }
}