1use std::sync::Arc;
16
17use risingwave_hummock_sdk::level::{InputLevel, Levels};
18use risingwave_hummock_sdk::sstable_info::SstableInfo;
19use risingwave_pb::hummock::LevelType;
20
21use super::{CompactionInput, CompactionPicker, LocalPickerStatistic};
22use crate::hummock::compaction::overlap_strategy::OverlapStrategy;
23use crate::hummock::level_handler::LevelHandler;
24
25pub struct MinOverlappingPicker {
26 level: usize,
27 target_level: usize,
28 max_select_bytes: u64,
29 vnode_partition_count: u32,
30 overlap_strategy: Arc<dyn OverlapStrategy>,
31}
32
33impl MinOverlappingPicker {
34 pub fn new(
35 level: usize,
36 target_level: usize,
37 max_select_bytes: u64,
38 vnode_partition_count: u32,
39 overlap_strategy: Arc<dyn OverlapStrategy>,
40 ) -> MinOverlappingPicker {
41 MinOverlappingPicker {
42 level,
43 target_level,
44 max_select_bytes,
45 vnode_partition_count,
46 overlap_strategy,
47 }
48 }
49
50 pub fn pick_tables(
51 &self,
52 select_tables: &[SstableInfo],
53 target_tables: &[SstableInfo],
54 level_handlers: &[LevelHandler],
55 ) -> (Vec<SstableInfo>, Vec<SstableInfo>) {
56 let mut select_file_ranges = vec![];
57 for (idx, sst) in select_tables.iter().enumerate() {
58 if level_handlers[self.level].is_pending_compact(&sst.sst_id) {
59 continue;
60 }
61 let mut overlap_info = self.overlap_strategy.create_overlap_info();
62 overlap_info.update(&sst.key_range);
63 let overlap_files_range = overlap_info.check_multiple_overlap(target_tables);
64
65 if overlap_files_range.is_empty() {
66 return (vec![sst.clone()], vec![]);
67 }
68 select_file_ranges.push((idx, overlap_files_range));
69 }
70 select_file_ranges.retain(|(_, range)| {
71 let mut pending_compact = false;
72 for other in &target_tables[range.clone()] {
73 if level_handlers[self.target_level].is_pending_compact(&other.sst_id) {
74 pending_compact = true;
75 break;
76 }
77 }
78 !pending_compact
79 });
80
81 let mut min_score = u64::MAX;
82 let mut min_score_select_range = 0..0;
83 let mut min_score_target_range = 0..0;
84 let mut min_score_select_file_size = 0;
85 for left in 0..select_file_ranges.len() {
86 let mut select_file_size = 0;
87 let mut target_level_overlap_range = select_file_ranges[left].1.clone();
88 let mut total_file_size = 0;
89 for other in &target_tables[target_level_overlap_range.clone()] {
90 total_file_size += other.sst_size;
91 }
92 let start_idx = select_file_ranges[left].0;
93 let mut end_idx = start_idx + 1;
94 for (idx, range) in select_file_ranges.iter().skip(left) {
95 if select_file_size > self.max_select_bytes
96 || *idx > end_idx
97 || range.start >= target_level_overlap_range.end
98 {
99 break;
100 }
101 select_file_size += select_tables[*idx].sst_size;
102 if range.end > target_level_overlap_range.end {
103 for other in &target_tables[target_level_overlap_range.end..range.end] {
104 total_file_size += other.sst_size;
105 }
106 target_level_overlap_range.end = range.end;
107 }
108 let score = (total_file_size * 100)
109 .checked_div(select_file_size)
110 .unwrap_or(total_file_size);
111 end_idx = idx + 1;
112 if score < min_score
113 || (score == min_score && select_file_size < min_score_select_file_size)
114 {
115 min_score = score;
116 min_score_select_range = start_idx..end_idx;
117 min_score_target_range = target_level_overlap_range.clone();
118 min_score_select_file_size = select_file_size;
119 }
120 }
121 }
122 if min_score == u64::MAX {
123 return (vec![], vec![]);
124 }
125 let select_input_ssts = select_tables[min_score_select_range].to_vec();
126 let target_input_ssts = target_tables[min_score_target_range].to_vec();
127 (select_input_ssts, target_input_ssts)
128 }
129}
130
131impl CompactionPicker for MinOverlappingPicker {
132 fn pick_compaction(
133 &mut self,
134 levels: &Levels,
135 level_handlers: &[LevelHandler],
136 stats: &mut LocalPickerStatistic,
137 ) -> Option<CompactionInput> {
138 assert!(self.level > 0);
139 let (select_input_ssts, target_input_ssts) = self.pick_tables(
140 &levels.get_level(self.level).table_infos,
141 &levels.get_level(self.target_level).table_infos,
142 level_handlers,
143 );
144 if select_input_ssts.is_empty() {
145 stats.skip_by_pending_files += 1;
146 return None;
147 }
148 Some(CompactionInput {
149 select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
150 target_input_size: target_input_ssts.iter().map(|sst| sst.sst_size).sum(),
151 total_file_count: (select_input_ssts.len() + target_input_ssts.len()) as u64,
152 input_levels: vec![
153 InputLevel {
154 level_idx: self.level as u32,
155 level_type: LevelType::Nonoverlapping,
156 table_infos: select_input_ssts,
157 },
158 InputLevel {
159 level_idx: self.target_level as u32,
160 level_type: LevelType::Nonoverlapping,
161 table_infos: target_input_ssts,
162 },
163 ],
164 target_level: self.target_level,
165 vnode_partition_count: self.vnode_partition_count,
166 ..Default::default()
167 })
168 }
169}
170
171#[cfg(test)]
172pub mod tests {
173 use risingwave_hummock_sdk::level::Level;
174
175 use super::*;
176 use crate::hummock::compaction::overlap_strategy::RangeOverlapStrategy;
177 use crate::hummock::compaction::selector::tests::{
178 generate_l0_nonoverlapping_sublevels, generate_table,
179 };
180
181 #[test]
182 fn test_compact_l1() {
183 let mut picker =
184 MinOverlappingPicker::new(1, 2, 10000, 0, Arc::new(RangeOverlapStrategy::default()));
185 let levels = vec![
186 Level {
187 level_idx: 1,
188 level_type: LevelType::Nonoverlapping,
189 table_infos: vec![
190 generate_table(0, 1, 0, 100, 1),
191 generate_table(1, 1, 101, 200, 1),
192 generate_table(2, 1, 222, 300, 1),
193 ],
194 ..Default::default()
195 },
196 Level {
197 level_idx: 2,
198 level_type: LevelType::Nonoverlapping,
199 table_infos: vec![
200 generate_table(4, 1, 0, 100, 1),
201 generate_table(5, 1, 101, 150, 1),
202 generate_table(6, 1, 151, 201, 1),
203 generate_table(7, 1, 501, 800, 1),
204 generate_table(8, 2, 301, 400, 1),
205 ],
206 ..Default::default()
207 },
208 ];
209 let levels = Levels {
210 levels,
211 l0: generate_l0_nonoverlapping_sublevels(vec![]),
212 ..Default::default()
213 };
214 let mut level_handlers = vec![
215 LevelHandler::new(0),
216 LevelHandler::new(1),
217 LevelHandler::new(2),
218 ];
219
220 let mut local_stats = LocalPickerStatistic::default();
223 let ret = picker
224 .pick_compaction(&levels, &level_handlers, &mut local_stats)
225 .unwrap();
226 assert_eq!(ret.input_levels[0].level_idx, 1);
227 assert_eq!(ret.target_level, 2);
228 assert_eq!(ret.input_levels[0].table_infos.len(), 1);
229 assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 2);
230 assert_eq!(ret.input_levels[1].table_infos.len(), 0);
231 ret.add_pending_task(0, &mut level_handlers);
232
233 let ret = picker
234 .pick_compaction(&levels, &level_handlers, &mut local_stats)
235 .unwrap();
236 assert_eq!(ret.input_levels[0].level_idx, 1);
237 assert_eq!(ret.target_level, 2);
238 assert_eq!(ret.input_levels[0].table_infos.len(), 1);
239 assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 0);
240 assert_eq!(ret.input_levels[1].table_infos.len(), 1);
241 assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 4);
242 ret.add_pending_task(1, &mut level_handlers);
243
244 let ret = picker
245 .pick_compaction(&levels, &level_handlers, &mut local_stats)
246 .unwrap();
247 assert_eq!(ret.input_levels[0].table_infos.len(), 1);
248 assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 1);
249 assert_eq!(ret.input_levels[1].table_infos.len(), 2);
250 assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 5);
251 }
252
253 #[test]
254 fn test_expand_l1_files() {
255 let mut picker =
256 MinOverlappingPicker::new(1, 2, 10000, 0, Arc::new(RangeOverlapStrategy::default()));
257 let levels = vec![
258 Level {
259 level_idx: 1,
260 level_type: LevelType::Nonoverlapping,
261 table_infos: vec![
262 generate_table(0, 1, 50, 99, 2),
263 generate_table(1, 1, 100, 149, 2),
264 generate_table(2, 1, 150, 249, 2),
265 ],
266 ..Default::default()
267 },
268 Level {
269 level_idx: 2,
270 level_type: LevelType::Nonoverlapping,
271 table_infos: vec![
272 generate_table(4, 1, 50, 199, 1),
273 generate_table(5, 1, 200, 399, 1),
274 ],
275 ..Default::default()
276 },
277 ];
278 let levels = Levels {
279 levels,
280 l0: generate_l0_nonoverlapping_sublevels(vec![]),
281 ..Default::default()
282 };
283 let levels_handler = vec![
284 LevelHandler::new(0),
285 LevelHandler::new(1),
286 LevelHandler::new(2),
287 ];
288
289 let ret = picker
292 .pick_compaction(
293 &levels,
294 &levels_handler,
295 &mut LocalPickerStatistic::default(),
296 )
297 .unwrap();
298 assert_eq!(ret.input_levels[0].level_idx, 1);
299 assert_eq!(ret.input_levels[1].level_idx, 2);
300
301 assert_eq!(ret.input_levels[0].table_infos.len(), 2);
302 assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 0);
303 assert_eq!(ret.input_levels[0].table_infos[1].sst_id, 1);
304
305 assert_eq!(ret.input_levels[1].table_infos.len(), 1);
306 assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 4);
307 }
308
309 #[test]
310 fn test_trivial_move_bug() {
311 let levels = [
312 Level {
313 level_idx: 1,
314 level_type: LevelType::Nonoverlapping,
315 table_infos: vec![generate_table(0, 1, 400, 500, 2)],
316 total_file_size: 100,
317 ..Default::default()
318 },
319 Level {
320 level_idx: 2,
321 level_type: LevelType::Nonoverlapping,
322 table_infos: vec![
323 generate_table(1, 1, 100, 200, 1),
324 generate_table(2, 1, 600, 700, 1),
325 ],
326 total_file_size: 200,
327 ..Default::default()
328 },
329 Level {
330 level_idx: 3,
331 level_type: LevelType::Nonoverlapping,
332 table_infos: vec![
333 generate_table(3, 1, 100, 300, 2),
334 generate_table(4, 1, 600, 800, 1),
335 ],
336 total_file_size: 400,
337 ..Default::default()
338 },
339 ];
340
341 let levels_handlers = vec![
342 LevelHandler::new(0),
343 LevelHandler::new(1),
344 LevelHandler::new(2),
345 LevelHandler::new(3),
346 ];
347 let picker =
349 MinOverlappingPicker::new(2, 3, 1000, 0, Arc::new(RangeOverlapStrategy::default()));
350 let (select_files, target_files) = picker.pick_tables(
351 &levels[1].table_infos,
352 &levels[2].table_infos,
353 &levels_handlers,
354 );
355 let overlap_strategy = Arc::new(RangeOverlapStrategy::default());
356 let mut overlap_info = overlap_strategy.create_overlap_info();
357 for sst in &select_files {
358 overlap_info.update(&sst.key_range);
359 }
360 let range = overlap_info.check_multiple_overlap(&levels[0].table_infos);
361 assert!(range.is_empty());
362 assert_eq!(select_files.len(), 1);
363 assert_eq!(target_files.len(), 1);
364 }
365}