risingwave_storage/hummock/
block_cache.rs1use std::ops::Deref;
16use std::sync::Arc;
17
18use await_tree::{InstrumentAwait, SpanExt};
19use foyer::{HybridCacheEntry, HybridGetOrFetch};
20use risingwave_common::config::EvictionConfig;
21
22use super::{Block, HummockResult, SstableBlockIndex};
23use crate::hummock::HummockError;
24
25type HybridCachedBlockEntry = HybridCacheEntry<SstableBlockIndex, Box<Block>>;
26
27pub enum BlockEntry {
28 HybridCache(#[allow(dead_code)] HybridCachedBlockEntry),
29 Owned(#[allow(dead_code)] Box<Block>),
30 RefEntry(#[allow(dead_code)] Arc<Block>),
31}
32
33pub struct BlockHolder {
34 _handle: BlockEntry,
35 pub block: *const Block,
36}
37
38impl BlockHolder {
39 pub fn from_ref_block(block: Arc<Block>) -> Self {
40 let ptr = block.as_ref() as *const _;
41 Self {
42 _handle: BlockEntry::RefEntry(block),
43 block: ptr,
44 }
45 }
46
47 pub fn from_owned_block(block: Box<Block>) -> Self {
48 let ptr = block.as_ref() as *const _;
49 Self {
50 _handle: BlockEntry::Owned(block),
51 block: ptr,
52 }
53 }
54
55 pub fn from_hybrid_cache_entry(entry: HybridCachedBlockEntry) -> Self {
56 let ptr = entry.value().as_ref() as *const _;
57 Self {
58 _handle: BlockEntry::HybridCache(entry),
59 block: ptr,
60 }
61 }
62
63 pub fn entry(&self) -> &BlockEntry {
64 &self._handle
65 }
66}
67
68impl Deref for BlockHolder {
69 type Target = Block;
70
71 fn deref(&self) -> &Self::Target {
72 unsafe { &(*self.block) }
73 }
74}
75
76unsafe impl Send for BlockHolder {}
77unsafe impl Sync for BlockHolder {}
78
79#[derive(Debug)]
80pub struct BlockCacheConfig {
81 pub capacity: usize,
82 pub shard_num: usize,
83 pub eviction: EvictionConfig,
84}
85
86pub enum BlockResponse {
87 Block(BlockHolder),
88 Fetch(HybridGetOrFetch<SstableBlockIndex, Box<Block>>),
89}
90
91impl BlockResponse {
92 pub async fn wait(self) -> HummockResult<BlockHolder> {
93 let fetch = match self {
94 BlockResponse::Block(block) => return Ok(block),
95 BlockResponse::Fetch(fetch) => fetch,
96 };
97 let fetch = match fetch.try_unwrap() {
98 Ok(entry) => return Ok(BlockHolder::from_hybrid_cache_entry(entry)),
99 Err(fetch) => fetch,
100 };
101 fetch
102 .instrument_await("wait_pending_fetch_block".verbose())
103 .await
104 .map(BlockHolder::from_hybrid_cache_entry)
105 .map_err(HummockError::foyer_error)
106 }
107}