1use std::sync::Arc;
16
17use risingwave_hummock_sdk::level::{InputLevel, Levels};
18use risingwave_hummock_sdk::sstable_info::SstableInfo;
19use risingwave_pb::hummock::LevelType;
20
21use super::{CompactionInput, CompactionPicker, LocalPickerStatistic};
22use crate::hummock::compaction::overlap_strategy::OverlapStrategy;
23use crate::hummock::level_handler::LevelHandler;
24
25pub struct MinOverlappingPicker {
26 level: usize,
27 target_level: usize,
28 max_select_bytes: u64,
29 vnode_partition_count: u32,
30 overlap_strategy: Arc<dyn OverlapStrategy>,
31}
32
33impl MinOverlappingPicker {
34 pub fn new(
35 level: usize,
36 target_level: usize,
37 max_select_bytes: u64,
38 vnode_partition_count: u32,
39 overlap_strategy: Arc<dyn OverlapStrategy>,
40 ) -> MinOverlappingPicker {
41 MinOverlappingPicker {
42 level,
43 target_level,
44 max_select_bytes,
45 vnode_partition_count,
46 overlap_strategy,
47 }
48 }
49
50 pub fn pick_tables(
51 &self,
52 select_tables: &[SstableInfo],
53 target_tables: &[SstableInfo],
54 level_handlers: &[LevelHandler],
55 ) -> (Vec<SstableInfo>, Vec<SstableInfo>) {
56 let mut select_file_ranges = vec![];
57 for (idx, sst) in select_tables.iter().enumerate() {
58 if level_handlers[self.level].is_pending_compact(&sst.sst_id) {
59 continue;
60 }
61 let mut overlap_info = self.overlap_strategy.create_overlap_info();
62 overlap_info.update(&sst.key_range);
63 let overlap_files_range = overlap_info.check_multiple_overlap(target_tables);
64
65 if overlap_files_range.is_empty() {
66 return (vec![sst.clone()], vec![]);
67 }
68 select_file_ranges.push((idx, overlap_files_range));
69 }
70 select_file_ranges.retain(|(_, range)| {
71 let mut pending_compact = false;
72 for other in &target_tables[range.clone()] {
73 if level_handlers[self.target_level].is_pending_compact(&other.sst_id) {
74 pending_compact = true;
75 break;
76 }
77 }
78 !pending_compact
79 });
80
81 let mut min_score = u64::MAX;
82 let mut min_score_select_range = 0..0;
83 let mut min_score_target_range = 0..0;
84 let mut min_score_select_file_size = 0;
85 for left in 0..select_file_ranges.len() {
86 let mut select_file_size = 0;
87 let mut target_level_overlap_range = select_file_ranges[left].1.clone();
88 let mut total_file_size = 0;
89 for other in &target_tables[target_level_overlap_range.clone()] {
90 total_file_size += other.sst_size;
91 }
92 let start_idx = select_file_ranges[left].0;
93 let mut end_idx = start_idx + 1;
94 for (idx, range) in select_file_ranges.iter().skip(left) {
95 if select_file_size > self.max_select_bytes
96 || *idx > end_idx
97 || range.start >= target_level_overlap_range.end
98 {
99 break;
100 }
101 select_file_size += select_tables[*idx].sst_size;
102 if range.end > target_level_overlap_range.end {
103 for other in &target_tables[target_level_overlap_range.end..range.end] {
104 total_file_size += other.sst_size;
105 }
106 target_level_overlap_range.end = range.end;
107 }
108 let score = if select_file_size == 0 {
109 total_file_size
110 } else {
111 total_file_size * 100 / select_file_size
112 };
113 end_idx = idx + 1;
114 if score < min_score
115 || (score == min_score && select_file_size < min_score_select_file_size)
116 {
117 min_score = score;
118 min_score_select_range = start_idx..end_idx;
119 min_score_target_range = target_level_overlap_range.clone();
120 min_score_select_file_size = select_file_size;
121 }
122 }
123 }
124 if min_score == u64::MAX {
125 return (vec![], vec![]);
126 }
127 let select_input_ssts = select_tables[min_score_select_range].to_vec();
128 let target_input_ssts = target_tables[min_score_target_range].to_vec();
129 (select_input_ssts, target_input_ssts)
130 }
131}
132
133impl CompactionPicker for MinOverlappingPicker {
134 fn pick_compaction(
135 &mut self,
136 levels: &Levels,
137 level_handlers: &[LevelHandler],
138 stats: &mut LocalPickerStatistic,
139 ) -> Option<CompactionInput> {
140 assert!(self.level > 0);
141 let (select_input_ssts, target_input_ssts) = self.pick_tables(
142 &levels.get_level(self.level).table_infos,
143 &levels.get_level(self.target_level).table_infos,
144 level_handlers,
145 );
146 if select_input_ssts.is_empty() {
147 stats.skip_by_pending_files += 1;
148 return None;
149 }
150 Some(CompactionInput {
151 select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
152 target_input_size: target_input_ssts.iter().map(|sst| sst.sst_size).sum(),
153 total_file_count: (select_input_ssts.len() + target_input_ssts.len()) as u64,
154 input_levels: vec![
155 InputLevel {
156 level_idx: self.level as u32,
157 level_type: LevelType::Nonoverlapping,
158 table_infos: select_input_ssts,
159 },
160 InputLevel {
161 level_idx: self.target_level as u32,
162 level_type: LevelType::Nonoverlapping,
163 table_infos: target_input_ssts,
164 },
165 ],
166 target_level: self.target_level,
167 vnode_partition_count: self.vnode_partition_count,
168 ..Default::default()
169 })
170 }
171}
172
173#[cfg(test)]
174pub mod tests {
175 use risingwave_hummock_sdk::level::Level;
176
177 use super::*;
178 use crate::hummock::compaction::overlap_strategy::RangeOverlapStrategy;
179 use crate::hummock::compaction::selector::tests::{
180 generate_l0_nonoverlapping_sublevels, generate_table,
181 };
182
183 #[test]
184 fn test_compact_l1() {
185 let mut picker =
186 MinOverlappingPicker::new(1, 2, 10000, 0, Arc::new(RangeOverlapStrategy::default()));
187 let levels = vec![
188 Level {
189 level_idx: 1,
190 level_type: LevelType::Nonoverlapping,
191 table_infos: vec![
192 generate_table(0, 1, 0, 100, 1),
193 generate_table(1, 1, 101, 200, 1),
194 generate_table(2, 1, 222, 300, 1),
195 ],
196 ..Default::default()
197 },
198 Level {
199 level_idx: 2,
200 level_type: LevelType::Nonoverlapping,
201 table_infos: vec![
202 generate_table(4, 1, 0, 100, 1),
203 generate_table(5, 1, 101, 150, 1),
204 generate_table(6, 1, 151, 201, 1),
205 generate_table(7, 1, 501, 800, 1),
206 generate_table(8, 2, 301, 400, 1),
207 ],
208 ..Default::default()
209 },
210 ];
211 let levels = Levels {
212 levels,
213 l0: generate_l0_nonoverlapping_sublevels(vec![]),
214 ..Default::default()
215 };
216 let mut level_handlers = vec![
217 LevelHandler::new(0),
218 LevelHandler::new(1),
219 LevelHandler::new(2),
220 ];
221
222 let mut local_stats = LocalPickerStatistic::default();
225 let ret = picker
226 .pick_compaction(&levels, &level_handlers, &mut local_stats)
227 .unwrap();
228 assert_eq!(ret.input_levels[0].level_idx, 1);
229 assert_eq!(ret.target_level, 2);
230 assert_eq!(ret.input_levels[0].table_infos.len(), 1);
231 assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 2);
232 assert_eq!(ret.input_levels[1].table_infos.len(), 0);
233 ret.add_pending_task(0, &mut level_handlers);
234
235 let ret = picker
236 .pick_compaction(&levels, &level_handlers, &mut local_stats)
237 .unwrap();
238 assert_eq!(ret.input_levels[0].level_idx, 1);
239 assert_eq!(ret.target_level, 2);
240 assert_eq!(ret.input_levels[0].table_infos.len(), 1);
241 assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 0);
242 assert_eq!(ret.input_levels[1].table_infos.len(), 1);
243 assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 4);
244 ret.add_pending_task(1, &mut level_handlers);
245
246 let ret = picker
247 .pick_compaction(&levels, &level_handlers, &mut local_stats)
248 .unwrap();
249 assert_eq!(ret.input_levels[0].table_infos.len(), 1);
250 assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 1);
251 assert_eq!(ret.input_levels[1].table_infos.len(), 2);
252 assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 5);
253 }
254
255 #[test]
256 fn test_expand_l1_files() {
257 let mut picker =
258 MinOverlappingPicker::new(1, 2, 10000, 0, Arc::new(RangeOverlapStrategy::default()));
259 let levels = vec![
260 Level {
261 level_idx: 1,
262 level_type: LevelType::Nonoverlapping,
263 table_infos: vec![
264 generate_table(0, 1, 50, 99, 2),
265 generate_table(1, 1, 100, 149, 2),
266 generate_table(2, 1, 150, 249, 2),
267 ],
268 ..Default::default()
269 },
270 Level {
271 level_idx: 2,
272 level_type: LevelType::Nonoverlapping,
273 table_infos: vec![
274 generate_table(4, 1, 50, 199, 1),
275 generate_table(5, 1, 200, 399, 1),
276 ],
277 ..Default::default()
278 },
279 ];
280 let levels = Levels {
281 levels,
282 l0: generate_l0_nonoverlapping_sublevels(vec![]),
283 ..Default::default()
284 };
285 let levels_handler = vec![
286 LevelHandler::new(0),
287 LevelHandler::new(1),
288 LevelHandler::new(2),
289 ];
290
291 let ret = picker
294 .pick_compaction(
295 &levels,
296 &levels_handler,
297 &mut LocalPickerStatistic::default(),
298 )
299 .unwrap();
300 assert_eq!(ret.input_levels[0].level_idx, 1);
301 assert_eq!(ret.input_levels[1].level_idx, 2);
302
303 assert_eq!(ret.input_levels[0].table_infos.len(), 2);
304 assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 0);
305 assert_eq!(ret.input_levels[0].table_infos[1].sst_id, 1);
306
307 assert_eq!(ret.input_levels[1].table_infos.len(), 1);
308 assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 4);
309 }
310
311 #[test]
312 fn test_trivial_move_bug() {
313 let levels = [
314 Level {
315 level_idx: 1,
316 level_type: LevelType::Nonoverlapping,
317 table_infos: vec![generate_table(0, 1, 400, 500, 2)],
318 total_file_size: 100,
319 ..Default::default()
320 },
321 Level {
322 level_idx: 2,
323 level_type: LevelType::Nonoverlapping,
324 table_infos: vec![
325 generate_table(1, 1, 100, 200, 1),
326 generate_table(2, 1, 600, 700, 1),
327 ],
328 total_file_size: 200,
329 ..Default::default()
330 },
331 Level {
332 level_idx: 3,
333 level_type: LevelType::Nonoverlapping,
334 table_infos: vec![
335 generate_table(3, 1, 100, 300, 2),
336 generate_table(4, 1, 600, 800, 1),
337 ],
338 total_file_size: 400,
339 ..Default::default()
340 },
341 ];
342
343 let levels_handlers = vec![
344 LevelHandler::new(0),
345 LevelHandler::new(1),
346 LevelHandler::new(2),
347 LevelHandler::new(3),
348 ];
349 let picker =
351 MinOverlappingPicker::new(2, 3, 1000, 0, Arc::new(RangeOverlapStrategy::default()));
352 let (select_files, target_files) = picker.pick_tables(
353 &levels[1].table_infos,
354 &levels[2].table_infos,
355 &levels_handlers,
356 );
357 let overlap_strategy = Arc::new(RangeOverlapStrategy::default());
358 let mut overlap_info = overlap_strategy.create_overlap_info();
359 for sst in &select_files {
360 overlap_info.update(&sst.key_range);
361 }
362 let range = overlap_info.check_multiple_overlap(&levels[0].table_infos);
363 assert!(range.is_empty());
364 assert_eq!(select_files.len(), 1);
365 assert_eq!(target_files.len(), 1);
366 }
367}