risingwave_stream/cache/mod.rs
1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod managed_lru;
16pub use managed_lru::*;
17use risingwave_common::bitmap::Bitmap;
18use risingwave_common::util::iter_util::ZipEqFast;
19
20/// Returns whether we're unsure about the fressness of the cache after the scaling from the
21/// previous partition to the current one, denoted by vnode bitmaps. If the value is `true`, we must
22/// evict the cache entries that does not belong to the previous partition before further
23/// processing.
24///
25/// TODO: currently most executors simply clear all the cache entries if `true`, we can make the
26/// cache aware of the consistent hashing to avoid unnecessary eviction in the future.
27/// Check this [issue](https://github.com/risingwavelabs/risingwave/issues/5567).
28///
29/// TODO: may encapsulate the logic into `ExecutorCache` when ready.
30///
31/// # Explanation
32/// We use a lazy manner to manipulate the cache. When scaling out, the partition of the existing
33/// executors will likely shrink and becomes a subset of the previous one (to ensure the best
34/// locality). In this case, this function will return `false` and the cache entries that're not in
35/// the current partition anymore are still kept. This achieves the best performance as we won't
36/// touch and valiate the cache at all when scaling-out, which is the common case and the critical
37/// path.
38///
39/// This brings a problem when scaling in after a while. Some partitions may be reassigned back to
40/// the current executor, while the cache entries of these partitions are still unevicted. So it's
41/// possible that these entries have been updated by other executors on other workers, and
42/// the content is now stale! The executor must evict these entries which are not in the
43/// **previous** partition before further processing.
44pub(super) fn cache_may_stale(
45 previous_vnode_bitmap: &Bitmap,
46 current_vnode_bitmap: &Bitmap,
47) -> bool {
48 let current_is_subset = previous_vnode_bitmap
49 .iter()
50 .zip_eq_fast(current_vnode_bitmap.iter())
51 .all(|(p, c)| p >= c);
52
53 !current_is_subset
54}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `cache_may_stale` against representative rescheduling scenarios, each given as
    /// `(previous partition, current partition, expected result)`.
    #[test]
    fn test_cache_may_stale() {
        // Bit i set => vnode (i + 1) is owned by the executor.
        let p123 = Bitmap::from_bytes(&[0b_0000_0111]);
        let p1234 = Bitmap::from_bytes(&[0b_0000_1111]);
        let p1245 = Bitmap::from_bytes(&[0b_0001_1011]);

        let cases = [
            // unchanged: identical partitions, nothing new to worry about
            (&p123, &p123, false),
            // scale-out: current partition is a subset of the previous one
            (&p1234, &p123, false),
            // scale-in: vnode 4 is newly assigned
            (&p123, &p1234, true),
            // scale-in: vnodes 4 and 5 are newly assigned, vnode 3 is dropped
            (&p123, &p1245, true),
        ];

        // Comparing against a bound `expected` (not a bool literal) keeps clippy's
        // `bool_assert_comparison` lint quiet without needing a suppression attribute.
        for (previous, current, expected) in cases {
            assert_eq!(cache_may_stale(previous, current), expected);
        }
    }
}