risingwave_storage/hummock/
block_cache.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Deref;
16use std::sync::Arc;
17
18use await_tree::{InstrumentAwait, SpanExt};
19use foyer::{HybridCacheEntry, HybridGetOrFetch};
20use risingwave_common::config::EvictionConfig;
21
22use super::{Block, HummockResult, SstableBlockIndex};
23use crate::hummock::HummockError;
24
/// Hybrid-cache entry keyed by [`SstableBlockIndex`] whose cached value is a boxed [`Block`].
type HybridCachedBlockEntry = HybridCacheEntry<SstableBlockIndex, Box<Block>>;
26
/// The owner that a [`BlockHolder`] stores alongside its raw block pointer.
///
/// None of the payloads are read back in the code shown in this file (hence the
/// `#[allow(dead_code)]` on each of them); they are stored in the holder's `_handle` field.
pub enum BlockEntry {
    /// The block is held through a hybrid-cache entry.
    HybridCache(#[allow(dead_code)] HybridCachedBlockEntry),
    /// The block is exclusively owned through a `Box`.
    Owned(#[allow(dead_code)] Box<Block>),
    /// The block is shared through an `Arc`.
    RefEntry(#[allow(dead_code)] Arc<Block>),
}
32
/// Pairs a raw pointer to a [`Block`] with the [`BlockEntry`] the pointer was derived from.
///
/// Every constructor in this file takes the pointer from the value it then stores in `_handle`.
///
/// NOTE(review): `block` is `pub` and the `Deref` impl dereferences it without any check, so
/// code that overwrites the field can break that relationship — consider whether the
/// visibility can be narrowed.
pub struct BlockHolder {
    /// Owner from which `block` was derived.
    _handle: BlockEntry,
    /// Raw pointer to the block; this is what `Deref` hands out as `&Block`.
    pub block: *const Block,
}
37
38impl BlockHolder {
39    pub fn from_ref_block(block: Arc<Block>) -> Self {
40        let ptr = block.as_ref() as *const _;
41        Self {
42            _handle: BlockEntry::RefEntry(block),
43            block: ptr,
44        }
45    }
46
47    pub fn from_owned_block(block: Box<Block>) -> Self {
48        let ptr = block.as_ref() as *const _;
49        Self {
50            _handle: BlockEntry::Owned(block),
51            block: ptr,
52        }
53    }
54
55    pub fn from_hybrid_cache_entry(entry: HybridCachedBlockEntry) -> Self {
56        let ptr = entry.value().as_ref() as *const _;
57        Self {
58            _handle: BlockEntry::HybridCache(entry),
59            block: ptr,
60        }
61    }
62
63    pub fn entry(&self) -> &BlockEntry {
64        &self._handle
65    }
66}
67
68impl Deref for BlockHolder {
69    type Target = Block;
70
71    fn deref(&self) -> &Self::Target {
72        unsafe { &(*self.block) }
73    }
74}
75
// The raw `*const Block` field keeps `BlockHolder` from being `Send`/`Sync` automatically,
// so both are asserted by hand.
// SAFETY: NOTE(review) — not verifiable from this file. This assumes `Block` and every owner
// stored in `BlockEntry` may be moved to and shared between threads, and that the pointer is
// only ever read through — TODO confirm.
unsafe impl Send for BlockHolder {}
unsafe impl Sync for BlockHolder {}
78
/// Settings for the block cache.
#[derive(Debug)]
pub struct BlockCacheConfig {
    /// Cache capacity. NOTE(review): the unit (bytes vs. entries) is not visible in this
    /// file — confirm at the use site.
    pub capacity: usize,
    /// Shard count; presumably the number of partitions the cache is split into — TODO confirm.
    pub shard_num: usize,
    /// Eviction settings, as described by [`EvictionConfig`].
    pub eviction: EvictionConfig,
}
85
/// Outcome of a block lookup: either a block that is already in hand, or a hybrid-cache
/// get-or-fetch handle that [`BlockResponse::wait`] resolves.
pub enum BlockResponse {
    /// The block is available; `wait` returns it unchanged.
    Block(BlockHolder),
    /// A get-or-fetch handle; `wait` unwraps it directly when possible and awaits it otherwise.
    Fetch(HybridGetOrFetch<SstableBlockIndex, Box<Block>>),
}
90
91impl BlockResponse {
92    pub async fn wait(self) -> HummockResult<BlockHolder> {
93        let fetch = match self {
94            BlockResponse::Block(block) => return Ok(block),
95            BlockResponse::Fetch(fetch) => fetch,
96        };
97        let fetch = match fetch.try_unwrap() {
98            Ok(entry) => return Ok(BlockHolder::from_hybrid_cache_entry(entry)),
99            Err(fetch) => fetch,
100        };
101        fetch
102            .instrument_await("wait_pending_fetch_block".verbose())
103            .await
104            .map(BlockHolder::from_hybrid_cache_entry)
105            .map_err(HummockError::foyer_error)
106    }
107}