risingwave_stream/common/state_cache/mod.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::Op;
16use risingwave_common_estimate_size::EstimateSize;
17
18mod ordered;
19mod top_n;
20
21pub use ordered::*;
22pub use top_n::*;
23
24/// A common interface for state table cache.
25pub trait StateCache: EstimateSize {
26 type Key: Ord + EstimateSize;
27 type Value: EstimateSize;
28
29 /// Type of state cache filler, for syncing the cache with the state table.
30 type Filler<'a>: StateCacheFiller<Key = Self::Key, Value = Self::Value> + 'a
31 where
32 Self: 'a;
33
34 /// Check if the cache is synced with the state table.
35 fn is_synced(&self) -> bool;
36
37 /// Begin syncing the cache with the state table.
38 fn begin_syncing(&mut self) -> Self::Filler<'_>;
39
40 /// Insert an entry into the cache. Should not break cache validity.
41 fn insert(&mut self, key: Self::Key, value: Self::Value) -> Option<Self::Value>;
42
43 /// Delete an entry from the cache. Should not break cache validity.
44 fn delete(&mut self, key: &Self::Key) -> Option<Self::Value>;
45
46 /// Apply a batch of operations to the cache. Should not break cache validity.
47 fn apply_batch(&mut self, batch: impl IntoIterator<Item = (Op, Self::Key, Self::Value)>);
48
49 /// Clear the cache.
50 fn clear(&mut self);
51
52 /// Iterate over the values in the cache.
53 fn values(&self) -> impl Iterator<Item = &Self::Value>;
54
55 /// Get the reference of first key-value pair in the cache.
56 fn first_key_value(&self) -> Option<(&Self::Key, &Self::Value)>;
57}
58
59pub trait StateCacheFiller {
60 type Key: Ord;
61 type Value;
62
63 /// Get the capacity of the cache.
64 fn capacity(&self) -> Option<usize>;
65
66 /// Insert an entry into the cache without cache validity check.
67 fn insert_unchecked(&mut self, key: Self::Key, value: Self::Value);
68
69 /// Finish syncing the cache with the state table. This should mark the cache as synced.
70 fn finish(self);
71}