1use std::sync::Arc;
16
17use risingwave_hummock_sdk::level::{InputLevel, Levels, OverlappingLevel};
18use risingwave_pb::hummock::{CompactionConfig, LevelType};
19
20use super::{
21 CompactionInput, CompactionPicker, CompactionTaskValidator, LocalPickerStatistic,
22 ValidationRuleType,
23};
24use crate::hummock::level_handler::LevelHandler;
25
26pub struct TierCompactionPicker {
27 config: Arc<CompactionConfig>,
28 compaction_task_validator: Arc<CompactionTaskValidator>,
29}
30
31impl TierCompactionPicker {
32 #[cfg(test)]
33 pub fn new(config: Arc<CompactionConfig>) -> TierCompactionPicker {
34 TierCompactionPicker {
35 compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())),
36 config,
37 }
38 }
39
40 pub fn new_with_validator(
41 config: Arc<CompactionConfig>,
42 compaction_task_validator: Arc<CompactionTaskValidator>,
43 ) -> TierCompactionPicker {
44 TierCompactionPicker {
45 config,
46 compaction_task_validator,
47 }
48 }
49
50 fn pick_overlapping_level(
51 &self,
52 l0: &OverlappingLevel,
53 level_handler: &LevelHandler,
54 mut vnode_partition_count: u32,
55 stats: &mut LocalPickerStatistic,
56 ) -> Option<CompactionInput> {
57 for (idx, level) in l0.sub_levels.iter().enumerate() {
58 if level.level_type != LevelType::Overlapping {
59 continue;
60 }
61
62 if level.table_infos.is_empty() {
63 continue;
64 }
65
66 if level_handler.is_level_pending_compact(level) {
67 continue;
68 }
69
70 let input_level = InputLevel {
71 level_idx: 0,
72 level_type: level.level_type,
73 table_infos: level.table_infos.clone(),
74 };
75
76 let mut select_level_inputs = vec![input_level];
77
78 let max_compaction_bytes = std::cmp::min(
81 self.config.max_compaction_bytes,
82 self.config.sub_level_max_compaction_bytes
83 * self.config.level0_overlapping_sub_level_compact_level_count as u64,
84 );
85
86 let mut compaction_bytes = level.total_file_size;
87 let mut compact_file_count = level.table_infos.len() as u64;
88 let overlapping_max_compact_file_numer = self.config.level0_max_compact_file_number;
90
91 for other in &l0.sub_levels[idx + 1..] {
92 if compaction_bytes > max_compaction_bytes {
93 break;
94 }
95
96 if compact_file_count > overlapping_max_compact_file_numer {
97 break;
98 }
99
100 if level_handler.is_level_pending_compact(other) {
101 break;
102 }
103
104 compaction_bytes += other.total_file_size;
105 compact_file_count += other.table_infos.len() as u64;
106 select_level_inputs.push(InputLevel {
107 level_idx: 0,
108 level_type: other.level_type,
109 table_infos: other.table_infos.clone(),
110 });
111 }
112
113 select_level_inputs.reverse();
114 if compaction_bytes < self.config.sub_level_max_compaction_bytes / 2 {
115 vnode_partition_count = 0;
116 }
117
118 let result = CompactionInput {
119 input_levels: select_level_inputs,
120 target_level: 0,
121 target_sub_level_id: level.sub_level_id,
122 select_input_size: compaction_bytes,
123 target_input_size: 0,
124 total_file_count: compact_file_count,
125 vnode_partition_count,
126 };
127
128 if !self.compaction_task_validator.valid_compact_task(
129 &result,
130 ValidationRuleType::Tier,
131 stats,
132 ) {
133 continue;
134 }
135
136 return Some(result);
137 }
138 None
139 }
140}
141
142impl CompactionPicker for TierCompactionPicker {
143 fn pick_compaction(
144 &mut self,
145 levels: &Levels,
146 level_handlers: &[LevelHandler],
147 stats: &mut LocalPickerStatistic,
148 ) -> Option<CompactionInput> {
149 let l0 = &levels.l0;
150 if l0.sub_levels.is_empty() {
151 return None;
152 }
153
154 self.pick_overlapping_level(
155 l0,
156 &level_handlers[0],
157 self.config.split_weight_by_vnode,
158 stats,
159 )
160 }
161}
162
163#[cfg(test)]
164pub mod tests {
165 use std::sync::Arc;
166
167 use risingwave_hummock_sdk::compaction_group::hummock_version_ext::new_sub_level;
168 use risingwave_hummock_sdk::level::{Levels, OverlappingLevel};
169 use risingwave_pb::hummock::LevelType;
170
171 use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
172 use crate::hummock::compaction::picker::{
173 CompactionPicker, LocalPickerStatistic, TierCompactionPicker,
174 };
175 use crate::hummock::compaction::selector::tests::{
176 generate_l0_overlapping_sublevels, generate_table, push_table_level0_overlapping,
177 };
178 use crate::hummock::level_handler::LevelHandler;
179
180 #[test]
181 fn test_pick_whole_level_basic() {
182 let l0 = generate_l0_overlapping_sublevels(vec![
183 vec![
184 generate_table(1, 1, 100, 200, 1),
185 generate_table(2, 1, 150, 250, 1),
186 ],
187 vec![generate_table(3, 1, 10, 90, 1)],
188 vec![
189 generate_table(4, 1, 100, 200, 1),
190 generate_table(5, 1, 50, 150, 1),
191 ],
192 vec![
193 generate_table(6, 1, 100, 200, 1),
194 generate_table(7, 1, 50, 150, 1),
195 ],
196 ]);
197 let levels = Levels {
198 l0,
199 levels: vec![],
200 ..Default::default()
201 };
202 let levels_handler = vec![LevelHandler::new(0)];
203 let config = Arc::new(
204 CompactionConfigBuilder::new()
205 .level0_tier_compact_file_number(2)
206 .level0_sub_level_compact_level_count(2)
207 .level0_overlapping_sub_level_compact_level_count(4)
208 .build(),
209 );
210 let mut picker = TierCompactionPicker::new(config);
211 let mut local_stats = LocalPickerStatistic::default();
212 let ret = picker
213 .pick_compaction(&levels, &levels_handler, &mut local_stats)
214 .unwrap();
215 assert_eq!(ret.input_levels.len(), 4);
216 assert_eq!(
217 ret.input_levels
218 .iter()
219 .map(|i| i.table_infos.len())
220 .sum::<usize>(),
221 7
222 );
223
224 let empty_level = Levels {
225 l0: generate_l0_overlapping_sublevels(vec![]),
226 levels: vec![],
227 ..Default::default()
228 };
229 assert!(
230 picker
231 .pick_compaction(&empty_level, &levels_handler, &mut local_stats)
232 .is_none()
233 );
234 }
235
236 #[test]
237 fn test_pick_whole_level_skip_sublevel() {
238 let l0 = generate_l0_overlapping_sublevels(vec![
239 vec![
240 generate_table(4, 1, 10, 90, 1),
241 generate_table(5, 1, 200, 220, 1),
242 ],
243 vec![generate_table(6, 1, 1, 100, 1)],
244 vec![generate_table(7, 1, 1, 100, 1)],
245 ]);
246
247 let levels = Levels {
248 l0,
249 levels: vec![],
250 ..Default::default()
251 };
252 let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
253 let config = Arc::new(
254 CompactionConfigBuilder::new()
255 .level0_tier_compact_file_number(2)
256 .sub_level_max_compaction_bytes(500)
257 .max_compaction_bytes(500000)
258 .level0_sub_level_compact_level_count(2)
259 .level0_overlapping_sub_level_compact_level_count(4)
260 .build(),
261 );
262
263 let mut local_stats = LocalPickerStatistic::default();
264 let mut picker = TierCompactionPicker::new(config);
267 let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
268 assert!(ret.is_none());
269 }
270
271 #[test]
272 fn test_write_amp_bug_skip() {
273 let l1 = new_sub_level(
274 1,
275 LevelType::Nonoverlapping,
276 vec![
277 generate_table(3, 1, 1, 50, 1),
278 generate_table(4, 1, 51, 100, 1),
279 ],
280 );
281 let l2 = new_sub_level(
282 2,
283 LevelType::Nonoverlapping,
284 vec![
285 generate_table(3, 1, 1, 50, 1),
286 generate_table(4, 1, 51, 200, 1),
287 ],
288 );
289 let levels = Levels {
290 l0: OverlappingLevel {
291 total_file_size: l1.total_file_size + l2.total_file_size,
292 uncompressed_file_size: l1.total_file_size + l2.total_file_size,
293 sub_levels: vec![l1, l2],
294 },
295 levels: vec![],
296 ..Default::default()
297 };
298 let config = Arc::new(
299 CompactionConfigBuilder::new()
300 .level0_tier_compact_file_number(4)
301 .sub_level_max_compaction_bytes(100)
302 .max_compaction_bytes(500000)
303 .build(),
304 );
305 let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
306 let mut local_stats = LocalPickerStatistic::default();
307 let mut picker = TierCompactionPicker::new(config);
308 let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
309 assert!(ret.is_none());
310 }
311
312 #[test]
313 fn test_pick_overlapping_sublevel_more_than_max_compact_file_number() {
314 let l0 = generate_l0_overlapping_sublevels(vec![vec![
315 generate_table(4, 1, 10, 90, 1),
316 generate_table(5, 1, 200, 220, 1),
317 generate_table(6, 1, 1, 100, 1),
318 generate_table(7, 1, 1, 100, 1),
319 generate_table(8, 1, 1, 100, 1),
320 generate_table(9, 1, 1, 100, 1),
321 generate_table(10, 1, 1, 100, 1),
322 ]]);
323 let mut levels = Levels {
324 l0,
325 levels: vec![],
326 ..Default::default()
327 };
328 let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
329 let config = Arc::new(
330 CompactionConfigBuilder::new()
331 .level0_tier_compact_file_number(2)
332 .sub_level_max_compaction_bytes(100)
333 .max_compaction_bytes(500000)
334 .level0_sub_level_compact_level_count(2)
335 .level0_max_compact_file_number(3)
336 .build(),
337 );
338
339 let mut local_stats = LocalPickerStatistic::default();
340 let mut picker = TierCompactionPicker::new(config);
341 let ret = picker
342 .pick_compaction(&levels, &levels_handler, &mut local_stats)
343 .unwrap();
344 assert_eq!(1, ret.input_levels.len());
345
346 push_table_level0_overlapping(&mut levels, generate_table(11, 1, 1, 100, 1));
347 let ret = picker
348 .pick_compaction(&levels, &levels_handler, &mut local_stats)
349 .unwrap();
350 assert_eq!(1, ret.input_levels.len());
351 }
352}