risingwave_storage/hummock/
block_cache.rsuse std::ops::Deref;
use std::sync::Arc;
use await_tree::InstrumentAwait;
use foyer::{FetchState, HybridCacheEntry, HybridFetch};
use risingwave_common::config::EvictionConfig;
use super::{Block, HummockResult, SstableBlockIndex};
use crate::hummock::HummockError;
/// A hybrid cache entry keyed by [`SstableBlockIndex`] whose value is a boxed [`Block`].
type HybridCachedBlockEntry = HybridCacheEntry<SstableBlockIndex, Box<Block>>;
/// The owner of the block that a [`BlockHolder`] points to.
///
/// The payloads are never read in this file (hence the `dead_code` allowances on each field):
/// `BlockHolder`'s constructors store one of these in `_handle` right after taking a raw pointer
/// to the block inside it.
enum BlockEntry {
/// Block held through a hybrid cache entry.
HybridCache(#[allow(dead_code)] HybridCachedBlockEntry),
/// Block owned through a `Box`.
Owned(#[allow(dead_code)] Box<Block>),
/// Block shared through an `Arc`.
RefEntry(#[allow(dead_code)] Arc<Block>),
}
/// A handle to a [`Block`] that pairs a raw pointer to the block with the value owning it.
///
/// The constructors in this file always set `block` to point at the block stored inside
/// `_handle`, and `Deref` reads the block through that pointer.
pub struct BlockHolder {
/// Owner of the pointed-to block; stored next to the pointer and not read anywhere in this file.
_handle: BlockEntry,
/// Raw pointer to the block inside `_handle`; dereferenced without any check in `deref`.
/// NOTE(review): this field is `pub`, so outside code could overwrite it with a pointer that
/// `_handle` does not back -- confirm that no caller does.
pub block: *const Block,
}
impl BlockHolder {
    /// Builds a holder that shares `block` through its `Arc`.
    ///
    /// The pointer is taken to the `Block` inside the `Arc` allocation, and the `Arc` itself is
    /// stored in the holder.
    pub fn from_ref_block(block: Arc<Block>) -> Self {
        let raw: *const Block = Arc::as_ptr(&block);
        let handle = BlockEntry::RefEntry(block);
        Self {
            _handle: handle,
            block: raw,
        }
    }

    /// Builds a holder that takes ownership of a boxed block.
    ///
    /// The pointer is taken to the boxed `Block` before the `Box` is moved into the holder.
    pub fn from_owned_block(block: Box<Block>) -> Self {
        let raw: *const Block = &*block;
        let handle = BlockEntry::Owned(block);
        Self {
            _handle: handle,
            block: raw,
        }
    }

    /// Builds a holder around a hybrid cache entry.
    ///
    /// The pointer is taken to the `Block` reachable through `entry.value()`, and the entry
    /// itself is stored in the holder.
    pub fn from_hybrid_cache_entry(entry: HybridCachedBlockEntry) -> Self {
        let cached: &Block = entry.value().as_ref();
        let raw = cached as *const Block;
        let handle = BlockEntry::HybridCache(entry);
        Self {
            _handle: handle,
            block: raw,
        }
    }
}
impl Deref for BlockHolder {
    type Target = Block;

    /// Borrows the block this holder points to.
    fn deref(&self) -> &Self::Target {
        let raw: *const Block = self.block;
        // SAFETY: every constructor in this file sets `block` to point at the block stored in
        // `_handle`, which is owned by `self` and therefore outlives the returned borrow.
        // NOTE(review): this relies on nobody overwriting the `pub block` field -- confirm.
        unsafe { &*raw }
    }
}
// `BlockHolder` contains a raw pointer, so it is neither `Send` nor `Sync` automatically; these
// impls opt in by hand.
// NOTE(review): soundness presumably relies on the pointee only being read (via `Deref`) and on
// `Block` plus every owner stored in `_handle` being thread-safe themselves -- confirm.
unsafe impl Send for BlockHolder {}
unsafe impl Sync for BlockHolder {}
/// Configuration values for a block cache.
#[derive(Debug)]
pub struct BlockCacheConfig {
/// Cache capacity.
/// NOTE(review): the unit (bytes vs. number of blocks) is not visible here -- confirm.
pub capacity: usize,
/// Presumably the number of shards the cache is split into -- confirm against the consumer.
pub shard_num: usize,
/// Eviction settings, of type `risingwave_common::config::EvictionConfig`.
pub eviction: EvictionConfig,
}
/// Outcome of requesting a block: either a block that is already available, or a hybrid cache
/// fetch that still has to be awaited. Use `wait` to turn either case into a [`BlockHolder`].
pub enum BlockResponse {
/// A block that is ready; `wait` returns it as-is.
Block(BlockHolder),
/// A hybrid cache fetch; `wait` awaits it and wraps the resulting entry in a `BlockHolder`.
Entry(HybridFetch<SstableBlockIndex, Box<Block>>),
}
impl BlockResponse {
    /// Resolves this response into a [`BlockHolder`].
    ///
    /// A `Block` response is returned immediately. An `Entry` response is awaited; depending on
    /// the fetch state the await is instrumented with a different span name:
    ///
    /// - `Hit`: awaited without instrumentation.
    /// - `Wait`: instrumented as `"wait_pending_fetch_block"`.
    /// - `Miss`: instrumented as `"fetch_block"`.
    ///
    /// # Errors
    ///
    /// If the fetch fails, its error is converted with `HummockError::foyer_error` and returned.
    pub async fn wait(self) -> HummockResult<BlockHolder> {
        let fetch = match self {
            Self::Entry(fetch) => fetch,
            Self::Block(holder) => return Ok(holder),
        };

        // Only the span label differs between the states; the conversion of the result is shared.
        let fetched = match fetch.state() {
            FetchState::Hit => fetch.await,
            FetchState::Wait => {
                fetch
                    .verbose_instrument_await("wait_pending_fetch_block")
                    .await
            }
            FetchState::Miss => fetch.verbose_instrument_await("fetch_block").await,
        };

        fetched
            .map(BlockHolder::from_hybrid_cache_entry)
            .map_err(HummockError::foyer_error)
    }
}