1use std::sync::Arc;
16
17use risingwave_hummock_sdk::append_sstable_info_to_string;
18use risingwave_hummock_sdk::level::{InputLevel, Level, Levels};
19use risingwave_hummock_sdk::sstable_info::SstableInfo;
20use risingwave_pb::hummock::LevelType;
21
22use super::{CompactionInput, CompactionPicker, LocalPickerStatistic};
23use crate::hummock::compaction::overlap_strategy::OverlapStrategy;
24use crate::hummock::level_handler::LevelHandler;
25
26pub struct MinOverlappingPicker {
27 level: usize,
28 target_level: usize,
29 max_select_bytes: u64,
30 vnode_partition_count: u32,
31 overlap_strategy: Arc<dyn OverlapStrategy>,
32}
33
34impl MinOverlappingPicker {
35 pub fn new(
36 level: usize,
37 target_level: usize,
38 max_select_bytes: u64,
39 vnode_partition_count: u32,
40 overlap_strategy: Arc<dyn OverlapStrategy>,
41 ) -> MinOverlappingPicker {
42 MinOverlappingPicker {
43 level,
44 target_level,
45 max_select_bytes,
46 vnode_partition_count,
47 overlap_strategy,
48 }
49 }
50
51 pub fn pick_tables(
52 &self,
53 select_tables: &[SstableInfo],
54 target_tables: &[SstableInfo],
55 level_handlers: &[LevelHandler],
56 ) -> (Vec<SstableInfo>, Vec<SstableInfo>) {
57 let mut select_file_ranges = vec![];
58 for (idx, sst) in select_tables.iter().enumerate() {
59 if level_handlers[self.level].is_pending_compact(&sst.sst_id) {
60 continue;
61 }
62 let mut overlap_info = self.overlap_strategy.create_overlap_info();
63 overlap_info.update(sst);
64 let overlap_files_range = overlap_info.check_multiple_overlap(target_tables);
65
66 if overlap_files_range.is_empty() {
67 return (vec![sst.clone()], vec![]);
68 }
69 select_file_ranges.push((idx, overlap_files_range));
70 }
71 select_file_ranges.retain(|(_, range)| {
72 let mut pending_compact = false;
73 for other in &target_tables[range.clone()] {
74 if level_handlers[self.target_level].is_pending_compact(&other.sst_id) {
75 pending_compact = true;
76 break;
77 }
78 }
79 !pending_compact
80 });
81
82 let mut min_score = u64::MAX;
83 let mut min_score_select_range = 0..0;
84 let mut min_score_target_range = 0..0;
85 let mut min_score_select_file_size = 0;
86 for left in 0..select_file_ranges.len() {
87 let mut select_file_size = 0;
88 let mut target_level_overlap_range = select_file_ranges[left].1.clone();
89 let mut total_file_size = 0;
90 for other in &target_tables[target_level_overlap_range.clone()] {
91 total_file_size += other.sst_size;
92 }
93 let start_idx = select_file_ranges[left].0;
94 let mut end_idx = start_idx + 1;
95 for (idx, range) in select_file_ranges.iter().skip(left) {
96 if select_file_size > self.max_select_bytes
97 || *idx > end_idx
98 || range.start >= target_level_overlap_range.end
99 {
100 break;
101 }
102 select_file_size += select_tables[*idx].sst_size;
103 if range.end > target_level_overlap_range.end {
104 for other in &target_tables[target_level_overlap_range.end..range.end] {
105 total_file_size += other.sst_size;
106 }
107 target_level_overlap_range.end = range.end;
108 }
109 let score = if select_file_size == 0 {
110 total_file_size
111 } else {
112 total_file_size * 100 / select_file_size
113 };
114 end_idx = idx + 1;
115 if score < min_score
116 || (score == min_score && select_file_size < min_score_select_file_size)
117 {
118 min_score = score;
119 min_score_select_range = start_idx..end_idx;
120 min_score_target_range = target_level_overlap_range.clone();
121 min_score_select_file_size = select_file_size;
122 }
123 }
124 }
125 if min_score == u64::MAX {
126 return (vec![], vec![]);
127 }
128 let select_input_ssts = select_tables[min_score_select_range].to_vec();
129 let target_input_ssts = target_tables[min_score_target_range].to_vec();
130 (select_input_ssts, target_input_ssts)
131 }
132}
133
134impl CompactionPicker for MinOverlappingPicker {
135 fn pick_compaction(
136 &mut self,
137 levels: &Levels,
138 level_handlers: &[LevelHandler],
139 stats: &mut LocalPickerStatistic,
140 ) -> Option<CompactionInput> {
141 assert!(self.level > 0);
142 let (select_input_ssts, target_input_ssts) = self.pick_tables(
143 &levels.get_level(self.level).table_infos,
144 &levels.get_level(self.target_level).table_infos,
145 level_handlers,
146 );
147 if select_input_ssts.is_empty() {
148 stats.skip_by_pending_files += 1;
149 return None;
150 }
151 Some(CompactionInput {
152 select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
153 target_input_size: target_input_ssts.iter().map(|sst| sst.sst_size).sum(),
154 total_file_count: (select_input_ssts.len() + target_input_ssts.len()) as u64,
155 input_levels: vec![
156 InputLevel {
157 level_idx: self.level as u32,
158 level_type: LevelType::Nonoverlapping,
159 table_infos: select_input_ssts,
160 },
161 InputLevel {
162 level_idx: self.target_level as u32,
163 level_type: LevelType::Nonoverlapping,
164 table_infos: target_input_ssts,
165 },
166 ],
167 target_level: self.target_level,
168 vnode_partition_count: self.vnode_partition_count,
169 ..Default::default()
170 })
171 }
172}
173
174#[derive(Default, Debug)]
175pub struct SubLevelSstables {
176 pub total_file_size: u64,
177 pub total_file_count: usize,
178 pub sstable_infos: Vec<Vec<SstableInfo>>,
179 pub expected: bool,
180}
181
182pub struct NonOverlapSubLevelPicker {
183 min_compaction_bytes: u64,
184 max_compaction_bytes: u64,
185 min_expected_level_count: usize,
186 max_file_count: u64,
187 overlap_strategy: Arc<dyn OverlapStrategy>,
188 enable_check_task_level_overlap: bool,
189 max_expected_level_count: usize,
190}
191
192impl NonOverlapSubLevelPicker {
193 pub fn new(
194 min_compaction_bytes: u64,
195 max_compaction_bytes: u64,
196 min_expected_level_count: usize,
197 max_file_count: u64,
198 overlap_strategy: Arc<dyn OverlapStrategy>,
199 enable_check_task_level_overlap: bool,
200 max_expected_level_count: usize,
201 ) -> Self {
202 Self {
203 min_compaction_bytes,
204 max_compaction_bytes,
205 min_expected_level_count,
206 max_file_count,
207 overlap_strategy,
208 enable_check_task_level_overlap,
209 max_expected_level_count,
210 }
211 }
212
213 #[cfg(test)]
214 pub fn for_test(
215 min_compaction_bytes: u64,
216 max_compaction_bytes: u64,
217 min_expected_level_count: usize,
218 max_file_count: u64,
219 overlap_strategy: Arc<dyn OverlapStrategy>,
220 enable_check_task_level_overlap: bool,
221 max_expected_level_count: usize,
222 ) -> Self {
223 Self {
224 min_compaction_bytes,
225 max_compaction_bytes,
226 min_expected_level_count,
227 max_file_count,
228 overlap_strategy,
229 enable_check_task_level_overlap,
230 max_expected_level_count,
231 }
232 }
233
234 fn pick_sub_level(
240 &self,
241 levels: &[Level],
242 level_handler: &LevelHandler,
243 sst: &SstableInfo,
244 ) -> SubLevelSstables {
245 let mut ret = SubLevelSstables {
246 sstable_infos: vec![vec![]; levels.len()],
247 ..Default::default()
248 };
249
250 let mut pick_levels_range = Vec::default();
251 let mut max_select_level_count = 0;
252
253 'expand_new_level: for (target_index, target_level) in levels.iter().enumerate().skip(1) {
256 if target_level.level_type != LevelType::Nonoverlapping {
257 break;
258 }
259
260 if ret
261 .sstable_infos
262 .iter()
263 .filter(|ssts| !ssts.is_empty())
264 .count()
265 > self.max_expected_level_count
266 {
267 break;
268 }
269
270 let mut basic_overlap_info = self.overlap_strategy.create_overlap_info();
272 basic_overlap_info.update(sst);
273
274 let mut overlap_files_range =
275 basic_overlap_info.check_multiple_include(&target_level.table_infos);
276 if overlap_files_range.is_empty() {
277 overlap_files_range =
278 basic_overlap_info.check_multiple_overlap(&target_level.table_infos);
279 }
280
281 if overlap_files_range.is_empty() {
282 continue;
283 }
284
285 let mut overlap_levels = vec![];
286
287 let mut add_files_size: u64 = 0;
288 let mut add_files_count: usize = 0;
289
290 let mut select_level_count = 0;
291 for reverse_index in (0..=target_index).rev() {
292 let target_tables = &levels[reverse_index].table_infos;
293
294 overlap_files_range = if target_index == reverse_index {
295 overlap_files_range
296 } else {
297 basic_overlap_info.check_multiple_overlap(target_tables)
298 };
299
300 if overlap_files_range.is_empty() {
303 continue;
305 }
306
307 for other in &target_tables[overlap_files_range.clone()] {
308 if level_handler.is_pending_compact(&other.sst_id) {
309 break 'expand_new_level;
310 }
311 basic_overlap_info.update(other);
312
313 add_files_size += other.sst_size;
314 add_files_count += 1;
315 }
316
317 overlap_levels.push((reverse_index, overlap_files_range.clone()));
318 select_level_count += 1;
319 }
320
321 if select_level_count > max_select_level_count {
322 max_select_level_count = select_level_count;
323 pick_levels_range = overlap_levels;
324 }
325
326 if max_select_level_count >= self.min_expected_level_count
328 && (add_files_size >= self.max_compaction_bytes
329 || add_files_count >= self.max_file_count as usize)
330 {
331 break 'expand_new_level;
332 }
333 }
334
335 if !pick_levels_range.is_empty() {
336 for (reverse_index, sst_range) in pick_levels_range {
337 let level_ssts = &levels[reverse_index].table_infos;
338 ret.sstable_infos[reverse_index] = level_ssts[sst_range].to_vec();
339 ret.total_file_count += ret.sstable_infos[reverse_index].len();
340 ret.total_file_size += ret.sstable_infos[reverse_index]
341 .iter()
342 .map(|sst| sst.sst_size)
343 .sum::<u64>();
344 }
345
346 ret.sstable_infos.iter_mut().for_each(|level_ssts| {
348 level_ssts.sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
349 });
350 } else {
351 ret.total_file_count = 1;
352 ret.total_file_size = sst.sst_size;
353 ret.sstable_infos[0].extend(vec![sst.clone()]);
354 }
355
356 if self.enable_check_task_level_overlap {
357 self.verify_task_level_overlap(&ret, levels);
358 }
359
360 ret.sstable_infos.retain(|ssts| !ssts.is_empty());
361
362 if ret.total_file_size > self.max_compaction_bytes
364 || ret.total_file_count as u64 > self.max_file_count
365 || ret.sstable_infos.len() > self.max_expected_level_count
366 {
367 let mut total_file_count = 0;
369 let mut total_file_size = 0;
370 let mut total_level_count = 0;
371 for (index, sstables) in ret.sstable_infos.iter().enumerate() {
372 total_file_count += sstables.len();
373 total_file_size += sstables.iter().map(|sst| sst.sst_size).sum::<u64>();
374 total_level_count += 1;
375
376 if total_level_count >= self.min_expected_level_count
378 && (total_file_count as u64 >= self.max_file_count
379 || total_file_size >= self.max_compaction_bytes
380 || total_level_count >= self.max_expected_level_count)
381 {
382 ret.total_file_count = total_file_count;
383 ret.total_file_size = total_file_size;
384 ret.sstable_infos.truncate(index + 1);
385 break;
386 }
387 }
388 }
389
390 ret
391 }
392
393 pub fn pick_l0_multi_non_overlap_level(
394 &self,
395 l0: &[Level],
396 level_handler: &LevelHandler,
397 ) -> Vec<SubLevelSstables> {
398 if l0.len() < self.min_expected_level_count {
399 return vec![];
400 }
401
402 let mut scores = vec![];
403 for sst in &l0[0].table_infos {
404 if level_handler.is_pending_compact(&sst.sst_id) {
405 continue;
406 }
407
408 scores.push(self.pick_sub_level(l0, level_handler, sst));
409 }
410
411 if scores.is_empty() {
412 return vec![];
413 }
414
415 let mut expected = Vec::with_capacity(scores.len());
416 let mut unexpected = vec![];
417
418 for mut selected_task in scores {
419 if selected_task.sstable_infos.len() > self.max_expected_level_count
420 || selected_task.sstable_infos.len() < self.min_expected_level_count
421 || selected_task.total_file_size < self.min_compaction_bytes
422 {
423 selected_task.expected = false;
424 unexpected.push(selected_task);
425 } else {
426 selected_task.expected = true;
427 expected.push(selected_task);
428 }
429 }
430
431 expected.sort_by(|a, b| {
435 b.sstable_infos
436 .len()
437 .cmp(&a.sstable_infos.len())
438 .then_with(|| a.total_file_count.cmp(&b.total_file_count))
439 .then_with(|| a.total_file_size.cmp(&b.total_file_size))
440 });
441
442 unexpected.sort_by(|a, b| {
448 let a_select_count_offset =
449 (a.sstable_infos.len() as i64 - self.max_expected_level_count as i64).abs();
450 let b_select_count_offset =
451 (b.sstable_infos.len() as i64 - self.max_expected_level_count as i64).abs();
452
453 let a_file_count_offset =
454 (a.total_file_count as i64 - self.max_file_count as i64).abs();
455 let b_file_count_offset =
456 (b.total_file_count as i64 - self.max_file_count as i64).abs();
457
458 let a_file_size_offset =
459 (a.total_file_size as i64 - self.max_compaction_bytes as i64).abs();
460 let b_file_size_offset =
461 (b.total_file_size as i64 - self.max_compaction_bytes as i64).abs();
462
463 a_select_count_offset
464 .cmp(&b_select_count_offset)
465 .then_with(|| a_file_count_offset.cmp(&b_file_count_offset))
466 .then_with(|| a_file_size_offset.cmp(&b_file_size_offset))
467 });
468
469 expected.extend(unexpected);
470
471 expected
472 }
473
474 fn verify_task_level_overlap(&self, ret: &SubLevelSstables, levels: &[Level]) {
475 use std::fmt::Write;
476
477 use itertools::Itertools;
478
479 use crate::hummock::compaction::overlap_strategy::OverlapInfo;
480 let mut overlap_info: Option<Box<dyn OverlapInfo>> = None;
481 for (level_idx, ssts) in ret.sstable_infos.iter().enumerate().rev() {
482 if let Some(overlap_info) = overlap_info.as_mut() {
483 let level = levels.get(level_idx).unwrap();
485 let overlap_sst_range = overlap_info.check_multiple_overlap(&level.table_infos);
486 if !overlap_sst_range.is_empty() {
487 let expected_sst_ids = level.table_infos[overlap_sst_range.clone()]
488 .iter()
489 .map(|s| s.object_id)
490 .collect_vec();
491 let actual_sst_ids = ssts.iter().map(|s| s.object_id).collect_vec();
492 let start_idx = actual_sst_ids
495 .iter()
496 .position(|sst_id| sst_id == expected_sst_ids.first().unwrap());
497 if start_idx.is_none_or(|idx| {
498 actual_sst_ids[idx..idx + expected_sst_ids.len()] != expected_sst_ids
499 }) {
500 let mut actual_sst_infos = String::new();
502 ssts.iter()
503 .for_each(|s| append_sstable_info_to_string(&mut actual_sst_infos, s));
504
505 let mut expected_sst_infos = String::new();
507 level.table_infos[overlap_sst_range.clone()]
508 .iter()
509 .for_each(|s| {
510 append_sstable_info_to_string(&mut expected_sst_infos, s)
511 });
512
513 let mut ret_sst_infos = String::new();
515 ret.sstable_infos
516 .iter()
517 .enumerate()
518 .for_each(|(idx, ssts)| {
519 writeln!(
520 ret_sst_infos,
521 "sub level {}",
522 levels.get(idx).unwrap().sub_level_id
523 )
524 .unwrap();
525 ssts.iter().for_each(|s| {
526 append_sstable_info_to_string(&mut ret_sst_infos, s)
527 });
528 });
529 panic!(
530 "Compact task overlap check fails. Actual: {} Expected: {} Ret {}",
531 actual_sst_infos, expected_sst_infos, ret_sst_infos
532 );
533 }
534 }
535 } else if !ssts.is_empty() {
536 overlap_info = Some(self.overlap_strategy.create_overlap_info());
538 }
539
540 for sst in ssts {
541 overlap_info.as_mut().unwrap().update(sst);
542 }
543 }
544 }
545}
546
547#[cfg(test)]
548pub mod tests {
549 use std::collections::BTreeSet;
550
551 use risingwave_common::config::default::compaction_config;
552
553 use super::*;
554 use crate::hummock::compaction::overlap_strategy::RangeOverlapStrategy;
555 use crate::hummock::compaction::selector::tests::{
556 generate_l0_nonoverlapping_sublevels, generate_table,
557 };
558
559 #[test]
560 fn test_compact_l1() {
561 let mut picker =
562 MinOverlappingPicker::new(1, 2, 10000, 0, Arc::new(RangeOverlapStrategy::default()));
563 let levels = vec![
564 Level {
565 level_idx: 1,
566 level_type: LevelType::Nonoverlapping,
567 table_infos: vec![
568 generate_table(0, 1, 0, 100, 1),
569 generate_table(1, 1, 101, 200, 1),
570 generate_table(2, 1, 222, 300, 1),
571 ],
572 ..Default::default()
573 },
574 Level {
575 level_idx: 2,
576 level_type: LevelType::Nonoverlapping,
577 table_infos: vec![
578 generate_table(4, 1, 0, 100, 1),
579 generate_table(5, 1, 101, 150, 1),
580 generate_table(6, 1, 151, 201, 1),
581 generate_table(7, 1, 501, 800, 1),
582 generate_table(8, 2, 301, 400, 1),
583 ],
584 ..Default::default()
585 },
586 ];
587 let levels = Levels {
588 levels,
589 l0: generate_l0_nonoverlapping_sublevels(vec![]),
590 ..Default::default()
591 };
592 let mut level_handlers = vec![
593 LevelHandler::new(0),
594 LevelHandler::new(1),
595 LevelHandler::new(2),
596 ];
597
598 let mut local_stats = LocalPickerStatistic::default();
601 let ret = picker
602 .pick_compaction(&levels, &level_handlers, &mut local_stats)
603 .unwrap();
604 assert_eq!(ret.input_levels[0].level_idx, 1);
605 assert_eq!(ret.target_level, 2);
606 assert_eq!(ret.input_levels[0].table_infos.len(), 1);
607 assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 2);
608 assert_eq!(ret.input_levels[1].table_infos.len(), 0);
609 ret.add_pending_task(0, &mut level_handlers);
610
611 let ret = picker
612 .pick_compaction(&levels, &level_handlers, &mut local_stats)
613 .unwrap();
614 assert_eq!(ret.input_levels[0].level_idx, 1);
615 assert_eq!(ret.target_level, 2);
616 assert_eq!(ret.input_levels[0].table_infos.len(), 1);
617 assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 0);
618 assert_eq!(ret.input_levels[1].table_infos.len(), 1);
619 assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 4);
620 ret.add_pending_task(1, &mut level_handlers);
621
622 let ret = picker
623 .pick_compaction(&levels, &level_handlers, &mut local_stats)
624 .unwrap();
625 assert_eq!(ret.input_levels[0].table_infos.len(), 1);
626 assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 1);
627 assert_eq!(ret.input_levels[1].table_infos.len(), 2);
628 assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 5);
629 }
630
631 #[test]
632 fn test_expand_l1_files() {
633 let mut picker =
634 MinOverlappingPicker::new(1, 2, 10000, 0, Arc::new(RangeOverlapStrategy::default()));
635 let levels = vec![
636 Level {
637 level_idx: 1,
638 level_type: LevelType::Nonoverlapping,
639 table_infos: vec![
640 generate_table(0, 1, 50, 99, 2),
641 generate_table(1, 1, 100, 149, 2),
642 generate_table(2, 1, 150, 249, 2),
643 ],
644 ..Default::default()
645 },
646 Level {
647 level_idx: 2,
648 level_type: LevelType::Nonoverlapping,
649 table_infos: vec![
650 generate_table(4, 1, 50, 199, 1),
651 generate_table(5, 1, 200, 399, 1),
652 ],
653 ..Default::default()
654 },
655 ];
656 let levels = Levels {
657 levels,
658 l0: generate_l0_nonoverlapping_sublevels(vec![]),
659 ..Default::default()
660 };
661 let levels_handler = vec![
662 LevelHandler::new(0),
663 LevelHandler::new(1),
664 LevelHandler::new(2),
665 ];
666
667 let ret = picker
670 .pick_compaction(
671 &levels,
672 &levels_handler,
673 &mut LocalPickerStatistic::default(),
674 )
675 .unwrap();
676 assert_eq!(ret.input_levels[0].level_idx, 1);
677 assert_eq!(ret.input_levels[1].level_idx, 2);
678
679 assert_eq!(ret.input_levels[0].table_infos.len(), 2);
680 assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 0);
681 assert_eq!(ret.input_levels[0].table_infos[1].sst_id, 1);
682
683 assert_eq!(ret.input_levels[1].table_infos.len(), 1);
684 assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 4);
685 }
686
687 #[test]
688 fn test_pick_l0_multi_level() {
689 let levels = vec![
690 Level {
691 level_idx: 1,
692 level_type: LevelType::Nonoverlapping,
693 table_infos: vec![
694 generate_table(0, 1, 50, 99, 2),
695 generate_table(1, 1, 100, 149, 2),
696 generate_table(2, 1, 150, 249, 2),
697 generate_table(6, 1, 250, 300, 2),
698 generate_table(7, 1, 350, 400, 2),
699 generate_table(8, 1, 450, 500, 2),
700 ],
701 total_file_size: 800,
702 ..Default::default()
703 },
704 Level {
705 level_idx: 2,
706 level_type: LevelType::Nonoverlapping,
707 table_infos: vec![
708 generate_table(4, 1, 50, 199, 1),
709 generate_table(5, 1, 200, 249, 1),
710 generate_table(9, 1, 250, 300, 2),
711 generate_table(10, 1, 350, 400, 2),
712 generate_table(11, 1, 450, 500, 2),
713 ],
714 total_file_size: 350,
715 ..Default::default()
716 },
717 Level {
718 level_idx: 3,
719 level_type: LevelType::Nonoverlapping,
720 table_infos: vec![
721 generate_table(11, 1, 250, 300, 2),
722 generate_table(12, 1, 350, 400, 2),
723 generate_table(13, 1, 450, 500, 2),
724 ],
725 total_file_size: 150,
726 ..Default::default()
727 },
728 Level {
729 level_idx: 4,
730 level_type: LevelType::Nonoverlapping,
731 table_infos: vec![
732 generate_table(14, 1, 250, 300, 2),
733 generate_table(15, 1, 350, 400, 2),
734 generate_table(16, 1, 450, 500, 2),
735 ],
736 total_file_size: 150,
737 ..Default::default()
738 },
739 ];
740
741 let levels_handlers = vec![
742 LevelHandler::new(0),
743 LevelHandler::new(1),
744 LevelHandler::new(2),
745 ];
746
747 {
748 let picker = NonOverlapSubLevelPicker::new(
750 0,
751 10000,
752 1,
753 10000,
754 Arc::new(RangeOverlapStrategy::default()),
755 true,
756 compaction_config::max_l0_compact_level_count() as usize,
757 );
758 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
759 assert_eq!(6, ret.len());
760 }
761
762 {
763 let picker = NonOverlapSubLevelPicker::new(
765 0,
766 100,
767 1,
768 10000,
769 Arc::new(RangeOverlapStrategy::default()),
770 true,
771 compaction_config::max_l0_compact_level_count() as usize,
772 );
773 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
774 assert_eq!(6, ret.len());
775 }
776
777 {
778 let picker = NonOverlapSubLevelPicker::new(
780 0,
781 10000,
782 1,
783 5,
784 Arc::new(RangeOverlapStrategy::default()),
785 true,
786 compaction_config::max_l0_compact_level_count() as usize,
787 );
788 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
789 assert_eq!(6, ret.len());
790 }
791 }
792
793 #[test]
794 fn test_pick_l0_multi_level2() {
795 let levels = vec![
796 Level {
797 level_idx: 1,
798 level_type: LevelType::Nonoverlapping,
799 table_infos: vec![
800 generate_table(0, 1, 50, 99, 2),
801 generate_table(1, 1, 100, 149, 2),
802 generate_table(2, 1, 150, 249, 2),
803 generate_table(6, 1, 250, 300, 2),
804 generate_table(7, 1, 350, 400, 2),
805 generate_table(8, 1, 450, 500, 2),
806 ],
807 total_file_size: 800,
808 ..Default::default()
809 },
810 Level {
811 level_idx: 2,
812 level_type: LevelType::Nonoverlapping,
813 table_infos: vec![
814 generate_table(4, 1, 50, 99, 1),
815 generate_table(5, 1, 150, 200, 1),
816 generate_table(9, 1, 250, 300, 2),
817 generate_table(10, 1, 350, 400, 2),
818 generate_table(11, 1, 450, 500, 2),
819 ],
820 total_file_size: 250,
821 ..Default::default()
822 },
823 Level {
824 level_idx: 3,
825 level_type: LevelType::Nonoverlapping,
826 table_infos: vec![
827 generate_table(11, 1, 250, 300, 2),
828 generate_table(12, 1, 350, 400, 2),
829 generate_table(13, 1, 450, 500, 2),
830 ],
831 total_file_size: 150,
832 ..Default::default()
833 },
834 Level {
835 level_idx: 4,
836 level_type: LevelType::Nonoverlapping,
837 table_infos: vec![
838 generate_table(14, 1, 250, 300, 2),
839 generate_table(15, 1, 350, 400, 2),
840 generate_table(16, 1, 450, 500, 2),
841 ],
842 total_file_size: 150,
843 ..Default::default()
844 },
845 ];
846
847 let levels_handlers = vec![
848 LevelHandler::new(0),
849 LevelHandler::new(1),
850 LevelHandler::new(2),
851 ];
852
853 {
854 let picker = NonOverlapSubLevelPicker::new(
856 0,
857 10000,
858 1,
859 10000,
860 Arc::new(RangeOverlapStrategy::default()),
861 true,
862 compaction_config::max_l0_compact_level_count() as usize,
863 );
864 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
865 assert_eq!(6, ret.len());
866 }
867
868 {
869 let max_compaction_bytes = 100;
871 let picker = NonOverlapSubLevelPicker::new(
872 60,
873 max_compaction_bytes,
874 1,
875 10000,
876 Arc::new(RangeOverlapStrategy::default()),
877 true,
878 compaction_config::max_l0_compact_level_count() as usize,
879 );
880 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
881 assert_eq!(6, ret.len());
882
883 for plan in &ret {
884 if plan.total_file_size >= max_compaction_bytes {
885 assert!(plan.expected);
886 } else {
887 assert!(!plan.expected);
888 }
889 }
890 }
891
892 {
893 let max_file_count = 2;
895 let picker = NonOverlapSubLevelPicker::new(
896 0,
897 10000,
898 1,
899 max_file_count,
900 Arc::new(RangeOverlapStrategy::default()),
901 true,
902 compaction_config::max_l0_compact_level_count() as usize,
903 );
904 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
905 assert_eq!(6, ret.len());
906
907 for plan in ret {
908 let mut sst_id_set = BTreeSet::default();
909 for sst in &plan.sstable_infos {
910 sst_id_set.insert(sst[0].sst_id);
911 }
912 assert!(sst_id_set.len() <= max_file_count as usize);
913 }
914 }
915
916 {
917 let min_depth = 3;
919 let picker = NonOverlapSubLevelPicker::new(
920 10,
921 10000,
922 min_depth,
923 100,
924 Arc::new(RangeOverlapStrategy::default()),
925 true,
926 compaction_config::max_l0_compact_level_count() as usize,
927 );
928 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
929 assert_eq!(6, ret.len());
930
931 for plan in ret {
932 if plan.sstable_infos.len() >= min_depth {
933 assert!(plan.expected);
934 } else {
935 assert!(!plan.expected);
936 }
937 }
938 }
939 }
940
941 #[test]
942 fn test_trivial_move_bug() {
943 let levels = [
944 Level {
945 level_idx: 1,
946 level_type: LevelType::Nonoverlapping,
947 table_infos: vec![generate_table(0, 1, 400, 500, 2)],
948 total_file_size: 100,
949 ..Default::default()
950 },
951 Level {
952 level_idx: 2,
953 level_type: LevelType::Nonoverlapping,
954 table_infos: vec![
955 generate_table(1, 1, 100, 200, 1),
956 generate_table(2, 1, 600, 700, 1),
957 ],
958 total_file_size: 200,
959 ..Default::default()
960 },
961 Level {
962 level_idx: 3,
963 level_type: LevelType::Nonoverlapping,
964 table_infos: vec![
965 generate_table(3, 1, 100, 300, 2),
966 generate_table(4, 1, 600, 800, 1),
967 ],
968 total_file_size: 400,
969 ..Default::default()
970 },
971 ];
972
973 let levels_handlers = vec![
974 LevelHandler::new(0),
975 LevelHandler::new(1),
976 LevelHandler::new(2),
977 LevelHandler::new(3),
978 ];
979 let picker =
981 MinOverlappingPicker::new(2, 3, 1000, 0, Arc::new(RangeOverlapStrategy::default()));
982 let (select_files, target_files) = picker.pick_tables(
983 &levels[1].table_infos,
984 &levels[2].table_infos,
985 &levels_handlers,
986 );
987 let overlap_strategy = Arc::new(RangeOverlapStrategy::default());
988 let mut overlap_info = overlap_strategy.create_overlap_info();
989 for sst in &select_files {
990 overlap_info.update(sst);
991 }
992 let range = overlap_info.check_multiple_overlap(&levels[0].table_infos);
993 assert!(range.is_empty());
994 assert_eq!(select_files.len(), 1);
995 assert_eq!(target_files.len(), 1);
996 }
997
998 #[test]
999 fn test_pick_unexpected_task() {
1000 let levels = vec![
1001 Level {
1002 level_idx: 1,
1003 level_type: LevelType::Nonoverlapping,
1004 table_infos: vec![generate_table(0, 1, 50, 100, 2)], total_file_size: 50,
1006 ..Default::default()
1007 },
1008 Level {
1009 level_idx: 2,
1010 level_type: LevelType::Nonoverlapping,
1011 table_infos: vec![
1012 generate_table(1, 1, 101, 150, 1), ],
1014 total_file_size: 50,
1015 ..Default::default()
1016 },
1017 Level {
1018 level_idx: 3,
1019 level_type: LevelType::Nonoverlapping,
1020 table_infos: vec![
1021 generate_table(2, 1, 151, 200, 2), ],
1023 total_file_size: 50,
1024 ..Default::default()
1025 },
1026 Level {
1027 level_idx: 4,
1028 level_type: LevelType::Nonoverlapping,
1029 table_infos: vec![
1030 generate_table(3, 1, 50, 300, 2), ],
1032 total_file_size: 250,
1033 ..Default::default()
1034 },
1035 ];
1036
1037 let levels_handlers = vec![
1038 LevelHandler::new(0),
1039 LevelHandler::new(1),
1040 LevelHandler::new(2),
1041 LevelHandler::new(3),
1042 LevelHandler::new(4),
1043 ];
1044
1045 {
1046 let picker = NonOverlapSubLevelPicker::new(
1048 0,
1049 10000,
1050 1,
1051 10000,
1052 Arc::new(RangeOverlapStrategy::default()),
1053 true,
1054 compaction_config::max_l0_compact_level_count() as usize,
1055 );
1056 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
1057 {
1058 let plan = &ret[0];
1059 assert_eq!(4, plan.sstable_infos.len());
1060 }
1061 }
1062
1063 {
1064 let picker = NonOverlapSubLevelPicker::new(
1066 0,
1067 150,
1068 1,
1069 10000,
1070 Arc::new(RangeOverlapStrategy::default()),
1071 true,
1072 compaction_config::max_l0_compact_level_count() as usize,
1073 );
1074 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
1075 {
1076 let plan = &ret[0];
1077 assert_eq!(3, plan.sstable_infos.len());
1078 }
1079 }
1080
1081 {
1082 let picker = NonOverlapSubLevelPicker::new(
1084 0,
1085 10000,
1086 1,
1087 3,
1088 Arc::new(RangeOverlapStrategy::default()),
1089 true,
1090 compaction_config::max_l0_compact_level_count() as usize,
1091 );
1092 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
1093 {
1094 let plan = &ret[0];
1095 assert_eq!(3, plan.sstable_infos.len());
1096 }
1097 }
1098
1099 {
1100 let max_expected_level_count = 3;
1102 let picker = NonOverlapSubLevelPicker::for_test(
1103 0,
1104 10000,
1105 1,
1106 3,
1107 Arc::new(RangeOverlapStrategy::default()),
1108 true,
1109 max_expected_level_count,
1110 );
1111 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
1112 {
1113 let plan = &ret[0];
1114 assert_eq!(max_expected_level_count, plan.sstable_infos.len());
1115
1116 assert_eq!(0, plan.sstable_infos[0][0].sst_id);
1117 assert_eq!(1, plan.sstable_infos[1][0].sst_id);
1118 assert_eq!(2, plan.sstable_infos[2][0].sst_id);
1119 }
1120 }
1121
1122 {
1123 let max_expected_level_count = 100;
1125 let picker = NonOverlapSubLevelPicker::for_test(
1126 1000,
1127 10000,
1128 1,
1129 100,
1130 Arc::new(RangeOverlapStrategy::default()),
1131 true,
1132 max_expected_level_count,
1133 );
1134 let ret = picker.pick_l0_multi_non_overlap_level(&levels, &levels_handlers[0]);
1135 {
1136 let plan = &ret[0];
1137
1138 assert_eq!(0, plan.sstable_infos[0][0].sst_id);
1139 assert_eq!(1, plan.sstable_infos[1][0].sst_id);
1140 assert_eq!(2, plan.sstable_infos[2][0].sst_id);
1141 assert_eq!(3, plan.sstable_infos[3][0].sst_id);
1142 assert!(!plan.expected);
1143 }
1144 }
1145 }
1146}