// risingwave_meta/hummock/compaction/picker/emergency_compaction_picker.rs
use std::sync::Arc;
use risingwave_hummock_sdk::level::Levels;
use risingwave_pb::hummock::{CompactionConfig, LevelType};
use super::{
CompactionInput, CompactionPicker, CompactionTaskValidator, LevelCompactionPicker,
LocalPickerStatistic, TierCompactionPicker,
};
use crate::hummock::compaction::picker::intra_compaction_picker::WholeLevelCompactionPicker;
use crate::hummock::compaction::CompactionDeveloperConfig;
use crate::hummock::level_handler::LevelHandler;
/// Compaction picker that inspects the mix of L0 sub-level kinds and delegates
/// to one of several other pickers (`LevelCompactionPicker`,
/// `WholeLevelCompactionPicker`, `TierCompactionPicker`) accordingly.
///
/// Every delegate is built with `CompactionTaskValidator::unused()`.
pub struct EmergencyCompactionPicker {
/// Level handed to `LevelCompactionPicker::new_with_validator` as its first argument.
target_level: usize,
/// Shared compaction config; its `split_weight_by_vnode` field steers which
/// delegate is tried, and the config itself is passed on to every delegate.
config: Arc<CompactionConfig>,
/// Developer config, forwarded only to the `LevelCompactionPicker` delegate.
developer_config: Arc<CompactionDeveloperConfig>,
}
impl EmergencyCompactionPicker {
pub fn new(
target_level: usize,
config: Arc<CompactionConfig>,
developer_config: Arc<CompactionDeveloperConfig>,
) -> Self {
Self {
target_level,
config,
developer_config,
}
}
pub fn pick_compaction(
&self,
levels: &Levels,
level_handlers: &[LevelHandler],
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
let unused_validator = Arc::new(CompactionTaskValidator::unused());
let l0 = &levels.l0;
let overlapping_count = l0
.sub_levels
.iter()
.filter(|level| level.level_type == LevelType::Overlapping)
.count();
let no_overlap_count = l0
.sub_levels
.iter()
.filter(|level| {
level.level_type == LevelType::Nonoverlapping && level.vnode_partition_count == 0
})
.count();
let partitioned_count = l0
.sub_levels
.iter()
.filter(|level| {
level.level_type == LevelType::Nonoverlapping && level.vnode_partition_count > 0
})
.count();
if (self.config.split_weight_by_vnode == 0 && no_overlap_count > overlapping_count)
|| (self.config.split_weight_by_vnode > 0
&& partitioned_count > no_overlap_count
&& partitioned_count > overlapping_count)
{
let mut base_level_compaction_picker = LevelCompactionPicker::new_with_validator(
self.target_level,
self.config.clone(),
unused_validator.clone(),
self.developer_config.clone(),
);
if let Some(ret) =
base_level_compaction_picker.pick_compaction(levels, level_handlers, stats)
{
return Some(ret);
}
}
if self.config.split_weight_by_vnode > 0
&& no_overlap_count > partitioned_count
&& no_overlap_count > overlapping_count
{
let intral_level_compaction_picker =
WholeLevelCompactionPicker::new(self.config.clone(), unused_validator.clone());
if let Some(ret) = intral_level_compaction_picker.pick_whole_level(
&levels.l0,
&level_handlers[0],
self.config.split_weight_by_vnode,
stats,
) {
return Some(ret);
}
}
let mut tier_compaction_picker =
TierCompactionPicker::new_with_validator(self.config.clone(), unused_validator);
tier_compaction_picker.pick_compaction(levels, level_handlers, stats)
}
}