risingwave_meta/hummock/compaction/picker/
emergency_compaction_picker.rs1use std::sync::Arc;
16
17use risingwave_hummock_sdk::level::Levels;
18use risingwave_pb::hummock::{CompactionConfig, LevelType};
19
20use super::{
21 CompactionInput, CompactionPicker, CompactionTaskValidator, LevelCompactionPicker,
22 LocalPickerStatistic, TierCompactionPicker,
23};
24use crate::hummock::compaction::CompactionDeveloperConfig;
25use crate::hummock::compaction::picker::intra_compaction_picker::WholeLevelCompactionPicker;
26use crate::hummock::level_handler::LevelHandler;
27
28pub struct EmergencyCompactionPicker {
29 target_level: usize,
30 config: Arc<CompactionConfig>,
31 developer_config: Arc<CompactionDeveloperConfig>,
32}
33
34impl EmergencyCompactionPicker {
35 pub fn new(
36 target_level: usize,
37 config: Arc<CompactionConfig>,
38 developer_config: Arc<CompactionDeveloperConfig>,
39 ) -> Self {
40 Self {
41 target_level,
42 config,
43 developer_config,
44 }
45 }
46
47 pub fn pick_compaction(
48 &self,
49 levels: &Levels,
50 level_handlers: &[LevelHandler],
51 stats: &mut LocalPickerStatistic,
52 ) -> Option<CompactionInput> {
53 let unused_validator = Arc::new(CompactionTaskValidator::unused());
54 let l0 = &levels.l0;
55 let overlapping_count = l0
56 .sub_levels
57 .iter()
58 .filter(|level| level.level_type == LevelType::Overlapping)
59 .count();
60 let no_overlap_count = l0
61 .sub_levels
62 .iter()
63 .filter(|level| {
64 level.level_type == LevelType::Nonoverlapping && level.vnode_partition_count == 0
65 })
66 .count();
67 let partitioned_count = l0
68 .sub_levels
69 .iter()
70 .filter(|level| {
71 level.level_type == LevelType::Nonoverlapping && level.vnode_partition_count > 0
72 })
73 .count();
74 if (self.config.split_weight_by_vnode == 0 && no_overlap_count > overlapping_count)
78 || (self.config.split_weight_by_vnode > 0
79 && partitioned_count > no_overlap_count
80 && partitioned_count > overlapping_count)
81 {
82 let mut base_level_compaction_picker = LevelCompactionPicker::new_with_validator(
83 self.target_level,
84 self.config.clone(),
85 unused_validator.clone(),
86 self.developer_config.clone(),
87 );
88
89 if let Some(ret) =
90 base_level_compaction_picker.pick_compaction(levels, level_handlers, stats)
91 {
92 return Some(ret);
93 }
94 }
95 if self.config.split_weight_by_vnode > 0
96 && no_overlap_count > partitioned_count
97 && no_overlap_count > overlapping_count
98 {
99 let intral_level_compaction_picker =
100 WholeLevelCompactionPicker::new(self.config.clone(), unused_validator.clone());
101
102 if let Some(ret) = intral_level_compaction_picker.pick_whole_level(
103 &levels.l0,
104 &level_handlers[0],
105 self.config.split_weight_by_vnode,
106 stats,
107 ) {
108 return Some(ret);
109 }
110 }
111 let mut tier_compaction_picker =
112 TierCompactionPicker::new_with_validator(self.config.clone(), unused_validator);
113
114 tier_compaction_picker.pick_compaction(levels, level_handlers, stats)
115 }
116}