1use std::collections::HashSet;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_hummock_sdk::level::{InputLevel, Level, Levels, OverlappingLevel};
20use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
21use risingwave_pb::hummock::LevelType;
22
23use super::{CompactionInput, CompactionPicker, LocalPickerStatistic};
24use crate::hummock::compaction::overlap_strategy::{
25 OverlapInfo, OverlapStrategy, RangeOverlapInfo,
26};
27use crate::hummock::compaction::selector::ManualCompactionOption;
28use crate::hummock::level_handler::LevelHandler;
29
30pub struct ManualCompactionPicker {
31 overlap_strategy: Arc<dyn OverlapStrategy>,
32 option: ManualCompactionOption,
33 target_level: usize,
34}
35
36impl ManualCompactionPicker {
37 pub fn new(
38 overlap_strategy: Arc<dyn OverlapStrategy>,
39 option: ManualCompactionOption,
40 target_level: usize,
41 ) -> Self {
42 Self {
43 overlap_strategy,
44 option,
45 target_level,
46 }
47 }
48
49 fn pick_l0_to_sub_level(
50 &self,
51 l0: &OverlappingLevel,
52 level_handlers: &[LevelHandler],
53 ) -> Option<CompactionInput> {
54 assert_eq!(self.option.level, 0);
55 let mut input_levels = vec![];
56 let mut sub_level_id = 0;
57 let mut start_idx = None;
58 let mut end_idx = None;
59 for (idx, level) in l0.sub_levels.iter().enumerate() {
62 if !self.filter_level_by_option(level) {
63 continue;
64 }
65 if level_handlers[0].is_level_pending_compact(level) {
66 return None;
67 }
68 if start_idx.is_none() {
70 sub_level_id = level.sub_level_id;
71 start_idx = Some(idx as u64);
72 end_idx = start_idx;
73 } else {
74 end_idx = Some(idx as u64);
75 }
76 }
77 let (start_idx, end_idx) = match (start_idx, end_idx) {
78 (Some(start_idx), Some(end_idx)) => (start_idx, end_idx),
79 _ => {
80 return None;
81 }
82 };
83 for level in l0
85 .sub_levels
86 .iter()
87 .skip(start_idx as usize)
88 .take((end_idx - start_idx + 1) as usize)
89 {
90 input_levels.push(InputLevel {
91 level_idx: 0,
92 level_type: level.level_type,
93 table_infos: level.table_infos.clone(),
94 });
95 }
96 if input_levels.is_empty() {
97 return None;
98 }
99 input_levels.reverse();
100 Some(CompactionInput {
101 input_levels,
102 target_level: 0,
103 target_sub_level_id: sub_level_id,
104 ..Default::default()
105 })
106 }
107
108 fn pick_l0_to_base_level(
109 &self,
110 levels: &Levels,
111 level_handlers: &[LevelHandler],
112 ) -> Option<CompactionInput> {
113 assert!(self.option.level == 0 && self.target_level > 0);
114 for l in 1..self.target_level {
115 assert!(levels.levels[l - 1].table_infos.is_empty());
116 }
117 let l0 = &levels.l0;
118 let mut input_levels = vec![];
119 let mut max_sub_level_idx = usize::MAX;
120 let mut info = self.overlap_strategy.create_overlap_info();
121 for (idx, level) in l0.sub_levels.iter().enumerate() {
123 if !self.filter_level_by_option(level) {
124 continue;
125 }
126 if level_handlers[0].is_level_pending_compact(level) {
127 return None;
128 }
129
130 max_sub_level_idx = idx;
132 }
133 if max_sub_level_idx == usize::MAX {
134 return None;
135 }
136 for idx in 0..=max_sub_level_idx {
138 for table in &l0.sub_levels[idx].table_infos {
139 info.update(table);
140 }
141 input_levels.push(InputLevel {
142 level_idx: 0,
143 level_type: l0.sub_levels[idx].level_type,
144 table_infos: l0.sub_levels[idx].table_infos.clone(),
145 })
146 }
147 let target_input_ssts_range =
148 info.check_multiple_overlap(&levels.levels[self.target_level - 1].table_infos);
149 let target_input_ssts = if target_input_ssts_range.is_empty() {
150 vec![]
151 } else {
152 levels.levels[self.target_level - 1].table_infos[target_input_ssts_range].to_vec()
153 };
154 if target_input_ssts
155 .iter()
156 .any(|table| level_handlers[self.target_level].is_pending_compact(&table.sst_id))
157 {
158 return None;
159 }
160 if input_levels.is_empty() {
161 return None;
162 }
163 input_levels.reverse();
164 input_levels.push(InputLevel {
165 level_idx: self.target_level as u32,
166 level_type: LevelType::Nonoverlapping,
167 table_infos: target_input_ssts,
168 });
169
170 Some(CompactionInput {
171 input_levels,
172 target_level: self.target_level,
173 target_sub_level_id: 0,
174 ..Default::default()
175 })
176 }
177
178 fn filter_level_by_option(&self, level: &Level) -> bool {
181 let mut hint_sst_ids: HashSet<u64> = HashSet::new();
182 hint_sst_ids.extend(self.option.sst_ids.iter());
183 let tmp_sst_info = SstableInfoInner {
184 key_range: self.option.key_range.clone(),
185 ..Default::default()
186 }
187 .into();
188 if self
189 .overlap_strategy
190 .check_overlap_with_tables(&[tmp_sst_info], &level.table_infos)
191 .is_empty()
192 {
193 return false;
194 }
195 if !hint_sst_ids.is_empty()
196 && !level
197 .table_infos
198 .iter()
199 .any(|t| hint_sst_ids.contains(&t.sst_id))
200 {
201 return false;
202 }
203 if !self.option.internal_table_id.is_empty()
204 && !level.table_infos.iter().any(|sst_info| {
205 sst_info
206 .table_ids
207 .iter()
208 .any(|t| self.option.internal_table_id.contains(t))
209 })
210 {
211 return false;
212 }
213 true
214 }
215}
216
217impl CompactionPicker for ManualCompactionPicker {
218 fn pick_compaction(
219 &mut self,
220 levels: &Levels,
221 level_handlers: &[LevelHandler],
222 _stats: &mut LocalPickerStatistic,
223 ) -> Option<CompactionInput> {
224 if self.option.level == 0 {
225 if !self.option.sst_ids.is_empty() {
226 return self.pick_l0_to_sub_level(&levels.l0, level_handlers);
227 } else if self.target_level > 0 {
228 return self.pick_l0_to_base_level(levels, level_handlers);
229 } else {
230 return None;
231 }
232 }
233 let mut hint_sst_ids: HashSet<u64> = HashSet::new();
234 hint_sst_ids.extend(self.option.sst_ids.iter());
235 let mut tmp_sst_info = SstableInfoInner::default();
236 let mut range_overlap_info = RangeOverlapInfo::default();
237 tmp_sst_info.key_range = self.option.key_range.clone();
238 range_overlap_info.update(&tmp_sst_info.into());
239 let level = self.option.level;
240 let target_level = self.target_level;
241 assert!(
242 self.option.level == self.target_level || self.option.level + 1 == self.target_level
243 );
244 let mut select_input_ssts: Vec<SstableInfo> = levels
246 .get_level(self.option.level)
247 .table_infos
248 .iter()
249 .filter(|sst_info| hint_sst_ids.is_empty() || hint_sst_ids.contains(&sst_info.sst_id))
250 .filter(|sst_info| range_overlap_info.check_overlap(sst_info))
251 .filter(|sst_info| {
252 if self.option.internal_table_id.is_empty() {
253 return true;
254 }
255
256 for table_id in &sst_info.table_ids {
258 if self.option.internal_table_id.contains(table_id) {
259 return true;
260 }
261 }
262 false
263 })
264 .cloned()
265 .collect();
266 if select_input_ssts.is_empty() {
267 return None;
268 }
269 let target_input_ssts = if target_level == level {
270 let (left, _) = levels
272 .get_level(level)
273 .table_infos
274 .iter()
275 .find_position(|p| p.sst_id == select_input_ssts.first().unwrap().sst_id)
276 .unwrap();
277 let (right, _) = levels
278 .get_level(level)
279 .table_infos
280 .iter()
281 .find_position(|p| p.sst_id == select_input_ssts.last().unwrap().sst_id)
282 .unwrap();
283 select_input_ssts = levels.get_level(level).table_infos[left..=right].to_vec();
284 vec![]
285 } else {
286 self.overlap_strategy.check_base_level_overlap(
287 &select_input_ssts,
288 &levels.get_level(target_level).table_infos,
289 )
290 };
291 if select_input_ssts
292 .iter()
293 .any(|table| level_handlers[level].is_pending_compact(&table.sst_id))
294 {
295 return None;
296 }
297 if target_input_ssts
298 .iter()
299 .any(|table| level_handlers[target_level].is_pending_compact(&table.sst_id))
300 {
301 return None;
302 }
303
304 Some(CompactionInput {
305 select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
306 target_input_size: target_input_ssts.iter().map(|sst| sst.sst_size).sum(),
307 total_file_count: (select_input_ssts.len() + target_input_ssts.len()) as u64,
308 input_levels: vec![
309 InputLevel {
310 level_idx: level as u32,
311 level_type: levels.levels[level - 1].level_type,
312 table_infos: select_input_ssts,
313 },
314 InputLevel {
315 level_idx: target_level as u32,
316 level_type: levels.levels[target_level - 1].level_type,
317 table_infos: target_input_ssts,
318 },
319 ],
320 target_level,
321 ..Default::default()
322 })
323 }
324}
325
326#[cfg(test)]
327pub mod tests {
328 use std::collections::{BTreeSet, HashMap};
329
330 use bytes::Bytes;
331 use risingwave_hummock_sdk::key_range::KeyRange;
332 use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
333 use risingwave_pb::hummock::compact_task;
334
335 use super::*;
336 use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
337 use crate::hummock::compaction::overlap_strategy::RangeOverlapStrategy;
338 use crate::hummock::compaction::selector::tests::{
339 assert_compaction_task, generate_l0_nonoverlapping_sublevels,
340 generate_l0_overlapping_sublevels, generate_level, generate_table,
341 };
342 use crate::hummock::compaction::selector::{CompactionSelector, ManualCompactionSelector};
343 use crate::hummock::compaction::{CompactionDeveloperConfig, LocalSelectorStatistic};
344 use crate::hummock::model::CompactionGroup;
345 use crate::hummock::test_utils::{compaction_selector_context, iterator_test_key_of_epoch};
346
347 fn clean_task_state(level_handler: &mut LevelHandler) {
348 for pending_task_id in &level_handler.pending_tasks_ids() {
349 level_handler.remove_task(*pending_task_id);
350 }
351 }
352
353 fn is_l0_to_lbase(compaction_input: &CompactionInput) -> bool {
354 compaction_input
355 .input_levels
356 .iter()
357 .take(compaction_input.input_levels.len() - 1)
358 .all(|i| i.level_idx == 0)
359 && compaction_input
360 .input_levels
361 .iter()
362 .last()
363 .unwrap()
364 .level_idx as usize
365 == compaction_input.target_level
366 && compaction_input.target_level > 0
367 }
368
369 fn is_l0_to_l0(compaction_input: &CompactionInput) -> bool {
370 compaction_input
371 .input_levels
372 .iter()
373 .all(|i| i.level_idx == 0)
374 && compaction_input.target_level == 0
375 }
376
#[test]
fn test_manual_compaction_picker() {
    /// Builds a picker that compacts `option.level` into the level after it.
    fn picker_into_next_level(option: ManualCompactionOption) -> ManualCompactionPicker {
        let target_level = option.level + 1;
        ManualCompactionPicker::new(
            Arc::new(RangeOverlapStrategy::default()),
            option,
            target_level,
        )
    }

    /// Overwrites the first two table ids of `sst` with 1 and 2.
    fn mark_tables_1_and_2(sst: &mut SstableInfo) {
        let mut inner = sst.get_inner();
        inner.table_ids.resize(2, 0);
        inner.table_ids[0] = 1;
        inner.table_ids[1] = 2;
        *sst = inner.into();
    }

    /// Asserts the number of SSTs in the first two input levels.
    fn assert_picked_counts(result: &CompactionInput, select: usize, target: usize) {
        assert_eq!(select, result.input_levels[0].table_infos.len());
        assert_eq!(target, result.input_levels[1].table_infos.len());
    }

    let level_1 = Level {
        level_idx: 1,
        level_type: LevelType::Nonoverlapping,
        table_infos: vec![
            generate_table(0, 1, 0, 100, 1),
            generate_table(1, 1, 101, 200, 1),
            generate_table(2, 1, 222, 300, 1),
        ],
        ..Default::default()
    };
    let level_2 = Level {
        level_idx: 2,
        level_type: LevelType::Nonoverlapping,
        table_infos: vec![
            generate_table(4, 1, 0, 100, 1),
            generate_table(5, 1, 101, 150, 1),
            generate_table(6, 1, 151, 201, 1),
            generate_table(7, 1, 501, 800, 1),
            generate_table(8, 2, 301, 400, 1),
        ],
        ..Default::default()
    };
    let mut levels = Levels {
        levels: vec![level_1, level_2],
        l0: generate_l0_nonoverlapping_sublevels(vec![]),
        ..Default::default()
    };
    let mut levels_handler = vec![
        LevelHandler::new(0),
        LevelHandler::new(1),
        LevelHandler::new(2),
    ];
    let mut local_stats = LocalPickerStatistic::default();

    // Level 1 restricted by a key range only.
    {
        let mut picker = picker_into_next_level(ManualCompactionOption {
            level: 1,
            key_range: KeyRange {
                left: Bytes::from(iterator_test_key_of_epoch(1, 0, 1)),
                right: Bytes::from(iterator_test_key_of_epoch(1, 201, 1)),
                right_exclusive: false,
            },
            ..Default::default()
        });
        let result = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();
        result.add_pending_task(0, &mut levels_handler);

        assert_picked_counts(&result, 2, 3);
    }

    // The default option.
    {
        clean_task_state(&mut levels_handler[1]);
        clean_task_state(&mut levels_handler[2]);

        let mut picker = picker_into_next_level(ManualCompactionOption::default());
        let result = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();
        result.add_pending_task(0, &mut levels_handler);

        assert_picked_counts(&result, 3, 3);
    }

    // Only the second SST of level 1 carries table id 2, which is the one requested.
    {
        clean_task_state(&mut levels_handler[1]);
        clean_task_state(&mut levels_handler[2]);

        mark_tables_1_and_2(&mut levels.levels[0].table_infos[1]);

        let mut picker = picker_into_next_level(ManualCompactionOption {
            level: 1,
            internal_table_id: HashSet::from([2]),
            ..Default::default()
        });
        let result = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();
        result.add_pending_task(0, &mut levels_handler);

        assert_picked_counts(&result, 1, 2);
    }

    // Every SST of level 1 carries table id 2; the key range narrows the pick.
    {
        clean_task_state(&mut levels_handler[1]);
        clean_task_state(&mut levels_handler[2]);

        levels.levels[0]
            .table_infos
            .iter_mut()
            .for_each(mark_tables_1_and_2);

        let mut picker = picker_into_next_level(ManualCompactionOption {
            sst_ids: vec![],
            level: 1,
            key_range: KeyRange {
                left: Bytes::from(iterator_test_key_of_epoch(1, 101, 1)),
                right: Bytes::from(iterator_test_key_of_epoch(1, 199, 1)),
                right_exclusive: false,
            },
            internal_table_id: HashSet::from([2]),
        });
        let result = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();

        assert_picked_counts(&result, 1, 2);
    }
}
539
540 fn generate_test_levels() -> (Levels, Vec<LevelHandler>) {
541 let mut l0 = generate_l0_overlapping_sublevels(vec![
542 vec![
543 generate_table(5, 1, 0, 500, 2),
544 generate_table(6, 2, 600, 1000, 2),
545 ],
546 vec![
547 generate_table(7, 1, 0, 500, 3),
548 generate_table(8, 2, 600, 1000, 3),
549 ],
550 vec![
551 generate_table(9, 1, 300, 500, 4),
552 generate_table(10, 2, 600, 1000, 4),
553 ],
554 ]);
555 l0.sub_levels[1].level_type = LevelType::Nonoverlapping as _;
557 assert_eq!(l0.sub_levels.len(), 3);
558 let mut levels = vec![
559 Level {
560 level_idx: 1,
561 level_type: LevelType::Nonoverlapping,
562 table_infos: vec![
563 generate_table(3, 1, 0, 100, 1),
564 generate_table(4, 2, 2000, 3000, 1),
565 ],
566 ..Default::default()
567 },
568 Level {
569 level_idx: 2,
570 level_type: LevelType::Nonoverlapping,
571 table_infos: vec![
572 generate_table(1, 1, 0, 100, 1),
573 generate_table(2, 2, 2000, 3000, 1),
574 ],
575 ..Default::default()
576 },
577 ];
578 assert_eq!(levels.len(), 2);
580 for iter in [l0.sub_levels.iter_mut(), levels.iter_mut()] {
581 for (idx, l) in iter.enumerate() {
582 for t in &mut l.table_infos {
583 let mut t_inner = t.get_inner();
584 t_inner.table_ids.clear();
585 if idx == 0 {
586 t_inner.table_ids.push(((t.sst_id % 2) + 1) as _);
587 } else {
588 t_inner.table_ids.push(3);
589 }
590 *t = t_inner.into();
591 }
592 }
593 }
594 let levels = Levels {
595 levels,
596 l0,
597 ..Default::default()
598 };
599
600 let levels_handler = vec![
601 LevelHandler::new(0),
602 LevelHandler::new(1),
603 LevelHandler::new(2),
604 ];
605 (levels, levels_handler)
606 }
607
608 fn generate_intra_test_levels() -> (Levels, Vec<LevelHandler>) {
609 let l0 = generate_l0_overlapping_sublevels(vec![]);
610 let levels = vec![Level {
611 level_idx: 1,
612 level_type: LevelType::Nonoverlapping,
613 table_infos: vec![
614 generate_table(1, 1, 0, 100, 1),
615 generate_table(2, 2, 100, 200, 1),
616 generate_table(3, 2, 200, 300, 1),
617 generate_table(4, 2, 300, 400, 1),
618 ],
619 ..Default::default()
620 }];
621 let levels = Levels {
622 levels,
623 l0,
624 ..Default::default()
625 };
626
627 let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
628 (levels, levels_handler)
629 }
630
#[test]
fn test_l0_empty() {
    // L0 is generated from no SSTs and level 1 holds no SSTs either.
    let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
    let empty_level_1 = Level {
        level_idx: 1,
        level_type: LevelType::Nonoverlapping,
        table_infos: vec![],
        total_file_size: 0,
        sub_level_id: 0,
        uncompressed_file_size: 0,
        ..Default::default()
    };
    let levels = Levels {
        levels: vec![empty_level_1],
        l0,
        ..Default::default()
    };
    let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];

    // Ask for an SST id on L0 with the target level set to 0.
    let option = ManualCompactionOption {
        sst_ids: vec![1],
        level: 0,
        key_range: KeyRange {
            left: Bytes::default(),
            right: Bytes::default(),
            right_exclusive: false,
        },
        internal_table_id: HashSet::default(),
    };
    let mut picker =
        ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 0);
    let mut local_stats = LocalPickerStatistic::default();

    let picked = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
    assert!(picked.is_none());
}
671
#[test]
fn test_l0_basic() {
    let (levels, levels_handler) = generate_test_levels();
    let mut local_stats = LocalPickerStatistic::default();

    let new_picker = |option: ManualCompactionOption, target_level: usize| {
        ManualCompactionPicker::new(
            Arc::new(RangeOverlapStrategy::default()),
            option,
            target_level,
        )
    };
    let sst_ids_of = |input_level: &InputLevel| {
        input_level
            .table_infos
            .iter()
            .map(|s| s.sst_id)
            .collect_vec()
    };

    // L0 option with empty key bounds, no SST ids and no internal table ids.
    let base_option = ManualCompactionOption {
        sst_ids: vec![],
        level: 0,
        key_range: KeyRange {
            left: Bytes::default(),
            right: Bytes::default(),
            right_exclusive: false,
        },
        internal_table_id: HashSet::default(),
    };

    // With target level 0 nothing is picked.
    let mut picker = new_picker(base_option.clone(), 0);
    assert!(
        picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .is_none()
    );

    // With target level 1 all three sub-levels are picked, last sub-level first.
    let mut picker = new_picker(base_option, 1);
    let expected = [vec![9, 10], vec![7, 8], vec![5, 6]];
    let result = picker
        .pick_compaction(&levels, &levels_handler, &mut local_stats)
        .unwrap();
    assert_eq!(result.input_levels.len(), 4);
    assert!(is_l0_to_lbase(&result));
    assert_eq!(result.target_level, 1);
    for (input_level, expected_ids) in result.input_levels.iter().zip(&expected) {
        assert_eq!(sst_ids_of(input_level), *expected_ids);
    }
    assert_eq!(
        result.input_levels[3].table_infos,
        vec![levels.levels[0].table_infos[0].clone()]
    );

    // A key range filter: only the first two sub-levels are picked.
    let ranged_option = ManualCompactionOption {
        sst_ids: vec![],
        level: 0,
        key_range: KeyRange {
            left: Bytes::from(iterator_test_key_of_epoch(1, 0, 2)),
            right: Bytes::from(iterator_test_key_of_epoch(1, 200, 2)),
            right_exclusive: false,
        },
        internal_table_id: HashSet::default(),
    };
    let mut picker = new_picker(ranged_option, 1);
    let expected = [vec![7, 8], vec![5, 6]];
    let result = picker
        .pick_compaction(&levels, &levels_handler, &mut local_stats)
        .unwrap();
    assert_eq!(result.input_levels.len(), 3);
    assert!(is_l0_to_lbase(&result));
    assert_eq!(result.target_level, 1);
    for (input_level, expected_ids) in result.input_levels.iter().zip(&expected) {
        assert_eq!(sst_ids_of(input_level), *expected_ids);
    }
    assert_eq!(
        result.input_levels[2].table_infos,
        vec![levels.levels[0].table_infos[0].clone()]
    );
}
761
#[test]
fn test_l0_to_l0_option_sst_ids() {
    let (levels, levels_handler) = generate_test_levels();
    // (input level, SST id filter, expected SST ids per picked sub-level,
    // listed from the first picked sub-level to the last).
    let cases = vec![
        (0, vec![6], vec![vec![5, 6]]),
        (0, vec![7], vec![vec![7, 8]]),
        (0, vec![9], vec![vec![9, 10]]),
        (0, vec![6, 9], vec![vec![5, 6], vec![7, 8], vec![9, 10]]),
        (0, vec![8, 9], vec![vec![7, 8], vec![9, 10]]),
        (0, vec![6, 8, 9], vec![vec![5, 6], vec![7, 8], vec![9, 10]]),
    ];
    let mut local_stats = LocalPickerStatistic::default();

    for (input_level, sst_id_filter, expected) in cases {
        let option = ManualCompactionOption {
            sst_ids: sst_id_filter,
            level: input_level,
            key_range: KeyRange {
                left: Bytes::default(),
                right: Bytes::default(),
                right_exclusive: false,
            },
            internal_table_id: HashSet::default(),
        };
        let mut picker = ManualCompactionPicker::new(
            Arc::new(RangeOverlapStrategy::default()),
            option,
            input_level + 1,
        );
        let result = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();

        assert!(is_l0_to_l0(&result));
        assert_eq!(result.input_levels.len(), expected.len());
        // The picker emits the sub-levels in reverse order.
        for (picked, expected_ids) in result.input_levels.iter().zip(expected.iter().rev()) {
            let picked_ids = picked.table_infos.iter().map(|s| s.sst_id).collect_vec();
            assert_eq!(picked_ids, *expected_ids);
        }
    }
}
810
#[test]
fn test_l0_to_lbase_option_internal_table() {
    let (levels, mut levels_handler) = generate_test_levels();
    let input_level = 0;
    let target_level = input_level + 1;
    let mut local_stats = LocalPickerStatistic::default();

    // Builds a picker whose only filter is the given set of internal table ids.
    let picker_for_tables = |internal_table_id| {
        let option = ManualCompactionOption {
            sst_ids: vec![],
            level: input_level,
            key_range: KeyRange {
                left: Bytes::default(),
                right: Bytes::default(),
                right_exclusive: false,
            },
            internal_table_id,
        };
        ManualCompactionPicker::new(
            Arc::new(RangeOverlapStrategy::default()),
            option,
            target_level,
        )
    };
    // SST ids of the first `sub_level_count` input levels, flattened in order.
    let leading_sst_ids = |input: &CompactionInput, sub_level_count: usize| {
        input
            .input_levels
            .iter()
            .take(sub_level_count)
            .flat_map(|level| level.table_infos.iter().map(|sst| sst.sst_id))
            .collect_vec()
    };
    let sst_ids_of = |level: &InputLevel| {
        level
            .table_infos
            .iter()
            .map(|sst| sst.sst_id)
            .collect_vec()
    };

    // A table id that no SST carries: nothing is picked.
    {
        let mut picker = picker_for_tables(HashSet::from([100]));
        assert!(
            picker
                .pick_compaction(&levels, &levels_handler, &mut local_stats)
                .is_none()
        )
    }

    // Table ids 1, 2 and 3: all three sub-levels are picked.
    {
        let mut picker = picker_for_tables(HashSet::from([1, 2, 3]));
        let result = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();
        assert_eq!(result.input_levels.len(), 4);
        assert!(is_l0_to_lbase(&result));
        assert_eq!(result.target_level, 1);
        assert_eq!(leading_sst_ids(&result, 3), vec![9, 10, 7, 8, 5, 6]);
        assert_eq!(sst_ids_of(&result.input_levels[3]), vec![3]);
    }

    // Table id 3 alone: the first sub-level is still part of the input.
    {
        let mut picker = picker_for_tables(HashSet::from([3]));
        let result = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();
        assert_eq!(result.input_levels.len(), 4);
        assert!(is_l0_to_lbase(&result));
        assert_eq!(leading_sst_ids(&result, 3), vec![9, 10, 7, 8, 5, 6]);
        assert_eq!(sst_ids_of(&result.input_levels[3]), vec![3]);
        assert_eq!(result.target_level, 1);
    }

    // Table id 1 picks the first sub-level only; while that task is
    // registered as pending, a pick for table id 3 yields nothing.
    {
        let mut picker = picker_for_tables(HashSet::from([1]));
        let result = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();
        result.add_pending_task(0, &mut levels_handler);
        assert_eq!(result.input_levels.len(), 2);
        assert!(is_l0_to_lbase(&result));
        assert_eq!(result.target_level, 1);
        assert_eq!(leading_sst_ids(&result, 1), vec![5, 6]);
        assert_eq!(sst_ids_of(&result.input_levels[1]), vec![3]);

        let mut picker = picker_for_tables(HashSet::from([3]));
        assert!(
            picker
                .pick_compaction(&levels, &levels_handler, &mut local_stats)
                .is_none()
        );

        clean_task_state(&mut levels_handler[0]);
        clean_task_state(&mut levels_handler[1]);
    }
}
1000
#[test]
fn test_ln_to_lnext_option_internal_table() {
    let (levels, levels_handler) = generate_test_levels();
    let input_level = 1;
    let target_level = input_level + 1;
    let mut local_stats = LocalPickerStatistic::default();

    // Builds a picker whose only filter is the given set of internal table ids.
    let picker_for_tables = |internal_table_id| {
        let option = ManualCompactionOption {
            sst_ids: vec![],
            level: input_level,
            key_range: KeyRange {
                left: Bytes::default(),
                right: Bytes::default(),
                right_exclusive: false,
            },
            internal_table_id,
        };
        ManualCompactionPicker::new(
            Arc::new(RangeOverlapStrategy::default()),
            option,
            target_level,
        )
    };

    // A table id that no SST carries: nothing is picked.
    {
        let mut picker = picker_for_tables(HashSet::from([100]));
        assert!(
            picker
                .pick_compaction(&levels, &levels_handler, &mut local_stats)
                .is_none()
        )
    }

    // Table id 1: SST 4 is selected together with SST 2 of the target level.
    {
        let expected_input_level_sst_ids = [vec![4], vec![2]];
        let mut picker = picker_for_tables(HashSet::from([1]));
        let result = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();
        assert_eq!(
            result.input_levels.len(),
            expected_input_level_sst_ids.len()
        );
        assert_eq!(result.target_level, target_level);
        for (picked, expected_ids) in result
            .input_levels
            .iter()
            .zip(&expected_input_level_sst_ids)
        {
            let picked_ids = picked.table_infos.iter().map(|s| s.sst_id).collect_vec();
            assert_eq!(picked_ids, *expected_ids);
        }
    }
}
1073
#[test]
fn test_ln_to_lnext_option_sst_ids() {
    let (levels, levels_handler) = generate_test_levels();
    // (input level, SST id filter, expected SST ids of [select level, target level]).
    let cases = vec![
        (1, vec![3], vec![vec![3], vec![1]]),
        (1, vec![4], vec![vec![4], vec![2]]),
        (1, vec![3, 4], vec![vec![3, 4], vec![1, 2]]),
    ];
    let mut local_stats = LocalPickerStatistic::default();

    for (input_level, sst_id_filter, expected) in cases {
        let option = ManualCompactionOption {
            sst_ids: sst_id_filter,
            level: input_level,
            key_range: KeyRange {
                left: Bytes::default(),
                right: Bytes::default(),
                right_exclusive: false,
            },
            internal_table_id: HashSet::default(),
        };
        let mut picker = ManualCompactionPicker::new(
            Arc::new(RangeOverlapStrategy::default()),
            option,
            input_level + 1,
        );
        let result = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();

        assert_eq!(result.input_levels.len(), expected.len());
        for (picked, expected_ids) in result.input_levels.iter().zip(&expected) {
            let picked_ids = picked.table_infos.iter().map(|s| s.sst_id).collect_vec();
            assert_eq!(picked_ids, *expected_ids);
        }
    }
}
1116
#[test]
fn test_ln_to_ln() {
    let (levels, levels_handler) = generate_intra_test_levels();
    // (level, SST id filter, expected SST ids of [select input, target input]).
    // The select input is the span from the first to the last requested SST;
    // the target input stays empty because source and target level coincide.
    let cases = vec![
        (1, vec![1], vec![vec![1], vec![]]),
        (1, vec![3], vec![vec![3], vec![]]),
        (1, vec![4], vec![vec![4], vec![]]),
        (1, vec![3, 4], vec![vec![3, 4], vec![]]),
        (1, vec![1, 4], vec![vec![1, 2, 3, 4], vec![]]),
        (1, vec![2, 4], vec![vec![2, 3, 4], vec![]]),
        (1, vec![1, 3], vec![vec![1, 2, 3], vec![]]),
    ];

    for (input_level, sst_id_filter, expected) in cases {
        let option = ManualCompactionOption {
            sst_ids: sst_id_filter,
            level: input_level,
            key_range: KeyRange {
                left: Bytes::default(),
                right: Bytes::default(),
                right_exclusive: false,
            },
            internal_table_id: HashSet::default(),
        };
        // Source and target level are the same.
        let mut picker = ManualCompactionPicker::new(
            Arc::new(RangeOverlapStrategy::default()),
            option,
            input_level,
        );
        let mut local_stats = LocalPickerStatistic::default();
        let result = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();

        assert_eq!(result.input_levels.len(), expected.len());
        for (picked, expected_ids) in result.input_levels.iter().zip(&expected) {
            let picked_ids = picked.table_infos.iter().map(|s| s.sst_id).collect_vec();
            assert_eq!(picked_ids, *expected_ids);
        }
    }
}
1166
#[test]
fn test_manual_compaction_selector_l0() {
    let config = CompactionConfigBuilder::new().max_level(4).build();
    let group_config = CompactionGroup::new(1, config);
    let l0 = generate_l0_nonoverlapping_sublevels(vec![
        generate_table(0, 1, 0, 500, 1),
        generate_table(1, 1, 0, 500, 1),
    ]);
    assert_eq!(l0.sub_levels.len(), 2);

    // Levels 1 to 3 are generated without SSTs; only level 4 holds data.
    let level_4 = Level {
        level_idx: 4,
        level_type: LevelType::Nonoverlapping,
        table_infos: vec![
            generate_table(2, 1, 0, 100, 1),
            generate_table(3, 1, 101, 200, 1),
            generate_table(4, 1, 222, 300, 1),
        ],
        ..Default::default()
    };
    let levels = vec![
        generate_level(1, vec![]),
        generate_level(2, vec![]),
        generate_level(3, vec![]),
        level_4,
    ];
    assert_eq!(levels.len(), 4);
    let levels = Levels {
        levels,
        l0,
        ..Default::default()
    };
    let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
    let mut local_stats = LocalSelectorStatistic::default();

    // L0 option that differs only in the requested SST ids.
    let l0_option = |sst_ids| ManualCompactionOption {
        sst_ids,
        key_range: KeyRange {
            left: Bytes::default(),
            right: Bytes::default(),
            right_exclusive: false,
        },
        internal_table_id: HashSet::default(),
        level: 0,
    };

    // With SST ids given, both sub-levels are merged inside L0.
    {
        let mut selector = ManualCompactionSelector::new(l0_option(vec![0, 1]));
        let task = selector
            .pick_compaction(
                1,
                compaction_selector_context(
                    &group_config,
                    &levels,
                    &BTreeSet::new(),
                    &mut levels_handler,
                    &mut local_stats,
                    &HashMap::default(),
                    Arc::new(CompactionDeveloperConfig::default()),
                    &Default::default(),
                    &HummockVersionStateTableInfo::empty(),
                ),
            )
            .unwrap();
        assert_compaction_task(&task, &levels_handler);
        let picked_level_idxs = task
            .input
            .input_levels
            .iter()
            .map(|input_level| input_level.level_idx)
            .collect_vec();
        assert_eq!(picked_level_idxs, vec![0, 0]);
        assert_eq!(task.input.target_level, 0);
    }

    // Drop the pending task of the first pick before picking again.
    levels_handler.iter_mut().for_each(clean_task_state);

    // Without SST ids, both sub-levels are compacted into level 4.
    {
        let mut selector = ManualCompactionSelector::new(l0_option(vec![]));
        let task = selector
            .pick_compaction(
                2,
                compaction_selector_context(
                    &group_config,
                    &levels,
                    &BTreeSet::new(),
                    &mut levels_handler,
                    &mut local_stats,
                    &HashMap::default(),
                    Arc::new(CompactionDeveloperConfig::default()),
                    &Default::default(),
                    &HummockVersionStateTableInfo::empty(),
                ),
            )
            .unwrap();
        assert_compaction_task(&task, &levels_handler);
        let picked_level_idxs = task
            .input
            .input_levels
            .iter()
            .map(|input_level| input_level.level_idx)
            .collect_vec();
        assert_eq!(picked_level_idxs, vec![0, 0, 4]);
        assert_eq!(task.input.target_level, 4);
    }
}
1279
/// Exercises `ManualCompactionSelector` on non-L0 levels of a version whose L0 is empty,
/// whose level 3 holds two tables and whose level 4 holds six tables.
///
/// With SST ids `[0, 1]` at level 3 the picked task is expected to take two tables from
/// level 3 and two from level 4, targeting level 4. With an empty SST id list at level 4 the
/// picked task is expected to list level 4 twice (all six tables, then an empty input),
/// target level 4, and be typed as a manual task.
#[test]
fn test_manual_compaction_selector() {
    let group_config =
        CompactionGroup::new(1, CompactionConfigBuilder::new().max_level(4).build());

    // The two scenarios differ only in the SST id list and the source level; the key range
    // bounds and the internal table id filter are left at their default (empty) values.
    let manual_option = |sst_ids, level| ManualCompactionOption {
        sst_ids,
        key_range: KeyRange {
            left: Bytes::default(),
            right: Bytes::default(),
            right_exclusive: false,
        },
        internal_table_id: HashSet::default(),
        level,
    };

    let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
    assert_eq!(l0.sub_levels.len(), 0);

    // Levels 1 and 2 carry no tables; levels 3 and 4 are populated.
    let mut base_levels = vec![generate_level(1, vec![]), generate_level(2, vec![])];
    base_levels.push(generate_level(
        3,
        vec![
            generate_table(0, 1, 150, 151, 1),
            generate_table(1, 1, 250, 251, 1),
        ],
    ));
    base_levels.push(Level {
        level_idx: 4,
        level_type: LevelType::Nonoverlapping,
        table_infos: vec![
            generate_table(2, 1, 0, 100, 1),
            generate_table(3, 1, 101, 200, 1),
            generate_table(4, 1, 222, 300, 1),
            generate_table(5, 1, 333, 400, 1),
            generate_table(6, 1, 444, 500, 1),
            generate_table(7, 1, 555, 600, 1),
        ],
        ..Default::default()
    });
    assert_eq!(base_levels.len(), 4);
    let levels = Levels {
        levels: base_levels,
        l0,
        ..Default::default()
    };

    let mut level_handlers: Vec<_> = (0..5).map(LevelHandler::new).collect();
    let mut selector_stats = LocalSelectorStatistic::default();

    {
        // Scenario 1: SST ids 0 and 1 requested at level 3, picked as task id 1.
        let mut manual_selector = ManualCompactionSelector::new(manual_option(vec![0, 1], 3));
        let picked = manual_selector
            .pick_compaction(
                1,
                compaction_selector_context(
                    &group_config,
                    &levels,
                    &BTreeSet::new(),
                    &mut level_handlers,
                    &mut selector_stats,
                    &HashMap::default(),
                    Arc::new(CompactionDeveloperConfig::default()),
                    &Default::default(),
                    &HummockVersionStateTableInfo::empty(),
                ),
            )
            .unwrap();
        assert_compaction_task(&picked, &level_handlers);
        assert_eq!(picked.input.input_levels.len(), 2);
        // Each entry is (level index, number of tables taken from that level).
        let picked_shape = picked
            .input
            .input_levels
            .iter()
            .map(|input| (input.level_idx, input.table_infos.len()))
            .collect_vec();
        assert_eq!(picked_shape, vec![(3, 2), (4, 2)]);
        assert_eq!(picked.input.target_level, 4);
    }

    // Drop every pending task from the handlers before the second scenario picks again.
    for handler in level_handlers.iter_mut() {
        for task_id in handler.pending_tasks_ids() {
            handler.remove_task(task_id);
        }
    }

    {
        // Scenario 2: no SST id filter at level 4, again picked as task id 1.
        let mut manual_selector = ManualCompactionSelector::new(manual_option(vec![], 4));
        let picked = manual_selector
            .pick_compaction(
                1,
                compaction_selector_context(
                    &group_config,
                    &levels,
                    &BTreeSet::new(),
                    &mut level_handlers,
                    &mut selector_stats,
                    &HashMap::default(),
                    Arc::new(CompactionDeveloperConfig::default()),
                    &Default::default(),
                    &HummockVersionStateTableInfo::empty(),
                ),
            )
            .unwrap();
        assert_compaction_task(&picked, &level_handlers);
        assert_eq!(picked.input.input_levels.len(), 2);
        // Each entry is (level index, number of tables taken from that level).
        let picked_shape = picked
            .input
            .input_levels
            .iter()
            .map(|input| (input.level_idx, input.table_infos.len()))
            .collect_vec();
        assert_eq!(picked_shape, vec![(4, 6), (4, 0)]);
        assert_eq!(picked.input.target_level, 4);
        assert!(matches!(
            picked.compaction_task_type,
            compact_task::TaskType::Manual
        ));
    }
}
1406}