risingwave_storage/hummock/
block_cache.rs1use std::ops::Deref;
16use std::sync::Arc;
17
18use await_tree::{InstrumentAwait, SpanExt};
19use foyer::{FetchState, HybridCacheEntry, HybridFetch};
20use risingwave_common::config::EvictionConfig;
21
22use super::{Block, HummockResult, SstableBlockIndex};
23use crate::hummock::HummockError;
24
25type HybridCachedBlockEntry = HybridCacheEntry<SstableBlockIndex, Box<Block>>;
26
27enum BlockEntry {
28 HybridCache(#[allow(dead_code)] HybridCachedBlockEntry),
29 Owned(#[allow(dead_code)] Box<Block>),
30 RefEntry(#[allow(dead_code)] Arc<Block>),
31}
32
33pub struct BlockHolder {
34 _handle: BlockEntry,
35 pub block: *const Block,
36}
37
38impl BlockHolder {
39 pub fn from_ref_block(block: Arc<Block>) -> Self {
40 let ptr = block.as_ref() as *const _;
41 Self {
42 _handle: BlockEntry::RefEntry(block),
43 block: ptr,
44 }
45 }
46
47 pub fn from_owned_block(block: Box<Block>) -> Self {
48 let ptr = block.as_ref() as *const _;
49 Self {
50 _handle: BlockEntry::Owned(block),
51 block: ptr,
52 }
53 }
54
55 pub fn from_hybrid_cache_entry(entry: HybridCachedBlockEntry) -> Self {
56 let ptr = entry.value().as_ref() as *const _;
57 Self {
58 _handle: BlockEntry::HybridCache(entry),
59 block: ptr,
60 }
61 }
62}
63
64impl Deref for BlockHolder {
65 type Target = Block;
66
67 fn deref(&self) -> &Self::Target {
68 unsafe { &(*self.block) }
69 }
70}
71
72unsafe impl Send for BlockHolder {}
73unsafe impl Sync for BlockHolder {}
74
75#[derive(Debug)]
76pub struct BlockCacheConfig {
77 pub capacity: usize,
78 pub shard_num: usize,
79 pub eviction: EvictionConfig,
80}
81
82pub enum BlockResponse {
83 Block(BlockHolder),
84 Entry(HybridFetch<SstableBlockIndex, Box<Block>>),
85}
86
87impl BlockResponse {
88 pub async fn wait(self) -> HummockResult<BlockHolder> {
89 let entry = match self {
90 BlockResponse::Block(block) => return Ok(block),
91 BlockResponse::Entry(entry) => entry,
92 };
93 match entry.state() {
94 FetchState::Hit => entry
95 .await
96 .map(BlockHolder::from_hybrid_cache_entry)
97 .map_err(HummockError::foyer_error),
98 FetchState::Wait => entry
99 .instrument_await("wait_pending_fetch_block".verbose())
100 .await
101 .map(BlockHolder::from_hybrid_cache_entry)
102 .map_err(HummockError::foyer_error),
103 FetchState::Miss => entry
104 .instrument_await("fetch_block".verbose())
105 .await
106 .map(BlockHolder::from_hybrid_cache_entry)
107 .map_err(HummockError::foyer_error),
108 }
109 }
110}