1use std::sync::Arc;
16
17use risingwave_hummock_sdk::level::{InputLevel, Levels, OverlappingLevel};
18use risingwave_pb::hummock::{CompactionConfig, LevelType};
19
20use super::{
21 CompactionInput, CompactionPicker, CompactionTaskValidator, LocalPickerStatistic,
22 ValidationRuleType,
23};
24use crate::hummock::level_handler::LevelHandler;
25
26pub struct TierCompactionPicker {
27 config: Arc<CompactionConfig>,
28 compaction_task_validator: Arc<CompactionTaskValidator>,
29}
30
31impl TierCompactionPicker {
32 #[cfg(test)]
33 pub fn new(config: Arc<CompactionConfig>) -> TierCompactionPicker {
34 TierCompactionPicker {
35 compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())),
36 config,
37 }
38 }
39
40 pub fn new_with_validator(
41 config: Arc<CompactionConfig>,
42 compaction_task_validator: Arc<CompactionTaskValidator>,
43 ) -> TierCompactionPicker {
44 TierCompactionPicker {
45 config,
46 compaction_task_validator,
47 }
48 }
49
50 fn pick_overlapping_level(
51 &self,
52 l0: &OverlappingLevel,
53 level_handler: &LevelHandler,
54 mut vnode_partition_count: u32,
55 stats: &mut LocalPickerStatistic,
56 ) -> Option<CompactionInput> {
57 for (idx, level) in l0.sub_levels.iter().enumerate() {
58 if level.level_type != LevelType::Overlapping {
59 continue;
60 }
61
62 if level.table_infos.is_empty() {
63 continue;
64 }
65
66 if level_handler.is_level_pending_compact(level) {
67 continue;
68 }
69
70 let input_level = InputLevel {
71 level_idx: 0,
72 level_type: level.level_type,
73 table_infos: level.table_infos.clone(),
74 };
75
76 let mut select_level_inputs = vec![input_level];
77
78 let max_compaction_bytes = std::cmp::min(
81 self.config.max_compaction_bytes,
82 self.config.sub_level_max_compaction_bytes
83 * self.config.level0_overlapping_sub_level_compact_level_count as u64,
84 );
85
86 let mut compaction_bytes = level.total_file_size;
87 let mut compact_file_count = level.table_infos.len() as u64;
88 let overlapping_max_compact_file_numer = self.config.level0_max_compact_file_number;
90
91 for other in &l0.sub_levels[idx + 1..] {
92 if compaction_bytes > max_compaction_bytes {
93 break;
94 }
95
96 if compact_file_count > overlapping_max_compact_file_numer {
97 break;
98 }
99
100 if level_handler.is_level_pending_compact(other) {
101 break;
102 }
103
104 compaction_bytes += other.total_file_size;
105 compact_file_count += other.table_infos.len() as u64;
106 select_level_inputs.push(InputLevel {
107 level_idx: 0,
108 level_type: other.level_type,
109 table_infos: other.table_infos.clone(),
110 });
111 }
112
113 select_level_inputs.reverse();
114 if compaction_bytes < self.config.sub_level_max_compaction_bytes / 2 {
115 vnode_partition_count = 0;
116 }
117
118 let result = CompactionInput {
119 input_levels: select_level_inputs,
120 target_level: 0,
121 target_sub_level_id: level.sub_level_id,
122 select_input_size: compaction_bytes,
123 target_input_size: 0,
124 total_file_count: compact_file_count,
125 vnode_partition_count,
126 skip_target_range_conflict_check: true,
127 };
128
129 if !self.compaction_task_validator.valid_compact_task(
130 &result,
131 ValidationRuleType::Tier,
132 stats,
133 ) {
134 continue;
135 }
136
137 return Some(result);
138 }
139 None
140 }
141}
142
143impl CompactionPicker for TierCompactionPicker {
144 fn pick_compaction(
145 &mut self,
146 levels: &Levels,
147 level_handlers: &[LevelHandler],
148 stats: &mut LocalPickerStatistic,
149 ) -> Option<CompactionInput> {
150 let l0 = &levels.l0;
151 if l0.sub_levels.is_empty() {
152 return None;
153 }
154
155 self.pick_overlapping_level(
156 l0,
157 &level_handlers[0],
158 self.config.split_weight_by_vnode,
159 stats,
160 )
161 }
162}
163
164#[cfg(test)]
165pub mod tests {
166 use std::sync::Arc;
167
168 use risingwave_hummock_sdk::compaction_group::hummock_version_ext::new_sub_level;
169 use risingwave_hummock_sdk::level::{Levels, OverlappingLevel};
170 use risingwave_pb::hummock::LevelType;
171
172 use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
173 use crate::hummock::compaction::picker::{
174 CompactionPicker, LocalPickerStatistic, TierCompactionPicker,
175 };
176 use crate::hummock::compaction::selector::tests::{
177 generate_l0_overlapping_sublevels, generate_table, push_table_level0_overlapping,
178 };
179 use crate::hummock::level_handler::LevelHandler;
180
181 #[test]
182 fn test_pick_whole_level_basic() {
183 let l0 = generate_l0_overlapping_sublevels(vec![
184 vec![
185 generate_table(1, 1, 100, 200, 1),
186 generate_table(2, 1, 150, 250, 1),
187 ],
188 vec![generate_table(3, 1, 10, 90, 1)],
189 vec![
190 generate_table(4, 1, 100, 200, 1),
191 generate_table(5, 1, 50, 150, 1),
192 ],
193 vec![
194 generate_table(6, 1, 100, 200, 1),
195 generate_table(7, 1, 50, 150, 1),
196 ],
197 ]);
198 let levels = Levels {
199 l0,
200 levels: vec![],
201 ..Default::default()
202 };
203 let levels_handler = vec![LevelHandler::new(0)];
204 let config = Arc::new(
205 CompactionConfigBuilder::new()
206 .level0_tier_compact_file_number(2)
207 .level0_sub_level_compact_level_count(2)
208 .level0_overlapping_sub_level_compact_level_count(4)
209 .build(),
210 );
211 let mut picker = TierCompactionPicker::new(config);
212 let mut local_stats = LocalPickerStatistic::default();
213 let ret = picker
214 .pick_compaction(&levels, &levels_handler, &mut local_stats)
215 .unwrap();
216 assert_eq!(ret.input_levels.len(), 4);
217 assert_eq!(
218 ret.input_levels
219 .iter()
220 .map(|i| i.table_infos.len())
221 .sum::<usize>(),
222 7
223 );
224
225 let empty_level = Levels {
226 l0: generate_l0_overlapping_sublevels(vec![]),
227 levels: vec![],
228 ..Default::default()
229 };
230 assert!(
231 picker
232 .pick_compaction(&empty_level, &levels_handler, &mut local_stats)
233 .is_none()
234 );
235 }
236
237 #[test]
238 fn test_pick_whole_level_skip_sublevel() {
239 let l0 = generate_l0_overlapping_sublevels(vec![
240 vec![
241 generate_table(4, 1, 10, 90, 1),
242 generate_table(5, 1, 200, 220, 1),
243 ],
244 vec![generate_table(6, 1, 1, 100, 1)],
245 vec![generate_table(7, 1, 1, 100, 1)],
246 ]);
247
248 let levels = Levels {
249 l0,
250 levels: vec![],
251 ..Default::default()
252 };
253 let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
254 let config = Arc::new(
255 CompactionConfigBuilder::new()
256 .level0_tier_compact_file_number(2)
257 .sub_level_max_compaction_bytes(500)
258 .max_compaction_bytes(500000)
259 .level0_sub_level_compact_level_count(2)
260 .level0_overlapping_sub_level_compact_level_count(4)
261 .build(),
262 );
263
264 let mut local_stats = LocalPickerStatistic::default();
265 let mut picker = TierCompactionPicker::new(config);
268 let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
269 assert!(ret.is_none());
270 }
271
272 #[test]
273 fn test_write_amp_bug_skip() {
274 let l1 = new_sub_level(
275 1,
276 LevelType::Nonoverlapping,
277 vec![
278 generate_table(3, 1, 1, 50, 1),
279 generate_table(4, 1, 51, 100, 1),
280 ],
281 );
282 let l2 = new_sub_level(
283 2,
284 LevelType::Nonoverlapping,
285 vec![
286 generate_table(3, 1, 1, 50, 1),
287 generate_table(4, 1, 51, 200, 1),
288 ],
289 );
290 let levels = Levels {
291 l0: OverlappingLevel {
292 total_file_size: l1.total_file_size + l2.total_file_size,
293 uncompressed_file_size: l1.total_file_size + l2.total_file_size,
294 sub_levels: vec![l1, l2],
295 },
296 levels: vec![],
297 ..Default::default()
298 };
299 let config = Arc::new(
300 CompactionConfigBuilder::new()
301 .level0_tier_compact_file_number(4)
302 .sub_level_max_compaction_bytes(100)
303 .max_compaction_bytes(500000)
304 .build(),
305 );
306 let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
307 let mut local_stats = LocalPickerStatistic::default();
308 let mut picker = TierCompactionPicker::new(config);
309 let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
310 assert!(ret.is_none());
311 }
312
313 #[test]
314 fn test_pick_overlapping_sublevel_more_than_max_compact_file_number() {
315 let l0 = generate_l0_overlapping_sublevels(vec![vec![
316 generate_table(4, 1, 10, 90, 1),
317 generate_table(5, 1, 200, 220, 1),
318 generate_table(6, 1, 1, 100, 1),
319 generate_table(7, 1, 1, 100, 1),
320 generate_table(8, 1, 1, 100, 1),
321 generate_table(9, 1, 1, 100, 1),
322 generate_table(10, 1, 1, 100, 1),
323 ]]);
324 let mut levels = Levels {
325 l0,
326 levels: vec![],
327 ..Default::default()
328 };
329 let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
330 let config = Arc::new(
331 CompactionConfigBuilder::new()
332 .level0_tier_compact_file_number(2)
333 .sub_level_max_compaction_bytes(100)
334 .max_compaction_bytes(500000)
335 .level0_sub_level_compact_level_count(2)
336 .level0_max_compact_file_number(3)
337 .build(),
338 );
339
340 let mut local_stats = LocalPickerStatistic::default();
341 let mut picker = TierCompactionPicker::new(config);
342 let ret = picker
343 .pick_compaction(&levels, &levels_handler, &mut local_stats)
344 .unwrap();
345 assert_eq!(1, ret.input_levels.len());
346
347 push_table_level0_overlapping(&mut levels, generate_table(11, 1, 1, 100, 1));
348 let ret = picker
349 .pick_compaction(&levels, &levels_handler, &mut local_stats)
350 .unwrap();
351 assert_eq!(1, ret.input_levels.len());
352 }
353}