risingwave_storage/hummock/
block_cache.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Deref;
16use std::sync::Arc;
17
18use await_tree::{InstrumentAwait, SpanExt};
19use foyer::{FetchState, HybridCacheEntry, HybridFetch};
20use risingwave_common::config::EvictionConfig;
21
22use super::{Block, HummockResult, SstableBlockIndex};
23use crate::hummock::HummockError;
24
25type HybridCachedBlockEntry = HybridCacheEntry<SstableBlockIndex, Box<Block>>;
26
27enum BlockEntry {
28    HybridCache(#[allow(dead_code)] HybridCachedBlockEntry),
29    Owned(#[allow(dead_code)] Box<Block>),
30    RefEntry(#[allow(dead_code)] Arc<Block>),
31}
32
33pub struct BlockHolder {
34    _handle: BlockEntry,
35    pub block: *const Block,
36}
37
38impl BlockHolder {
39    pub fn from_ref_block(block: Arc<Block>) -> Self {
40        let ptr = block.as_ref() as *const _;
41        Self {
42            _handle: BlockEntry::RefEntry(block),
43            block: ptr,
44        }
45    }
46
47    pub fn from_owned_block(block: Box<Block>) -> Self {
48        let ptr = block.as_ref() as *const _;
49        Self {
50            _handle: BlockEntry::Owned(block),
51            block: ptr,
52        }
53    }
54
55    pub fn from_hybrid_cache_entry(entry: HybridCachedBlockEntry) -> Self {
56        let ptr = entry.value().as_ref() as *const _;
57        Self {
58            _handle: BlockEntry::HybridCache(entry),
59            block: ptr,
60        }
61    }
62}
63
64impl Deref for BlockHolder {
65    type Target = Block;
66
67    fn deref(&self) -> &Self::Target {
68        unsafe { &(*self.block) }
69    }
70}
71
72unsafe impl Send for BlockHolder {}
73unsafe impl Sync for BlockHolder {}
74
75#[derive(Debug)]
76pub struct BlockCacheConfig {
77    pub capacity: usize,
78    pub shard_num: usize,
79    pub eviction: EvictionConfig,
80}
81
82pub enum BlockResponse {
83    Block(BlockHolder),
84    Entry(HybridFetch<SstableBlockIndex, Box<Block>>),
85}
86
87impl BlockResponse {
88    pub async fn wait(self) -> HummockResult<BlockHolder> {
89        let entry = match self {
90            BlockResponse::Block(block) => return Ok(block),
91            BlockResponse::Entry(entry) => entry,
92        };
93        match entry.state() {
94            FetchState::Hit => entry
95                .await
96                .map(BlockHolder::from_hybrid_cache_entry)
97                .map_err(HummockError::foyer_error),
98            FetchState::Wait => entry
99                .instrument_await("wait_pending_fetch_block".verbose())
100                .await
101                .map(BlockHolder::from_hybrid_cache_entry)
102                .map_err(HummockError::foyer_error),
103            FetchState::Miss => entry
104                .instrument_await("fetch_block".verbose())
105                .await
106                .map(BlockHolder::from_hybrid_cache_entry)
107                .map_err(HummockError::foyer_error),
108        }
109    }
110}