risingwave_meta/hummock/compaction/picker/
compaction_task_validator.rs1use std::collections::HashMap;
16use std::sync::Arc;
17
18use risingwave_common::config::default::compaction_config;
19use risingwave_pb::hummock::CompactionConfig;
20
21use super::{CompactionInput, LocalPickerStatistic};
22
23#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
24pub enum ValidationRuleType {
25 Tier = 0,
26 Intra = 1,
27 ToBase = 2,
28}
29
30pub struct CompactionTaskValidator {
31 validation_rules: HashMap<ValidationRuleType, Box<dyn CompactionTaskValidationRule>>,
32}
33
34impl CompactionTaskValidator {
35 pub fn new(config: Arc<CompactionConfig>) -> Self {
36 let mut validation_rules: HashMap<
37 ValidationRuleType,
38 Box<dyn CompactionTaskValidationRule>,
39 > = HashMap::default();
40
41 validation_rules.insert(
42 ValidationRuleType::Tier,
43 Box::new(TierCompactionTaskValidationRule {
44 config: config.clone(),
45 }),
46 );
47
48 validation_rules.insert(
49 ValidationRuleType::Intra,
50 Box::new(IntraCompactionTaskValidationRule {
51 config: config.clone(),
52 }),
53 );
54
55 validation_rules.insert(
56 ValidationRuleType::ToBase,
57 Box::new(BaseCompactionTaskValidationRule { config }),
58 );
59
60 CompactionTaskValidator { validation_rules }
61 }
62
63 pub fn unused() -> Self {
64 CompactionTaskValidator {
65 validation_rules: HashMap::default(),
66 }
67 }
68
69 pub fn valid_compact_task(
70 &self,
71 input: &CompactionInput,
72 picker_type: ValidationRuleType,
73 stats: &mut LocalPickerStatistic,
74 ) -> bool {
75 if let Some(validation_rule) = self.validation_rules.get(&picker_type) {
76 validation_rule.validate(input, stats)
77 } else {
78 true
79 }
80 }
81
82 pub fn is_enable(&self) -> bool {
83 !self.validation_rules.is_empty()
84 }
85}
86
87pub trait CompactionTaskValidationRule {
88 fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool;
89}
90
91struct TierCompactionTaskValidationRule {
92 config: Arc<CompactionConfig>,
93}
94
95impl CompactionTaskValidationRule for TierCompactionTaskValidationRule {
96 fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool {
97 if input.total_file_count >= self.config.level0_max_compact_file_number
98 || input.input_levels.len()
99 >= self
100 .config
101 .max_l0_compact_level_count
102 .unwrap_or(compaction_config::max_l0_compact_level_count())
103 as usize
104 {
105 return true;
106 }
107
108 let max_compaction_bytes = std::cmp::min(
110 self.config.max_compaction_bytes,
111 self.config.sub_level_max_compaction_bytes
112 * self.config.level0_overlapping_sub_level_compact_level_count as u64,
113 );
114
115 let tier_sub_level_compact_level_count =
118 self.config.level0_overlapping_sub_level_compact_level_count as usize;
119 if input.input_levels.len() < tier_sub_level_compact_level_count
120 && input.select_input_size < max_compaction_bytes
121 {
122 stats.skip_by_count_limit += 1;
123 return false;
124 }
125 true
126 }
127}
128
129struct IntraCompactionTaskValidationRule {
130 config: Arc<CompactionConfig>,
131}
132
133impl CompactionTaskValidationRule for IntraCompactionTaskValidationRule {
134 fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool {
135 if (input.total_file_count >= self.config.level0_max_compact_file_number
136 && input.input_levels.len() > 1)
137 || input.input_levels.len()
138 >= self
139 .config
140 .max_l0_compact_level_count
141 .unwrap_or(compaction_config::max_l0_compact_level_count())
142 as usize
143 {
144 return true;
145 }
146
147 let intra_sub_level_compact_level_count =
148 self.config.level0_sub_level_compact_level_count as usize;
149
150 if input.input_levels.len() < intra_sub_level_compact_level_count {
151 stats.skip_by_count_limit += 1;
152 return false;
153 }
154
155 let mut max_level_size = 0;
156 for select_level in &input.input_levels {
157 let level_select_size = select_level
158 .table_infos
159 .iter()
160 .map(|sst| sst.sst_size)
161 .sum::<u64>();
162
163 max_level_size = std::cmp::max(max_level_size, level_select_size);
164 }
165
166 let is_write_amp_large =
171 max_level_size * self.config.level0_sub_level_compact_level_count as u64 / 2
172 >= input.select_input_size;
173
174 if is_write_amp_large {
175 stats.skip_by_write_amp_limit += 1;
176 return false;
177 }
178
179 true
180 }
181}
182
183struct BaseCompactionTaskValidationRule {
184 config: Arc<CompactionConfig>,
185}
186
187impl CompactionTaskValidationRule for BaseCompactionTaskValidationRule {
188 fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool {
189 if input.total_file_count >= self.config.level0_max_compact_file_number
190 || input.input_levels.len()
191 >= self
192 .config
193 .max_l0_compact_level_count
194 .unwrap_or(compaction_config::max_l0_compact_level_count())
195 as usize
196 {
197 return true;
198 }
199
200 if input.target_input_size > self.config.max_compaction_bytes {
203 stats.skip_by_count_limit += 1;
204 return false;
205 }
206
207 if input.select_input_size < input.target_input_size {
208 stats.skip_by_write_amp_limit += 1;
209 return false;
210 }
211
212 true
213 }
214}