1use std::sync::Arc;
16
17use risingwave_hummock_sdk::append_sstable_info_to_string;
18use risingwave_hummock_sdk::level::{InputLevel, Level, Levels};
19use risingwave_hummock_sdk::sstable_info::SstableInfo;
20use risingwave_pb::hummock::LevelType;
21
22use super::{CompactionInput, CompactionPicker, LocalPickerStatistic};
23use crate::hummock::compaction::overlap_strategy::OverlapStrategy;
24use crate::hummock::level_handler::LevelHandler;
25
26pub struct MinOverlappingPicker {
27 level: usize,
28 target_level: usize,
29 max_select_bytes: u64,
30 vnode_partition_count: u32,
31 overlap_strategy: Arc<dyn OverlapStrategy>,
32}
33
34impl MinOverlappingPicker {
35 pub fn new(
36 level: usize,
37 target_level: usize,
38 max_select_bytes: u64,
39 vnode_partition_count: u32,
40 overlap_strategy: Arc<dyn OverlapStrategy>,
41 ) -> MinOverlappingPicker {
42 MinOverlappingPicker {
43 level,
44 target_level,
45 max_select_bytes,
46 vnode_partition_count,
47 overlap_strategy,
48 }
49 }
50
51 pub fn pick_tables(
52 &self,
53 select_tables: &[SstableInfo],
54 target_tables: &[SstableInfo],
55 level_handlers: &[LevelHandler],
56 ) -> (Vec<SstableInfo>, Vec<SstableInfo>) {
57 let mut select_file_ranges = vec![];
58 for (idx, sst) in select_tables.iter().enumerate() {
59 if level_handlers[self.level].is_pending_compact(&sst.sst_id) {
60 continue;
61 }
62 let mut overlap_info = self.overlap_strategy.create_overlap_info();
63 overlap_info.update(&sst.key_range);
64 let overlap_files_range = overlap_info.check_multiple_overlap(target_tables);
65
66 if overlap_files_range.is_empty() {
67 return (vec![sst.clone()], vec![]);
68 }
69 select_file_ranges.push((idx, overlap_files_range));
70 }
71 select_file_ranges.retain(|(_, range)| {
72 let mut pending_compact = false;
73 for other in &target_tables[range.clone()] {
74 if level_handlers[self.target_level].is_pending_compact(&other.sst_id) {
75 pending_compact = true;
76 break;
77 }
78 }
79 !pending_compact
80 });
81
82 let mut min_score = u64::MAX;
83 let mut min_score_select_range = 0..0;
84 let mut min_score_target_range = 0..0;
85 let mut min_score_select_file_size = 0;
86 for left in 0..select_file_ranges.len() {
87 let mut select_file_size = 0;
88 let mut target_level_overlap_range = select_file_ranges[left].1.clone();
89 let mut total_file_size = 0;
90 for other in &target_tables[target_level_overlap_range.clone()] {
91 total_file_size += other.sst_size;
92 }
93 let start_idx = select_file_ranges[left].0;
94 let mut end_idx = start_idx + 1;
95 for (idx, range) in select_file_ranges.iter().skip(left) {
96 if select_file_size > self.max_select_bytes
97 || *idx > end_idx
98 || range.start >= target_level_overlap_range.end
99 {
100 break;
101 }
102 select_file_size += select_tables[*idx].sst_size;
103 if range.end > target_level_overlap_range.end {
104 for other in &target_tables[target_level_overlap_range.end..range.end] {
105 total_file_size += other.sst_size;
106 }
107 target_level_overlap_range.end = range.end;
108 }
109 let score = if select_file_size == 0 {
110 total_file_size
111 } else {
112 total_file_size * 100 / select_file_size
113 };
114 end_idx = idx + 1;
115 if score < min_score
116 || (score == min_score && select_file_size < min_score_select_file_size)
117 {
118 min_score = score;
119 min_score_select_range = start_idx..end_idx;
120 min_score_target_range = target_level_overlap_range.clone();
121 min_score_select_file_size = select_file_size;
122 }
123 }
124 }
125 if min_score == u64::MAX {
126 return (vec![], vec![]);
127 }
128 let select_input_ssts = select_tables[min_score_select_range].to_vec();
129 let target_input_ssts = target_tables[min_score_target_range].to_vec();
130 (select_input_ssts, target_input_ssts)
131 }
132}
133
134impl CompactionPicker for MinOverlappingPicker {
135 fn pick_compaction(
136 &mut self,
137 levels: &Levels,
138 level_handlers: &[LevelHandler],
139 stats: &mut LocalPickerStatistic,
140 ) -> Option<CompactionInput> {
141 assert!(self.level > 0);
142 let (select_input_ssts, target_input_ssts) = self.pick_tables(
143 &levels.get_level(self.level).table_infos,
144 &levels.get_level(self.target_level).table_infos,
145 level_handlers,
146 );
147 if select_input_ssts.is_empty() {
148 stats.skip_by_pending_files += 1;
149 return None;
150 }
151 Some(CompactionInput {
152 select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
153 target_input_size: target_input_ssts.iter().map(|sst| sst.sst_size).sum(),
154 total_file_count: (select_input_ssts.len() + target_input_ssts.len()) as u64,
155 input_levels: vec![
156 InputLevel {
157 level_idx: self.level as u32,
158 level_type: LevelType::Nonoverlapping,
159 table_infos: select_input_ssts,
160 },
161 InputLevel {
162 level_idx: self.target_level as u32,
163 level_type: LevelType::Nonoverlapping,
164 table_infos: target_input_ssts,
165 },
166 ],
167 target_level: self.target_level,
168 vnode_partition_count: self.vnode_partition_count,
169 ..Default::default()
170 })
171 }
172}
173
174#[derive(Default, Debug)]
175pub struct SubLevelSstables {
176 pub total_file_size: u64,
177 pub total_file_count: usize,
178 pub sstable_infos: Vec<(u64, Vec<SstableInfo>)>,
179 pub expected: bool,
180}
181
182pub struct NonOverlapSubLevelPicker {
183 min_compaction_bytes: u64,
184 max_compaction_bytes: u64,
185 min_expected_level_count: usize,
186 max_file_count: u64,
187 overlap_strategy: Arc<dyn OverlapStrategy>,
188 enable_check_task_level_overlap: bool,
189 max_expected_level_count: usize,
190 enable_optimize_l0_interval_selection: bool,
191}
192
193impl NonOverlapSubLevelPicker {
194 pub fn new(
195 min_compaction_bytes: u64,
196 max_compaction_bytes: u64,
197 min_expected_level_count: usize,
198 max_file_count: u64,
199 overlap_strategy: Arc<dyn OverlapStrategy>,
200 enable_check_task_level_overlap: bool,
201 max_expected_level_count: usize,
202 enable_optimize_l0_interval_selection: bool,
203 ) -> Self {
204 Self {
205 min_compaction_bytes,
206 max_compaction_bytes,
207 min_expected_level_count,
208 max_file_count,
209 overlap_strategy,
210 enable_check_task_level_overlap,
211 max_expected_level_count,
212 enable_optimize_l0_interval_selection,
213 }
214 }
215
216 #[cfg(test)]
217 pub fn for_test(
218 min_compaction_bytes: u64,
219 max_compaction_bytes: u64,
220 min_expected_level_count: usize,
221 max_file_count: u64,
222 overlap_strategy: Arc<dyn OverlapStrategy>,
223 enable_check_task_level_overlap: bool,
224 max_expected_level_count: usize,
225 enable_optimize_l0_interval_selection: bool,
226 ) -> Self {
227 Self {
228 min_compaction_bytes,
229 max_compaction_bytes,
230 min_expected_level_count,
231 max_file_count,
232 overlap_strategy,
233 enable_check_task_level_overlap,
234 max_expected_level_count,
235 enable_optimize_l0_interval_selection,
236 }
237 }
238
239 fn pick_sub_level(
245 &self,
246 levels: &[Level],
247 level_handler: &LevelHandler,
248 sst: &SstableInfo,
249 ) -> Option<SubLevelSstables> {
250 let mut ret = SubLevelSstables::default();
251 for sub_level in levels {
252 ret.sstable_infos.push((sub_level.sub_level_id, vec![]));
253 }
254
255 let mut pick_levels_range = Vec::default();
256 let mut max_select_level_count = 0;
257
258 'expand_new_level: for (target_index, target_level) in levels.iter().enumerate() {
261 if target_level.level_type != LevelType::Nonoverlapping {
262 break;
263 }
264
265 if ret
266 .sstable_infos
267 .iter()
268 .filter(|(_sub_level_id, ssts)| !ssts.is_empty())
269 .count()
270 > self.max_expected_level_count
271 {
272 break;
273 }
274
275 let mut basic_overlap_info = self.overlap_strategy.create_overlap_info();
277 basic_overlap_info.update(&sst.key_range);
278
279 let mut overlap_files_range =
280 basic_overlap_info.check_multiple_include(&target_level.table_infos);
281 if overlap_files_range.is_empty() {
282 overlap_files_range =
283 basic_overlap_info.check_multiple_overlap(&target_level.table_infos);
284 }
285
286 if overlap_files_range.is_empty() {
287 continue;
288 }
289
290 let mut overlap_levels = vec![];
291
292 let mut add_files_size: u64 = 0;
293 let mut add_files_count: usize = 0;
294
295 let mut select_level_count = 0;
296 for reverse_index in (0..=target_index).rev() {
297 let target_tables = &levels[reverse_index].table_infos;
298
299 overlap_files_range = if target_index == reverse_index {
300 overlap_files_range
301 } else {
302 basic_overlap_info.check_multiple_overlap(target_tables)
303 };
304
305 if overlap_files_range.is_empty() {
308 continue;
310 }
311
312 for other in &target_tables[overlap_files_range.clone()] {
313 if level_handler.is_pending_compact(&other.sst_id) {
314 break 'expand_new_level;
315 }
316 basic_overlap_info.update(&other.key_range);
317
318 add_files_size += other.sst_size;
319 add_files_count += 1;
320 }
321
322 overlap_levels.push((reverse_index, overlap_files_range.clone()));
323 select_level_count += 1;
324 }
325
326 if select_level_count > max_select_level_count {
327 max_select_level_count = select_level_count;
328 pick_levels_range = overlap_levels;
329 }
330
331 if max_select_level_count >= self.min_expected_level_count
333 && (add_files_size >= self.max_compaction_bytes
334 || add_files_count >= self.max_file_count as usize)
335 {
336 break 'expand_new_level;
337 }
338 }
339
340 if !pick_levels_range.is_empty() {
341 for (reverse_index, sst_range) in pick_levels_range {
342 let level_ssts = &levels[reverse_index].table_infos;
343 ret.sstable_infos[reverse_index].1 = level_ssts[sst_range].to_vec();
344 ret.total_file_count += ret.sstable_infos[reverse_index].1.len();
345 ret.total_file_size += ret.sstable_infos[reverse_index]
346 .1
347 .iter()
348 .map(|sst| sst.sst_size)
349 .sum::<u64>();
350 }
351
352 ret.sstable_infos
354 .iter_mut()
355 .for_each(|(_sub_level_id, level_ssts)| {
356 level_ssts.sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
357 });
358 } else {
359 ret.total_file_count = 0;
360 ret.total_file_size = 0;
361 }
362
363 if self.enable_check_task_level_overlap {
364 self.verify_task_level_overlap(&ret, levels);
365 }
366
367 ret.sstable_infos
368 .retain(|(_sub_level_id, ssts)| !ssts.is_empty());
369
370 if ret.total_file_size > self.max_compaction_bytes
372 || ret.total_file_count as u64 > self.max_file_count
373 || ret.sstable_infos.len() > self.max_expected_level_count
374 {
375 let mut total_file_count = 0;
377 let mut total_file_size = 0;
378 let mut total_level_count = 0;
379 for (index, (_sub_level_id, sstables)) in ret.sstable_infos.iter().enumerate() {
380 total_file_count += sstables.len();
381 total_file_size += sstables.iter().map(|sst| sst.sst_size).sum::<u64>();
382 total_level_count += 1;
383
384 if total_level_count >= self.min_expected_level_count
386 && (total_file_count as u64 >= self.max_file_count
387 || total_file_size >= self.max_compaction_bytes
388 || total_level_count >= self.max_expected_level_count)
389 {
390 ret.total_file_count = total_file_count;
391 ret.total_file_size = total_file_size;
392 ret.sstable_infos.truncate(index + 1);
393 break;
394 }
395 }
396 }
397
398 if ret.sstable_infos.is_empty() {
399 return None;
400 }
401
402 Some(ret)
403 }
404
405 pub fn pick_l0_multi_non_overlap_level(
406 &self,
407 l0: &[Level],
408 level_handler: &LevelHandler,
409 ) -> Vec<SubLevelSstables> {
410 if l0.len() < self.min_expected_level_count {
411 return vec![];
412 }
413
414 let mut scores = vec![];
415 let intervals_level_idx = if self.enable_optimize_l0_interval_selection {
417 l0.iter()
418 .enumerate()
419 .max_by(|(idx1, levels1), (idx2, levels2)| {
420 levels1
421 .table_infos
422 .len()
423 .cmp(&levels2.table_infos.len())
424 .then(idx2.cmp(idx1))
425 })
426 .map(|(idx, _)| idx)
427 .unwrap()
428 } else {
429 0
430 };
431
432 let intervals = &l0[intervals_level_idx].table_infos;
433 for sst in intervals {
434 if level_handler.is_pending_compact(&sst.sst_id) {
435 continue;
436 }
437
438 if let Some(score) = self.pick_sub_level(l0, level_handler, sst) {
439 scores.push(score);
440 }
441 }
442
443 if scores.is_empty() {
444 return vec![];
445 }
446
447 let mut expected = Vec::with_capacity(scores.len());
448 let mut unexpected = vec![];
449
450 for mut selected_task in scores {
451 if selected_task.sstable_infos.len() > self.max_expected_level_count
452 || selected_task.sstable_infos.len() < self.min_expected_level_count
453 || selected_task.total_file_size < self.min_compaction_bytes
454 {
455 selected_task.expected = false;
456 unexpected.push(selected_task);
457 } else {
458 selected_task.expected = true;
459 expected.push(selected_task);
460 }
461 }
462
463 expected.sort_by(|a, b| {
467 b.sstable_infos
468 .len()
469 .cmp(&a.sstable_infos.len())
470 .then_with(|| a.total_file_count.cmp(&b.total_file_count))
471 .then_with(|| a.total_file_size.cmp(&b.total_file_size))
472 });
473
474 unexpected.sort_by(|a, b| {
480 let a_select_count_offset =
481 (a.sstable_infos.len() as i64 - self.max_expected_level_count as i64).abs();
482 let b_select_count_offset =
483 (b.sstable_infos.len() as i64 - self.max_expected_level_count as i64).abs();
484
485 let a_file_count_offset =
486 (a.total_file_count as i64 - self.max_file_count as i64).abs();
487 let b_file_count_offset =
488 (b.total_file_count as i64 - self.max_file_count as i64).abs();
489
490 let a_file_size_offset =
491 (a.total_file_size as i64 - self.max_compaction_bytes as i64).abs();
492 let b_file_size_offset =
493 (b.total_file_size as i64 - self.max_compaction_bytes as i64).abs();
494
495 a_select_count_offset
496 .cmp(&b_select_count_offset)
497 .then_with(|| a_file_count_offset.cmp(&b_file_count_offset))
498 .then_with(|| a_file_size_offset.cmp(&b_file_size_offset))
499 });
500
501 expected.extend(unexpected);
502
503 expected
504 }
505
506 fn verify_task_level_overlap(&self, ret: &SubLevelSstables, levels: &[Level]) {
507 use std::fmt::Write;
508
509 use itertools::Itertools;
510
511 use crate::hummock::compaction::overlap_strategy::OverlapInfo;
512 let mut overlap_info: Option<Box<dyn OverlapInfo>> = None;
513 for (level_idx, (_sub_level_id, ssts)) in ret.sstable_infos.iter().enumerate().rev() {
514 if let Some(overlap_info) = overlap_info.as_mut() {
515 let level = levels.get(level_idx).unwrap();
517 let overlap_sst_range = overlap_info.check_multiple_overlap(&level.table_infos);
518 if !overlap_sst_range.is_empty() {
519 let expected_sst_ids = level.table_infos[overlap_sst_range.clone()]
520 .iter()
521 .map(|s| s.object_id)
522 .collect_vec();
523 let actual_sst_ids = ssts.iter().map(|s| s.object_id).collect_vec();
524 let start_idx = actual_sst_ids
527 .iter()
528 .position(|sst_id| sst_id == expected_sst_ids.first().unwrap());
529 if start_idx.is_none_or(|idx| {
530 actual_sst_ids[idx..idx + expected_sst_ids.len()] != expected_sst_ids
531 }) {
532 let mut actual_sst_infos = String::new();
534 ssts.iter()
535 .for_each(|s| append_sstable_info_to_string(&mut actual_sst_infos, s));
536
537 let mut expected_sst_infos = String::new();
539 level.table_infos[overlap_sst_range.clone()]
540 .iter()
541 .for_each(|s| {
542 append_sstable_info_to_string(&mut expected_sst_infos, s)
543 });
544
545 let mut ret_sst_infos = String::new();
547 ret.sstable_infos.iter().enumerate().for_each(
548 |(idx, (_sub_level_id, ssts))| {
549 writeln!(
550 ret_sst_infos,
551 "sub level {}",
552 levels.get(idx).unwrap().sub_level_id
553 )
554 .unwrap();
555 ssts.iter().for_each(|s| {
556 append_sstable_info_to_string(&mut ret_sst_infos, s)
557 });
558 },
559 );
560 panic!(
561 "Compact task overlap check fails. Actual: {} Expected: {} Ret {}",
562 actual_sst_infos, expected_sst_infos, ret_sst_infos
563 );
564 }
565 }
566 } else if !ssts.is_empty() {
567 overlap_info = Some(self.overlap_strategy.create_overlap_info());
569 }
570
571 for sst in ssts {
572 overlap_info.as_mut().unwrap().update(&sst.key_range);
573 }
574 }
575 }
576}
577
578#[cfg(test)]
579pub mod tests {
580 use std::collections::BTreeSet;
581
582 use risingwave_common::config::default::compaction_config;
583
584 use super::*;
585 use crate::hummock::compaction::overlap_strategy::RangeOverlapStrategy;
586 use crate::hummock::compaction::selector::tests::{
587 generate_l0_nonoverlapping_sublevels, generate_table,
588 };
589
590 #[test]
591 fn test_compact_l1() {
592 let mut picker =
593 MinOverlappingPicker::new(1, 2, 10000, 0, Arc::new(RangeOverlapStrategy::default()));
594 let levels = vec![
595 Level {
596 level_idx: 1,
597 level_type: LevelType::Nonoverlapping,
598 table_infos: vec![
599 generate_table(0, 1, 0, 100, 1),
600 generate_table(1, 1, 101, 200, 1),
601 generate_table(2, 1, 222, 300, 1),
602 ],
603 ..Default::default()
604 },
605 Level {
606 level_idx: 2,
607 level_type: LevelType::Nonoverlapping,
608 table_infos: vec![
609 generate_table(4, 1, 0, 100, 1),
610 generate_table(5, 1, 101, 150, 1),
611 generate_table(6, 1, 151, 201, 1),
612 generate_table(7, 1, 501, 800, 1),
613 generate_table(8, 2, 301, 400, 1),
614 ],
615 ..Default::default()
616 },
617 ];
618 let levels = Levels {
619 levels,
620 l0: generate_l0_nonoverlapping_sublevels(vec![]),
621 ..Default::default()
622 };
623 let mut level_handlers = vec![
624 LevelHandler::new(0),
625 LevelHandler::new(1),
626 LevelHandler::new(2),
627 ];
628
629 let mut local_stats = LocalPickerStatistic::default();
632 let ret = picker
633 .pick_compaction(&levels, &level_handlers, &mut local_stats)
634 .unwrap();
635 assert_eq!(ret.input_levels[0].level_idx, 1);
636 assert_eq!(ret.target_level, 2);
637 assert_eq!(ret.input_levels[0].table_infos.len(), 1);
638 assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 2);
639 assert_eq!(ret.input_levels[1].table_infos.len(), 0);
640 ret.add_pending_task(0, &mut level_handlers);
641
642 let ret = picker
643 .pick_compaction(&levels, &level_handlers, &mut local_stats)
644 .unwrap();
645 assert_eq!(ret.input_levels[0].level_idx, 1);
646 assert_eq!(ret.target_level, 2);
647 assert_eq!(ret.input_levels[0].table_infos.len(), 1);
648 assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 0);
649 assert_eq!(ret.input_levels[1].table_infos.len(), 1);
650 assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 4);
651 ret.add_pending_task(1, &mut level_handlers);
652
653 let ret = picker
654 .pick_compaction(&levels, &level_handlers, &mut local_stats)
655 .unwrap();
656 assert_eq!(ret.input_levels[0].table_infos.len(), 1);
657 assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 1);
658 assert_eq!(ret.input_levels[1].table_infos.len(), 2);
659 assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 5);
660 }
661
662 #[test]
663 fn test_expand_l1_files() {
664 let mut picker =
665 MinOverlappingPicker::new(1, 2, 10000, 0, Arc::new(RangeOverlapStrategy::default()));
666 let levels = vec![
667 Level {
668 level_idx: 1,
669 level_type: LevelType::Nonoverlapping,
670 table_infos: vec![
671 generate_table(0, 1, 50, 99, 2),
672 generate_table(1, 1, 100, 149, 2),
673 generate_table(2, 1, 150, 249, 2),
674 ],
675 ..Default::default()
676 },
677 Level {
678 level_idx: 2,
679 level_type: LevelType::Nonoverlapping,
680 table_infos: vec![
681 generate_table(4, 1, 50, 199, 1),
682 generate_table(5, 1, 200, 399, 1),
683 ],
684 ..Default::default()
685 },
686 ];
687 let levels = Levels {
688 levels,
689 l0: generate_l0_nonoverlapping_sublevels(vec![]),
690 ..Default::default()
691 };
692 let levels_handler = vec![
693 LevelHandler::new(0),
694 LevelHandler::new(1),
695 LevelHandler::new(2),
696 ];
697
698 let ret = picker
701 .pick_compaction(
702 &levels,
703 &levels_handler,
704 &mut LocalPickerStatistic::default(),
705 )
706 .unwrap();
707 assert_eq!(ret.input_levels[0].level_idx, 1);
708 assert_eq!(ret.input_levels[1].level_idx, 2);
709
710 assert_eq!(ret.input_levels[0].table_infos.len(), 2);
711 assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 0);
712 assert_eq!(ret.input_levels[0].table_infos[1].sst_id, 1);
713
714 assert_eq!(ret.input_levels[1].table_infos.len(), 1);
715 assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 4);
716 }
717
718 #[test]
719 fn test_pick_l0_multi_level() {
720 let levels = vec![
721 Level {
722 level_idx: 1,
723 level_type: LevelType::Nonoverlapping,
724 table_infos: vec![
725 generate_table(0, 1, 50, 99, 2),
726 generate_table(1, 1, 100, 149, 2),
727 generate_table(2, 1, 150, 249, 2),
728 generate_table(6, 1, 250, 300, 2),
729 generate_table(7, 1, 350, 400, 2),
730 generate_table(8, 1, 450, 500, 2),
731 ],
732 total_file_size: 800,
733 ..Default::default()
734 },
735 Level {
736 level_idx: 2,
737 level_type: LevelType::Nonoverlapping,
738 table_infos: vec![
739 generate_table(4, 1, 50, 199, 1),
740 generate_table(5, 1, 200, 249, 1),
741 generate_table(9, 1, 250, 300, 2),
742 generate_table(10, 1, 350, 400, 2),
743 generate_table(11, 1, 450, 500, 2),
744 ],
745 total_file_size: 350,
746 ..Default::default()
747 },
748 Level {
749 level_idx: 3,
750 level_type: LevelType::Nonoverlapping,
751 table_infos: vec![
752 generate_table(11, 1, 250, 300, 2),
753 generate_table(12, 1, 350, 400, 2),
754 generate_table(13, 1, 450, 500, 2),
755 ],
756 total_file_size: 150,
757 ..Default::default()
758 },
759 Level {
760 level_idx: 4,
761 level_type: LevelType::Nonoverlapping,
762 table_infos: vec![
763 generate_table(14, 1, 250, 300, 2),
764 generate_table(15, 1, 350, 400, 2),
765 generate_table(16, 1, 450, 500, 2),
766 ],
767 total_file_size: 150,
768 ..Default::default()
769 },
770 ];
771
772 let levels_handlers = vec![
773 LevelHandler::new(0),
774 LevelHandler::new(1),
775 LevelHandler::new(2),
776 ];
777
778 {
779 let picker = NonOverlapSubLevelPicker::new(
781 0,
782 10000,
783 1,
784 10000,
785 Arc::new(RangeOverlapStrategy::default()),
786 true,
787 compaction_config::max_l0_compact_level_count() as usize,
788 true,
789 );
790 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
791 assert_eq!(6, ret.len());
792 }
793
794 {
795 let picker = NonOverlapSubLevelPicker::new(
797 0,
798 100,
799 1,
800 10000,
801 Arc::new(RangeOverlapStrategy::default()),
802 true,
803 compaction_config::max_l0_compact_level_count() as usize,
804 true,
805 );
806 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
807 assert_eq!(6, ret.len());
808 }
809
810 {
811 let picker = NonOverlapSubLevelPicker::new(
813 0,
814 10000,
815 1,
816 5,
817 Arc::new(RangeOverlapStrategy::default()),
818 true,
819 compaction_config::max_l0_compact_level_count() as usize,
820 true,
821 );
822 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
823 assert_eq!(6, ret.len());
824 }
825 }
826
827 #[test]
828 fn test_pick_l0_multi_level2() {
829 let levels = vec![
830 Level {
831 level_idx: 1,
832 level_type: LevelType::Nonoverlapping,
833 table_infos: vec![
834 generate_table(0, 1, 50, 99, 2),
835 generate_table(1, 1, 100, 149, 2),
836 generate_table(2, 1, 150, 249, 2),
837 generate_table(6, 1, 250, 300, 2),
838 generate_table(7, 1, 350, 400, 2),
839 generate_table(8, 1, 450, 500, 2),
840 ],
841 total_file_size: 800,
842 ..Default::default()
843 },
844 Level {
845 level_idx: 2,
846 level_type: LevelType::Nonoverlapping,
847 table_infos: vec![
848 generate_table(4, 1, 50, 99, 1),
849 generate_table(5, 1, 150, 200, 1),
850 generate_table(9, 1, 250, 300, 2),
851 generate_table(10, 1, 350, 400, 2),
852 generate_table(11, 1, 450, 500, 2),
853 ],
854 total_file_size: 250,
855 ..Default::default()
856 },
857 Level {
858 level_idx: 3,
859 level_type: LevelType::Nonoverlapping,
860 table_infos: vec![
861 generate_table(11, 1, 250, 300, 2),
862 generate_table(12, 1, 350, 400, 2),
863 generate_table(13, 1, 450, 500, 2),
864 ],
865 total_file_size: 150,
866 ..Default::default()
867 },
868 Level {
869 level_idx: 4,
870 level_type: LevelType::Nonoverlapping,
871 table_infos: vec![
872 generate_table(14, 1, 250, 300, 2),
873 generate_table(15, 1, 350, 400, 2),
874 generate_table(16, 1, 450, 500, 2),
875 ],
876 total_file_size: 150,
877 ..Default::default()
878 },
879 ];
880
881 let levels_handlers = vec![
882 LevelHandler::new(0),
883 LevelHandler::new(1),
884 LevelHandler::new(2),
885 ];
886
887 {
888 let picker = NonOverlapSubLevelPicker::new(
890 0,
891 10000,
892 1,
893 10000,
894 Arc::new(RangeOverlapStrategy::default()),
895 true,
896 compaction_config::max_l0_compact_level_count() as usize,
897 true,
898 );
899 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
900 assert_eq!(6, ret.len());
901 }
902
903 {
904 let max_compaction_bytes = 100;
906 let picker = NonOverlapSubLevelPicker::new(
907 60,
908 max_compaction_bytes,
909 1,
910 10000,
911 Arc::new(RangeOverlapStrategy::default()),
912 true,
913 compaction_config::max_l0_compact_level_count() as usize,
914 true,
915 );
916 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
917 assert_eq!(6, ret.len());
918
919 for plan in &ret {
920 if plan.total_file_size >= max_compaction_bytes {
921 assert!(plan.expected);
922 } else {
923 assert!(!plan.expected);
924 }
925 }
926 }
927
928 {
929 let max_file_count = 2;
931 let picker = NonOverlapSubLevelPicker::new(
932 0,
933 10000,
934 1,
935 max_file_count,
936 Arc::new(RangeOverlapStrategy::default()),
937 true,
938 compaction_config::max_l0_compact_level_count() as usize,
939 true,
940 );
941 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
942 assert_eq!(6, ret.len());
943
944 for plan in ret {
945 let mut sst_id_set = BTreeSet::default();
946 for sst in &plan.sstable_infos {
947 sst_id_set.insert(sst.1[0].sst_id);
948 }
949 assert!(sst_id_set.len() <= max_file_count as usize);
950 }
951 }
952
953 {
954 let min_depth = 3;
956 let picker = NonOverlapSubLevelPicker::new(
957 10,
958 10000,
959 min_depth,
960 100,
961 Arc::new(RangeOverlapStrategy::default()),
962 true,
963 compaction_config::max_l0_compact_level_count() as usize,
964 true,
965 );
966 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
967 assert_eq!(6, ret.len());
968
969 for plan in ret {
970 if plan.sstable_infos.len() >= min_depth {
971 assert!(plan.expected);
972 } else {
973 assert!(!plan.expected);
974 }
975 }
976 }
977 }
978
979 #[test]
980 fn test_trivial_move_bug() {
981 let levels = [
982 Level {
983 level_idx: 1,
984 level_type: LevelType::Nonoverlapping,
985 table_infos: vec![generate_table(0, 1, 400, 500, 2)],
986 total_file_size: 100,
987 ..Default::default()
988 },
989 Level {
990 level_idx: 2,
991 level_type: LevelType::Nonoverlapping,
992 table_infos: vec![
993 generate_table(1, 1, 100, 200, 1),
994 generate_table(2, 1, 600, 700, 1),
995 ],
996 total_file_size: 200,
997 ..Default::default()
998 },
999 Level {
1000 level_idx: 3,
1001 level_type: LevelType::Nonoverlapping,
1002 table_infos: vec![
1003 generate_table(3, 1, 100, 300, 2),
1004 generate_table(4, 1, 600, 800, 1),
1005 ],
1006 total_file_size: 400,
1007 ..Default::default()
1008 },
1009 ];
1010
1011 let levels_handlers = vec![
1012 LevelHandler::new(0),
1013 LevelHandler::new(1),
1014 LevelHandler::new(2),
1015 LevelHandler::new(3),
1016 ];
1017 let picker =
1019 MinOverlappingPicker::new(2, 3, 1000, 0, Arc::new(RangeOverlapStrategy::default()));
1020 let (select_files, target_files) = picker.pick_tables(
1021 &levels[1].table_infos,
1022 &levels[2].table_infos,
1023 &levels_handlers,
1024 );
1025 let overlap_strategy = Arc::new(RangeOverlapStrategy::default());
1026 let mut overlap_info = overlap_strategy.create_overlap_info();
1027 for sst in &select_files {
1028 overlap_info.update(&sst.key_range);
1029 }
1030 let range = overlap_info.check_multiple_overlap(&levels[0].table_infos);
1031 assert!(range.is_empty());
1032 assert_eq!(select_files.len(), 1);
1033 assert_eq!(target_files.len(), 1);
1034 }
1035
1036 #[test]
1037 fn test_pick_unexpected_task() {
1038 let levels = vec![
1039 Level {
1040 level_idx: 1,
1041 level_type: LevelType::Nonoverlapping,
1042 table_infos: vec![generate_table(0, 1, 50, 100, 2)], total_file_size: 50,
1044 ..Default::default()
1045 },
1046 Level {
1047 level_idx: 2,
1048 level_type: LevelType::Nonoverlapping,
1049 table_infos: vec![
1050 generate_table(1, 1, 101, 150, 1), ],
1052 total_file_size: 50,
1053 ..Default::default()
1054 },
1055 Level {
1056 level_idx: 3,
1057 level_type: LevelType::Nonoverlapping,
1058 table_infos: vec![
1059 generate_table(2, 1, 151, 200, 2), ],
1061 total_file_size: 50,
1062 ..Default::default()
1063 },
1064 Level {
1065 level_idx: 4,
1066 level_type: LevelType::Nonoverlapping,
1067 table_infos: vec![
1068 generate_table(3, 1, 50, 300, 2), ],
1070 total_file_size: 250,
1071 ..Default::default()
1072 },
1073 ];
1074
1075 let levels_handlers = vec![
1076 LevelHandler::new(0),
1077 LevelHandler::new(1),
1078 LevelHandler::new(2),
1079 LevelHandler::new(3),
1080 LevelHandler::new(4),
1081 ];
1082
1083 {
1084 let picker = NonOverlapSubLevelPicker::new(
1086 0,
1087 10000,
1088 1,
1089 10000,
1090 Arc::new(RangeOverlapStrategy::default()),
1091 true,
1092 compaction_config::max_l0_compact_level_count() as usize,
1093 true,
1094 );
1095 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
1096 {
1097 let plan = &ret[0];
1098 assert_eq!(4, plan.sstable_infos.len());
1099 }
1100 }
1101
1102 {
1103 let picker = NonOverlapSubLevelPicker::new(
1105 0,
1106 150,
1107 1,
1108 10000,
1109 Arc::new(RangeOverlapStrategy::default()),
1110 true,
1111 compaction_config::max_l0_compact_level_count() as usize,
1112 true,
1113 );
1114 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
1115 {
1116 let plan = &ret[0];
1117 assert_eq!(3, plan.sstable_infos.len());
1118 }
1119 }
1120
1121 {
1122 let picker = NonOverlapSubLevelPicker::new(
1124 0,
1125 10000,
1126 1,
1127 3,
1128 Arc::new(RangeOverlapStrategy::default()),
1129 true,
1130 compaction_config::max_l0_compact_level_count() as usize,
1131 true,
1132 );
1133 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
1134 {
1135 let plan = &ret[0];
1136 assert_eq!(3, plan.sstable_infos.len());
1137 }
1138 }
1139
1140 {
1141 let max_expected_level_count = 3;
1143 let picker = NonOverlapSubLevelPicker::for_test(
1144 0,
1145 10000,
1146 1,
1147 3,
1148 Arc::new(RangeOverlapStrategy::default()),
1149 true,
1150 max_expected_level_count,
1151 true,
1152 );
1153 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
1154 {
1155 let plan = &ret[0];
1156 assert_eq!(max_expected_level_count, plan.sstable_infos.len());
1157
1158 assert_eq!(0, plan.sstable_infos[0].1[0].sst_id);
1159 assert_eq!(1, plan.sstable_infos[1].1[0].sst_id);
1160 assert_eq!(2, plan.sstable_infos[2].1[0].sst_id);
1161 }
1162 }
1163
1164 {
1165 let max_expected_level_count = 100;
1167 let picker = NonOverlapSubLevelPicker::for_test(
1168 1000,
1169 10000,
1170 1,
1171 100,
1172 Arc::new(RangeOverlapStrategy::default()),
1173 true,
1174 max_expected_level_count,
1175 true,
1176 );
1177 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
1178 {
1179 let plan = &ret[0];
1180
1181 assert_eq!(0, plan.sstable_infos[0].1[0].sst_id);
1182 assert_eq!(1, plan.sstable_infos[1].1[0].sst_id);
1183 assert_eq!(2, plan.sstable_infos[2].1[0].sst_id);
1184 assert_eq!(3, plan.sstable_infos[3].1[0].sst_id);
1185 assert!(!plan.expected);
1186 }
1187 }
1188 }
1189}