// risingwave_storage/hummock/compactor/context.rs
use std::sync::Arc;
use super::task_progress::TaskProgressManagerRef;
use crate::hummock::compactor::CompactionExecutor;
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::MemoryLimiter;
use crate::monitor::CompactorMetrics;
use crate::opts::StorageOpts;
/// Alias for the [`await_tree::Registry`] type carried by compaction code.
///
/// Despite the `Ref` suffix this is the registry type itself, not a wrapper around it.
pub type CompactionAwaitTreeRegRef = await_tree::Registry;
pub fn new_compaction_await_tree_reg_ref(config: await_tree::Config) -> CompactionAwaitTreeRegRef {
await_tree::Registry::new(config)
}
/// Key types identifying compaction-related work.
///
/// NOTE(review): presumably these are the keys used when registering tasks in the
/// compaction await-tree registry — confirm at the call sites.
pub mod await_tree_key {
/// Identifies one unit of compaction-related work.
///
/// The type is `Copy` and implements `Eq` + `Hash`, so it can be used as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Compaction {
/// Keyed by a task id plus the index of a split within that task.
CompactRunner { task_id: u64, split_index: usize },
/// Keyed by a single numeric id.
CompactSharedBuffer { id: usize },
/// Keyed by a single numeric id.
SpawnUploadTask { id: usize },
/// Keyed by a single numeric id.
MergingTask { id: usize },
}
// Re-export the variants so callers can write `await_tree_key::CompactRunner { .. }` directly.
pub use Compaction::*;
}
/// Groups the options, stores, metrics and executor handles that make up a compactor's context.
///
/// The struct derives `Clone`; the `Arc` fields are shared between clones rather than deep-copied.
#[derive(Clone)]
pub struct CompactorContext {
/// Shared storage options.
pub storage_opts: Arc<StorageOpts>,
/// Handle to the SSTable store.
pub sstable_store: SstableStoreRef,
/// Shared compactor metrics.
pub compactor_metrics: Arc<CompactorMetrics>,
/// Set to `true` by `new_local_compact_context`.
/// NOTE(review): presumably marks a compaction of the shared buffer — confirm with callers.
pub is_share_buffer_compact: bool,
/// Shared executor handle for compaction work.
pub compaction_executor: Arc<CompactionExecutor>,
/// Shared memory limiter.
pub memory_limiter: Arc<MemoryLimiter>,
/// Handle to the task-progress manager.
pub task_progress_manager: TaskProgressManagerRef,
/// Optional await-tree registry; `None` when the creator did not supply one.
pub await_tree_reg: Option<CompactionAwaitTreeRegRef>,
}
impl CompactorContext {
    /// Builds a context for local compaction.
    ///
    /// `storage_opts`, `sstable_store`, `compactor_metrics` and `await_tree_reg` are stored
    /// as given. The remaining fields are filled in here:
    ///
    /// * `is_share_buffer_compact` is always `true`;
    /// * `compaction_executor` is a fresh [`CompactionExecutor`], created with
    ///   `Some(n)` when `storage_opts.share_buffer_compaction_worker_threads_number` is a
    ///   non-zero `n`, and with `None` when it is zero;
    /// * `memory_limiter` is whatever `MemoryLimiter::unlimit()` returns;
    /// * `task_progress_manager` is its `Default` value.
    pub fn new_local_compact_context(
        storage_opts: Arc<StorageOpts>,
        sstable_store: SstableStoreRef,
        compactor_metrics: Arc<CompactorMetrics>,
        await_tree_reg: Option<CompactionAwaitTreeRegRef>,
    ) -> Self {
        // A configured count of zero is passed on as "no explicit count" (`None`).
        let worker_threads = match storage_opts.share_buffer_compaction_worker_threads_number {
            0 => None,
            configured => Some(configured as usize),
        };
        let compaction_executor = Arc::new(CompactionExecutor::new(worker_threads));
        let memory_limiter = MemoryLimiter::unlimit();

        Self {
            storage_opts,
            sstable_store,
            compactor_metrics,
            is_share_buffer_compact: true,
            compaction_executor,
            memory_limiter,
            task_progress_manager: Default::default(),
            await_tree_reg,
        }
    }
}