risingwave_storage/hummock/
time_travel_version_cache.rs1use std::future::Future;
16use std::pin::Pin;
17
18use futures::FutureExt;
19use futures::future::Shared;
20use moka::sync::Cache;
21use risingwave_hummock_sdk::HummockEpoch;
22
23use crate::hummock::HummockResult;
24use crate::hummock::local_version::pinned_version::PinnedVersion;
25
26type InflightResult = Shared<Pin<Box<dyn Future<Output = HummockResult<PinnedVersion>> + Send>>>;
27
28pub struct SimpleTimeTravelVersionCache {
30 cache: Cache<(u32, HummockEpoch), InflightResult>,
31}
32
33impl SimpleTimeTravelVersionCache {
34 pub fn new(capacity: u64) -> Self {
35 let cache = Cache::builder().max_capacity(capacity).build();
36 Self { cache }
37 }
38
39 pub async fn get_or_insert(
40 &self,
41 table_id: u32,
42 epoch: HummockEpoch,
43 fetch: impl Future<Output = HummockResult<PinnedVersion>> + Send + 'static,
44 ) -> HummockResult<PinnedVersion> {
45 self.cache
46 .entry((table_id, epoch))
47 .or_insert_with_if(
48 || fetch.boxed().shared(),
49 |inflight| {
50 if let Some(result) = inflight.peek() {
51 return result.is_err();
52 }
53 false
54 },
55 )
56 .value()
57 .clone()
58 .await
59 }
60}