risingwave_stream/common/state_cache/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::Op;
16use risingwave_common_estimate_size::EstimateSize;
17
18mod ordered;
19mod top_n;
20
21pub use ordered::*;
22pub use top_n::*;
23
24/// A common interface for state table cache.
25pub trait StateCache: EstimateSize {
26    type Key: Ord + EstimateSize;
27    type Value: EstimateSize;
28
29    /// Type of state cache filler, for syncing the cache with the state table.
30    type Filler<'a>: StateCacheFiller<Key = Self::Key, Value = Self::Value> + 'a
31    where
32        Self: 'a;
33
34    /// Check if the cache is synced with the state table.
35    fn is_synced(&self) -> bool;
36
37    /// Begin syncing the cache with the state table.
38    fn begin_syncing(&mut self) -> Self::Filler<'_>;
39
40    /// Insert an entry into the cache. Should not break cache validity.
41    fn insert(&mut self, key: Self::Key, value: Self::Value) -> Option<Self::Value>;
42
43    /// Delete an entry from the cache. Should not break cache validity.
44    fn delete(&mut self, key: &Self::Key) -> Option<Self::Value>;
45
46    /// Apply a batch of operations to the cache. Should not break cache validity.
47    fn apply_batch(&mut self, batch: impl IntoIterator<Item = (Op, Self::Key, Self::Value)>);
48
49    /// Clear the cache.
50    fn clear(&mut self);
51
52    /// Iterate over the values in the cache.
53    fn values(&self) -> impl Iterator<Item = &Self::Value>;
54
55    /// Get the reference of first key-value pair in the cache.
56    fn first_key_value(&self) -> Option<(&Self::Key, &Self::Value)>;
57}
58
59pub trait StateCacheFiller {
60    type Key: Ord;
61    type Value;
62
63    /// Get the capacity of the cache.
64    fn capacity(&self) -> Option<usize>;
65
66    /// Insert an entry into the cache without cache validity check.
67    fn insert_unchecked(&mut self, key: Self::Key, value: Self::Value);
68
69    /// Finish syncing the cache with the state table. This should mark the cache as synced.
70    fn finish(self);
71}