1use std::collections::HashSet;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_hummock_sdk::HummockSstableId;
20use risingwave_hummock_sdk::level::{InputLevel, Level, Levels, OverlappingLevel};
21use risingwave_hummock_sdk::sstable_info::SstableInfo;
22use risingwave_pb::hummock::LevelType;
23
24use super::{CompactionInput, CompactionPicker, LocalPickerStatistic};
25use crate::hummock::compaction::overlap_strategy::{
26 OverlapInfo, OverlapStrategy, RangeOverlapInfo,
27};
28use crate::hummock::compaction::selector::ManualCompactionOption;
29use crate::hummock::level_handler::LevelHandler;
30
/// Compaction picker whose selection is restricted by a [`ManualCompactionOption`].
pub struct ManualCompactionPicker {
    /// Strategy used to test SSTs for overlap against a key range or against other SSTs.
    overlap_strategy: Arc<dyn OverlapStrategy>,
    /// Input level plus the SST-id, key-range and internal-table-id filters to apply.
    option: ManualCompactionOption,
    /// Level that the picked input is compacted into.
    target_level: usize,
}
36
37impl ManualCompactionPicker {
38 pub fn new(
39 overlap_strategy: Arc<dyn OverlapStrategy>,
40 option: ManualCompactionOption,
41 target_level: usize,
42 ) -> Self {
43 Self {
44 overlap_strategy,
45 option,
46 target_level,
47 }
48 }
49
50 fn pick_l0_to_sub_level(
51 &self,
52 l0: &OverlappingLevel,
53 level_handlers: &[LevelHandler],
54 ) -> Option<CompactionInput> {
55 assert_eq!(self.option.level, 0);
56 let mut input_levels = vec![];
57 let mut sub_level_id = 0;
58 let mut start_idx = None;
59 let mut end_idx = None;
60 for (idx, level) in l0.sub_levels.iter().enumerate() {
63 if !self.filter_level_by_option(level) {
64 continue;
65 }
66 if level_handlers[0].is_level_pending_compact(level) {
67 return None;
68 }
69 if start_idx.is_none() {
71 sub_level_id = level.sub_level_id;
72 start_idx = Some(idx as u64);
73 end_idx = start_idx;
74 } else {
75 end_idx = Some(idx as u64);
76 }
77 }
78 let (start_idx, end_idx) = match (start_idx, end_idx) {
79 (Some(start_idx), Some(end_idx)) => (start_idx, end_idx),
80 _ => {
81 return None;
82 }
83 };
84 for level in l0
86 .sub_levels
87 .iter()
88 .skip(start_idx as usize)
89 .take((end_idx - start_idx + 1) as usize)
90 {
91 input_levels.push(InputLevel {
92 level_idx: 0,
93 level_type: level.level_type,
94 table_infos: level.table_infos.clone(),
95 });
96 }
97 if input_levels.is_empty() {
98 return None;
99 }
100 input_levels.reverse();
101 Some(CompactionInput {
102 input_levels,
103 target_level: 0,
104 target_sub_level_id: sub_level_id,
105 ..Default::default()
106 })
107 }
108
109 fn pick_l0_to_base_level(
110 &self,
111 levels: &Levels,
112 level_handlers: &[LevelHandler],
113 ) -> Option<CompactionInput> {
114 assert!(self.option.level == 0 && self.target_level > 0);
115 for l in 1..self.target_level {
116 assert!(levels.levels[l - 1].table_infos.is_empty());
117 }
118 let l0 = &levels.l0;
119 let mut input_levels = vec![];
120 let mut max_sub_level_idx = usize::MAX;
121 let mut info = self.overlap_strategy.create_overlap_info();
122 for (idx, level) in l0.sub_levels.iter().enumerate() {
124 if !self.filter_level_by_option(level) {
125 continue;
126 }
127 if level_handlers[0].is_level_pending_compact(level) {
128 return None;
129 }
130
131 max_sub_level_idx = idx;
133 }
134 if max_sub_level_idx == usize::MAX {
135 return None;
136 }
137 for idx in 0..=max_sub_level_idx {
139 for table in &l0.sub_levels[idx].table_infos {
140 info.update(&table.key_range);
141 }
142 input_levels.push(InputLevel {
143 level_idx: 0,
144 level_type: l0.sub_levels[idx].level_type,
145 table_infos: l0.sub_levels[idx].table_infos.clone(),
146 })
147 }
148 let target_input_ssts_range =
149 info.check_multiple_overlap(&levels.levels[self.target_level - 1].table_infos);
150 let target_input_ssts = if target_input_ssts_range.is_empty() {
151 vec![]
152 } else {
153 levels.levels[self.target_level - 1].table_infos[target_input_ssts_range].to_vec()
154 };
155 if target_input_ssts
156 .iter()
157 .any(|table| level_handlers[self.target_level].is_pending_compact(&table.sst_id))
158 {
159 return None;
160 }
161 if input_levels.is_empty() {
162 return None;
163 }
164 input_levels.reverse();
165 input_levels.push(InputLevel {
166 level_idx: self.target_level as u32,
167 level_type: LevelType::Nonoverlapping,
168 table_infos: target_input_ssts,
169 });
170
171 Some(CompactionInput {
172 input_levels,
173 target_level: self.target_level,
174 target_sub_level_id: 0,
175 ..Default::default()
176 })
177 }
178
179 fn filter_level_by_option(&self, level: &Level) -> bool {
182 let mut hint_sst_ids: HashSet<HummockSstableId> = HashSet::new();
183 hint_sst_ids.extend(self.option.sst_ids.iter());
184 if self
185 .overlap_strategy
186 .check_overlap_with_range(&self.option.key_range, &level.table_infos)
187 .is_empty()
188 {
189 return false;
190 }
191 if !hint_sst_ids.is_empty()
192 && !level
193 .table_infos
194 .iter()
195 .any(|t| hint_sst_ids.contains(&t.sst_id))
196 {
197 return false;
198 }
199 if !self.option.internal_table_id.is_empty()
200 && !level.table_infos.iter().any(|sst_info| {
201 sst_info
202 .table_ids
203 .iter()
204 .any(|t| self.option.internal_table_id.contains(t))
205 })
206 {
207 return false;
208 }
209 true
210 }
211}
212
213impl CompactionPicker for ManualCompactionPicker {
214 fn pick_compaction(
215 &mut self,
216 levels: &Levels,
217 level_handlers: &[LevelHandler],
218 _stats: &mut LocalPickerStatistic,
219 ) -> Option<CompactionInput> {
220 if self.option.level == 0 {
221 if !self.option.sst_ids.is_empty() {
222 return self.pick_l0_to_sub_level(&levels.l0, level_handlers);
223 } else if self.target_level > 0 {
224 return self.pick_l0_to_base_level(levels, level_handlers);
225 } else {
226 return None;
227 }
228 }
229 let mut hint_sst_ids: HashSet<HummockSstableId> = HashSet::new();
230 hint_sst_ids.extend(self.option.sst_ids.iter());
231 let mut range_overlap_info = RangeOverlapInfo::default();
232 range_overlap_info.update(&self.option.key_range);
233 let level = self.option.level;
234 let target_level = self.target_level;
235 assert!(
236 self.option.level == self.target_level || self.option.level + 1 == self.target_level
237 );
238 let mut select_input_ssts: Vec<SstableInfo> = levels
240 .get_level(self.option.level)
241 .table_infos
242 .iter()
243 .filter(|sst_info| hint_sst_ids.is_empty() || hint_sst_ids.contains(&sst_info.sst_id))
244 .filter(|sst_info| range_overlap_info.check_overlap(sst_info))
245 .filter(|sst_info| {
246 if self.option.internal_table_id.is_empty() {
247 return true;
248 }
249
250 for table_id in &sst_info.table_ids {
252 if self.option.internal_table_id.contains(table_id) {
253 return true;
254 }
255 }
256 false
257 })
258 .cloned()
259 .collect();
260 if select_input_ssts.is_empty() {
261 return None;
262 }
263 let target_input_ssts = if target_level == level {
264 let (left, _) = levels
266 .get_level(level)
267 .table_infos
268 .iter()
269 .find_position(|p| p.sst_id == select_input_ssts.first().unwrap().sst_id)
270 .unwrap();
271 let (right, _) = levels
272 .get_level(level)
273 .table_infos
274 .iter()
275 .find_position(|p| p.sst_id == select_input_ssts.last().unwrap().sst_id)
276 .unwrap();
277 select_input_ssts = levels.get_level(level).table_infos[left..=right].to_vec();
278 vec![]
279 } else {
280 self.overlap_strategy.check_base_level_overlap(
281 &select_input_ssts,
282 &levels.get_level(target_level).table_infos,
283 )
284 };
285 if select_input_ssts
286 .iter()
287 .any(|table| level_handlers[level].is_pending_compact(&table.sst_id))
288 {
289 return None;
290 }
291 if target_input_ssts
292 .iter()
293 .any(|table| level_handlers[target_level].is_pending_compact(&table.sst_id))
294 {
295 return None;
296 }
297
298 Some(CompactionInput {
299 select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
300 target_input_size: target_input_ssts.iter().map(|sst| sst.sst_size).sum(),
301 total_file_count: (select_input_ssts.len() + target_input_ssts.len()) as u64,
302 input_levels: vec![
303 InputLevel {
304 level_idx: level as u32,
305 level_type: levels.levels[level - 1].level_type,
306 table_infos: select_input_ssts,
307 },
308 InputLevel {
309 level_idx: target_level as u32,
310 level_type: levels.levels[target_level - 1].level_type,
311 table_infos: target_input_ssts,
312 },
313 ],
314 target_level,
315 ..Default::default()
316 })
317 }
318}
319
320#[cfg(test)]
321pub mod tests {
322 use std::collections::{BTreeSet, HashMap};
323
324 use bytes::Bytes;
325 use risingwave_hummock_sdk::key_range::KeyRange;
326 use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
327 use risingwave_pb::hummock::compact_task;
328
329 use super::*;
330 use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
331 use crate::hummock::compaction::overlap_strategy::RangeOverlapStrategy;
332 use crate::hummock::compaction::selector::tests::{
333 assert_compaction_task, generate_l0_nonoverlapping_sublevels,
334 generate_l0_overlapping_sublevels, generate_level, generate_table,
335 };
336 use crate::hummock::compaction::selector::{CompactionSelector, ManualCompactionSelector};
337 use crate::hummock::compaction::{CompactionDeveloperConfig, LocalSelectorStatistic};
338 use crate::hummock::model::CompactionGroup;
339 use crate::hummock::test_utils::{compaction_selector_context, iterator_test_key_of_epoch};
340
341 fn clean_task_state(level_handler: &mut LevelHandler) {
342 for pending_task_id in &level_handler.pending_tasks_ids() {
343 level_handler.remove_task(*pending_task_id);
344 }
345 }
346
347 fn is_l0_to_lbase(compaction_input: &CompactionInput) -> bool {
348 compaction_input
349 .input_levels
350 .iter()
351 .take(compaction_input.input_levels.len() - 1)
352 .all(|i| i.level_idx == 0)
353 && compaction_input
354 .input_levels
355 .iter()
356 .last()
357 .unwrap()
358 .level_idx as usize
359 == compaction_input.target_level
360 && compaction_input.target_level > 0
361 }
362
363 fn is_l0_to_l0(compaction_input: &CompactionInput) -> bool {
364 compaction_input
365 .input_levels
366 .iter()
367 .all(|i| i.level_idx == 0)
368 && compaction_input.target_level == 0
369 }
370
371 #[test]
372 fn test_manual_compaction_picker() {
373 let levels = vec![
374 Level {
375 level_idx: 1,
376 level_type: LevelType::Nonoverlapping,
377 table_infos: vec![
378 generate_table(0, 1, 0, 100, 1),
379 generate_table(1, 1, 101, 200, 1),
380 generate_table(2, 1, 222, 300, 1),
381 ],
382 ..Default::default()
383 },
384 Level {
385 level_idx: 2,
386 level_type: LevelType::Nonoverlapping,
387 table_infos: vec![
388 generate_table(4, 1, 0, 100, 1),
389 generate_table(5, 1, 101, 150, 1),
390 generate_table(6, 1, 151, 201, 1),
391 generate_table(7, 1, 501, 800, 1),
392 generate_table(8, 2, 301, 400, 1),
393 ],
394 ..Default::default()
395 },
396 ];
397 let mut levels = Levels {
398 levels,
399 l0: generate_l0_nonoverlapping_sublevels(vec![]),
400 ..Default::default()
401 };
402 let mut levels_handler = vec![
403 LevelHandler::new(0),
404 LevelHandler::new(1),
405 LevelHandler::new(2),
406 ];
407 let mut local_stats = LocalPickerStatistic::default();
408
409 {
410 let option = ManualCompactionOption {
412 level: 1,
413 key_range: KeyRange {
414 left: Bytes::from(iterator_test_key_of_epoch(1, 0, 1)),
415 right: Bytes::from(iterator_test_key_of_epoch(1, 201, 1)),
416 right_exclusive: false,
417 },
418 ..Default::default()
419 };
420
421 let target_level = option.level + 1;
422 let mut picker = ManualCompactionPicker::new(
423 Arc::new(RangeOverlapStrategy::default()),
424 option,
425 target_level,
426 );
427 let result = picker
428 .pick_compaction(&levels, &levels_handler, &mut local_stats)
429 .unwrap();
430 result.add_pending_task(0, &mut levels_handler);
431
432 assert_eq!(2, result.input_levels[0].table_infos.len());
433 assert_eq!(3, result.input_levels[1].table_infos.len());
434 }
435
436 {
437 clean_task_state(&mut levels_handler[1]);
438 clean_task_state(&mut levels_handler[2]);
439
440 let option = ManualCompactionOption::default();
442 let target_level = option.level + 1;
443 let mut picker = ManualCompactionPicker::new(
444 Arc::new(RangeOverlapStrategy::default()),
445 option,
446 target_level,
447 );
448 let result = picker
449 .pick_compaction(&levels, &levels_handler, &mut local_stats)
450 .unwrap();
451 result.add_pending_task(0, &mut levels_handler);
452
453 assert_eq!(3, result.input_levels[0].table_infos.len());
454 assert_eq!(3, result.input_levels[1].table_infos.len());
455 }
456
457 {
458 clean_task_state(&mut levels_handler[1]);
459 clean_task_state(&mut levels_handler[2]);
460
461 let level_table_info = &mut levels.levels[0].table_infos;
462 let table_info_1 = &mut level_table_info[1];
463 let mut t_inner = table_info_1.get_inner();
464 t_inner.table_ids.resize(2, 0);
465 t_inner.table_ids[0] = 1;
466 t_inner.table_ids[1] = 2;
467 *table_info_1 = t_inner.into();
468
469 let option = ManualCompactionOption {
471 level: 1,
472 internal_table_id: HashSet::from([2]),
473 ..Default::default()
474 };
475
476 let target_level = option.level + 1;
477 let mut picker = ManualCompactionPicker::new(
478 Arc::new(RangeOverlapStrategy::default()),
479 option,
480 target_level,
481 );
482
483 let result = picker
484 .pick_compaction(&levels, &levels_handler, &mut local_stats)
485 .unwrap();
486 result.add_pending_task(0, &mut levels_handler);
487
488 assert_eq!(1, result.input_levels[0].table_infos.len());
489 assert_eq!(2, result.input_levels[1].table_infos.len());
490 }
491
492 {
493 clean_task_state(&mut levels_handler[1]);
494 clean_task_state(&mut levels_handler[2]);
495
496 let level_table_info = &mut levels.levels[0].table_infos;
498 for table_info in level_table_info {
499 let mut t_inner = table_info.get_inner();
500 t_inner.table_ids.resize(2, 0);
501 t_inner.table_ids[0] = 1;
502 t_inner.table_ids[1] = 2;
503 *table_info = t_inner.into();
504 }
505
506 let option = ManualCompactionOption {
508 sst_ids: vec![],
509 level: 1,
510 key_range: KeyRange {
511 left: Bytes::from(iterator_test_key_of_epoch(1, 101, 1)),
512 right: Bytes::from(iterator_test_key_of_epoch(1, 199, 1)),
513 right_exclusive: false,
514 },
515 internal_table_id: HashSet::from([2]),
516 };
517
518 let target_level = option.level + 1;
519 let mut picker = ManualCompactionPicker::new(
520 Arc::new(RangeOverlapStrategy::default()),
521 option,
522 target_level,
523 );
524
525 let result = picker
526 .pick_compaction(&levels, &levels_handler, &mut local_stats)
527 .unwrap();
528
529 assert_eq!(1, result.input_levels[0].table_infos.len());
530 assert_eq!(2, result.input_levels[1].table_infos.len());
531 }
532 }
533
534 fn generate_test_levels() -> (Levels, Vec<LevelHandler>) {
535 let mut l0 = generate_l0_overlapping_sublevels(vec![
536 vec![
537 generate_table(5, 1, 0, 500, 2),
538 generate_table(6, 2, 600, 1000, 2),
539 ],
540 vec![
541 generate_table(7, 1, 0, 500, 3),
542 generate_table(8, 2, 600, 1000, 3),
543 ],
544 vec![
545 generate_table(9, 1, 300, 500, 4),
546 generate_table(10, 2, 600, 1000, 4),
547 ],
548 ]);
549 l0.sub_levels[1].level_type = LevelType::Nonoverlapping as _;
551 assert_eq!(l0.sub_levels.len(), 3);
552 let mut levels = vec![
553 Level {
554 level_idx: 1,
555 level_type: LevelType::Nonoverlapping,
556 table_infos: vec![
557 generate_table(3, 1, 0, 100, 1),
558 generate_table(4, 2, 2000, 3000, 1),
559 ],
560 ..Default::default()
561 },
562 Level {
563 level_idx: 2,
564 level_type: LevelType::Nonoverlapping,
565 table_infos: vec![
566 generate_table(1, 1, 0, 100, 1),
567 generate_table(2, 2, 2000, 3000, 1),
568 ],
569 ..Default::default()
570 },
571 ];
572 assert_eq!(levels.len(), 2);
574 for iter in [l0.sub_levels.iter_mut(), levels.iter_mut()] {
575 for (idx, l) in iter.enumerate() {
576 for t in &mut l.table_infos {
577 let mut t_inner = t.get_inner();
578 t_inner.table_ids.clear();
579 if idx == 0 {
580 t_inner.table_ids.push(((t.sst_id.inner() % 2) + 1) as _);
581 } else {
582 t_inner.table_ids.push(3);
583 }
584 *t = t_inner.into();
585 }
586 }
587 }
588 let levels = Levels {
589 levels,
590 l0,
591 ..Default::default()
592 };
593
594 let levels_handler = vec![
595 LevelHandler::new(0),
596 LevelHandler::new(1),
597 LevelHandler::new(2),
598 ];
599 (levels, levels_handler)
600 }
601
602 fn generate_intra_test_levels() -> (Levels, Vec<LevelHandler>) {
603 let l0 = generate_l0_overlapping_sublevels(vec![]);
604 let levels = vec![Level {
605 level_idx: 1,
606 level_type: LevelType::Nonoverlapping,
607 table_infos: vec![
608 generate_table(1, 1, 0, 100, 1),
609 generate_table(2, 2, 100, 200, 1),
610 generate_table(3, 2, 200, 300, 1),
611 generate_table(4, 2, 300, 400, 1),
612 ],
613 ..Default::default()
614 }];
615 let levels = Levels {
616 levels,
617 l0,
618 ..Default::default()
619 };
620
621 let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
622 (levels, levels_handler)
623 }
624
625 #[test]
626 fn test_l0_empty() {
627 let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
628 let levels = vec![Level {
629 level_idx: 1,
630 level_type: LevelType::Nonoverlapping,
631 table_infos: vec![],
632 total_file_size: 0,
633 sub_level_id: 0,
634 uncompressed_file_size: 0,
635 ..Default::default()
636 }];
637 let levels = Levels {
638 levels,
639 l0,
640 ..Default::default()
641 };
642 let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
643 let option = ManualCompactionOption {
644 sst_ids: vec![1.into()],
645 level: 0,
646 key_range: KeyRange {
647 left: Bytes::default(),
648 right: Bytes::default(),
649 right_exclusive: false,
650 },
651 internal_table_id: HashSet::default(),
652 };
653 let mut picker =
654 ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 0);
655 assert!(
656 picker
657 .pick_compaction(
658 &levels,
659 &levels_handler,
660 &mut LocalPickerStatistic::default()
661 )
662 .is_none()
663 );
664 }
665
666 #[test]
667 fn test_l0_basic() {
668 let (levels, levels_handler) = generate_test_levels();
669
670 let option = ManualCompactionOption {
672 sst_ids: vec![],
673 level: 0,
674 key_range: KeyRange {
675 left: Bytes::default(),
676 right: Bytes::default(),
677 right_exclusive: false,
678 },
679 internal_table_id: HashSet::default(),
680 };
681 let mut picker = ManualCompactionPicker::new(
682 Arc::new(RangeOverlapStrategy::default()),
683 option.clone(),
684 0,
685 );
686 let mut local_stats = LocalPickerStatistic::default();
687 assert!(
688 picker
689 .pick_compaction(&levels, &levels_handler, &mut local_stats)
690 .is_none()
691 );
692
693 let mut picker =
695 ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 1);
696 let mut expected = [vec![5, 6], vec![7, 8], vec![9, 10]];
697 expected.reverse();
698 let result = picker
699 .pick_compaction(&levels, &levels_handler, &mut local_stats)
700 .unwrap();
701 assert_eq!(result.input_levels.len(), 4);
702 assert!(is_l0_to_lbase(&result));
703 assert_eq!(result.target_level, 1);
704 for (l, e) in expected.iter().enumerate().take(3) {
705 assert_eq!(
706 result.input_levels[l]
707 .table_infos
708 .iter()
709 .map(|s| s.sst_id)
710 .collect_vec(),
711 *e
712 );
713 }
714 assert_eq!(
715 result.input_levels[3].table_infos,
716 vec![levels.levels[0].table_infos[0].clone()]
717 );
718
719 let option = ManualCompactionOption {
721 sst_ids: vec![],
722 level: 0,
723 key_range: KeyRange {
724 left: Bytes::from(iterator_test_key_of_epoch(1, 0, 2)),
725 right: Bytes::from(iterator_test_key_of_epoch(1, 200, 2)),
726 right_exclusive: false,
727 },
728 internal_table_id: HashSet::default(),
729 };
730 let mut picker =
731 ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 1);
732 let mut expected = [vec![5, 6], vec![7, 8]];
733 expected.reverse();
734 let result = picker
735 .pick_compaction(&levels, &levels_handler, &mut local_stats)
736 .unwrap();
737 assert_eq!(result.input_levels.len(), 3);
738 assert!(is_l0_to_lbase(&result));
739 assert_eq!(result.target_level, 1);
740 for (l, e) in expected.iter().enumerate().take(2) {
741 assert_eq!(
742 result.input_levels[l]
743 .table_infos
744 .iter()
745 .map(|s| s.sst_id)
746 .collect_vec(),
747 *e
748 );
749 }
750 assert_eq!(
751 result.input_levels[2].table_infos,
752 vec![levels.levels[0].table_infos[0].clone()]
753 );
754 }
755
756 #[test]
757 fn test_l0_to_l0_option_sst_ids() {
758 let (levels, levels_handler) = generate_test_levels();
759 let sst_id_filters = vec![
761 (0, vec![6], vec![vec![5, 6]]),
762 (0, vec![7], vec![vec![7, 8]]),
763 (0, vec![9], vec![vec![9, 10]]),
764 (0, vec![6, 9], vec![vec![5, 6], vec![7, 8], vec![9, 10]]),
765 (0, vec![8, 9], vec![vec![7, 8], vec![9, 10]]),
766 (0, vec![6, 8, 9], vec![vec![5, 6], vec![7, 8], vec![9, 10]]),
767 ];
768 let mut local_stats = LocalPickerStatistic::default();
769 for (input_level, sst_id_filter, expected) in &sst_id_filters {
770 let expected = expected.iter().rev().cloned().collect_vec();
771 let option = ManualCompactionOption {
772 sst_ids: sst_id_filter.iter().cloned().map(Into::into).collect(),
773 level: *input_level as _,
774 key_range: KeyRange {
775 left: Bytes::default(),
776 right: Bytes::default(),
777 right_exclusive: false,
778 },
779 internal_table_id: HashSet::default(),
780 };
781 let mut picker = ManualCompactionPicker::new(
782 Arc::new(RangeOverlapStrategy::default()),
783 option.clone(),
784 input_level + 1,
786 );
787 let result = picker
788 .pick_compaction(&levels, &levels_handler, &mut local_stats)
789 .unwrap();
790 assert!(is_l0_to_l0(&result));
791 assert_eq!(result.input_levels.len(), expected.len());
792 for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
793 assert_eq!(
794 result.input_levels[i]
795 .table_infos
796 .iter()
797 .map(|s| s.sst_id)
798 .collect_vec(),
799 *e
800 );
801 }
802 }
803 }
804
805 #[test]
806 fn test_l0_to_lbase_option_internal_table() {
807 let (levels, mut levels_handler) = generate_test_levels();
808 let input_level = 0;
809 let target_level = input_level + 1;
810 let mut local_stats = LocalPickerStatistic::default();
811 {
812 let option = ManualCompactionOption {
813 sst_ids: vec![],
814 level: input_level,
815 key_range: KeyRange {
816 left: Bytes::default(),
817 right: Bytes::default(),
818 right_exclusive: false,
819 },
820 internal_table_id: HashSet::from([100]),
822 };
823 let mut picker = ManualCompactionPicker::new(
824 Arc::new(RangeOverlapStrategy::default()),
825 option,
826 target_level,
827 );
828 assert!(
829 picker
830 .pick_compaction(&levels, &levels_handler, &mut local_stats)
831 .is_none()
832 )
833 }
834
835 {
836 let option = ManualCompactionOption {
837 sst_ids: vec![],
838 level: input_level,
839 key_range: KeyRange {
840 left: Bytes::default(),
841 right: Bytes::default(),
842 right_exclusive: false,
843 },
844 internal_table_id: HashSet::from([1, 2, 3]),
846 };
847 let mut picker = ManualCompactionPicker::new(
848 Arc::new(RangeOverlapStrategy::default()),
849 option,
850 target_level,
851 );
852 let result = picker
853 .pick_compaction(&levels, &levels_handler, &mut local_stats)
854 .unwrap();
855 assert_eq!(result.input_levels.len(), 4);
856 assert!(is_l0_to_lbase(&result));
857 assert_eq!(result.target_level, 1);
858 assert!(is_l0_to_lbase(&result));
859 assert_eq!(
860 result
861 .input_levels
862 .iter()
863 .take(3)
864 .flat_map(|s| s.table_infos.clone())
865 .map(|s| s.sst_id)
866 .collect_vec(),
867 vec![9, 10, 7, 8, 5, 6]
868 );
869 assert_eq!(
870 result.input_levels[3]
871 .table_infos
872 .iter()
873 .map(|s| s.sst_id)
874 .collect_vec(),
875 vec![3]
876 );
877 }
878
879 {
880 let option = ManualCompactionOption {
881 sst_ids: vec![],
882 level: input_level,
883 key_range: KeyRange {
884 left: Bytes::default(),
885 right: Bytes::default(),
886 right_exclusive: false,
887 },
888 internal_table_id: HashSet::from([3]),
890 };
891 let mut picker = ManualCompactionPicker::new(
892 Arc::new(RangeOverlapStrategy::default()),
893 option,
894 target_level,
895 );
896 let result = picker
897 .pick_compaction(&levels, &levels_handler, &mut local_stats)
898 .unwrap();
899 assert_eq!(result.input_levels.len(), 4);
900 assert!(is_l0_to_lbase(&result));
901 assert_eq!(
902 result
903 .input_levels
904 .iter()
905 .take(3)
906 .flat_map(|s| s.table_infos.clone())
907 .map(|s| s.sst_id)
908 .collect_vec(),
909 vec![9, 10, 7, 8, 5, 6]
910 );
911 assert_eq!(
912 result.input_levels[3]
913 .table_infos
914 .iter()
915 .map(|s| s.sst_id)
916 .collect_vec(),
917 vec![3]
918 );
919 assert_eq!(result.target_level, 1);
920 }
921
922 {
923 let option = ManualCompactionOption {
924 sst_ids: vec![],
925 level: input_level,
926 key_range: KeyRange {
927 left: Bytes::default(),
928 right: Bytes::default(),
929 right_exclusive: false,
930 },
931 internal_table_id: HashSet::from([1]),
934 };
935 let mut picker = ManualCompactionPicker::new(
936 Arc::new(RangeOverlapStrategy::default()),
937 option,
938 target_level,
939 );
940 let result = picker
941 .pick_compaction(&levels, &levels_handler, &mut local_stats)
942 .unwrap();
943 result.add_pending_task(0, &mut levels_handler);
944 assert_eq!(result.input_levels.len(), 2);
945 assert!(is_l0_to_lbase(&result));
946 assert_eq!(result.target_level, 1);
947 assert_eq!(
948 result
949 .input_levels
950 .iter()
951 .take(1)
952 .flat_map(|s| s.table_infos.clone())
953 .map(|s| s.sst_id)
954 .collect_vec(),
955 vec![5, 6]
956 );
957 assert_eq!(
958 result.input_levels[1]
959 .table_infos
960 .iter()
961 .map(|s| s.sst_id)
962 .collect_vec(),
963 vec![3]
964 );
965
966 let option = ManualCompactionOption {
968 sst_ids: vec![],
969 level: input_level,
970 key_range: KeyRange {
971 left: Bytes::default(),
972 right: Bytes::default(),
973 right_exclusive: false,
974 },
975 internal_table_id: HashSet::from([3]),
977 };
978 let mut picker = ManualCompactionPicker::new(
979 Arc::new(RangeOverlapStrategy::default()),
980 option,
981 target_level,
982 );
983 assert!(
985 picker
986 .pick_compaction(&levels, &levels_handler, &mut local_stats)
987 .is_none()
988 );
989
990 clean_task_state(&mut levels_handler[0]);
991 clean_task_state(&mut levels_handler[1]);
992 }
993 }
994
995 #[test]
996 fn test_ln_to_lnext_option_internal_table() {
997 let (levels, levels_handler) = generate_test_levels();
998 let input_level = 1;
999 let target_level = input_level + 1;
1000 let mut local_stats = LocalPickerStatistic::default();
1001 {
1002 let option = ManualCompactionOption {
1003 sst_ids: vec![],
1004 level: input_level,
1005 key_range: KeyRange {
1006 left: Bytes::default(),
1007 right: Bytes::default(),
1008 right_exclusive: false,
1009 },
1010 internal_table_id: HashSet::from([100]),
1012 };
1013 let mut picker = ManualCompactionPicker::new(
1014 Arc::new(RangeOverlapStrategy::default()),
1015 option,
1016 target_level,
1017 );
1018 assert!(
1019 picker
1020 .pick_compaction(&levels, &levels_handler, &mut local_stats)
1021 .is_none()
1022 )
1023 }
1024
1025 {
1026 let expected_input_level_sst_ids = [vec![4], vec![2]];
1027 let option = ManualCompactionOption {
1028 sst_ids: vec![],
1029 level: input_level,
1030 key_range: KeyRange {
1031 left: Bytes::default(),
1032 right: Bytes::default(),
1033 right_exclusive: false,
1034 },
1035 internal_table_id: HashSet::from([1]),
1037 };
1038 let mut picker = ManualCompactionPicker::new(
1039 Arc::new(RangeOverlapStrategy::default()),
1040 option,
1041 target_level,
1042 );
1043 let result = picker
1044 .pick_compaction(&levels, &levels_handler, &mut local_stats)
1045 .unwrap();
1046 assert_eq!(
1047 result.input_levels.len(),
1048 expected_input_level_sst_ids.len()
1049 );
1050 assert_eq!(result.target_level, target_level);
1051 for (l, e) in expected_input_level_sst_ids
1052 .iter()
1053 .enumerate()
1054 .take(result.input_levels.len())
1055 {
1056 assert_eq!(
1057 result.input_levels[l]
1058 .table_infos
1059 .iter()
1060 .map(|s| s.sst_id)
1061 .collect_vec(),
1062 *e
1063 );
1064 }
1065 }
1066 }
1067
1068 #[test]
1069 fn test_ln_to_lnext_option_sst_ids() {
1070 let (levels, levels_handler) = generate_test_levels();
1071 let sst_id_filters = vec![
1073 (1, vec![3], vec![vec![3], vec![1]]),
1074 (1, vec![4], vec![vec![4], vec![2]]),
1075 (1, vec![3, 4], vec![vec![3, 4], vec![1, 2]]),
1076 ];
1077 let mut local_stats = LocalPickerStatistic::default();
1078 for (input_level, sst_id_filter, expected) in &sst_id_filters {
1079 let option = ManualCompactionOption {
1080 sst_ids: sst_id_filter.iter().cloned().map(Into::into).collect(),
1081 level: *input_level as _,
1082 key_range: KeyRange {
1083 left: Bytes::default(),
1084 right: Bytes::default(),
1085 right_exclusive: false,
1086 },
1087 internal_table_id: HashSet::default(),
1088 };
1089 let mut picker = ManualCompactionPicker::new(
1090 Arc::new(RangeOverlapStrategy::default()),
1091 option.clone(),
1092 input_level + 1,
1093 );
1094 let result = picker
1095 .pick_compaction(&levels, &levels_handler, &mut local_stats)
1096 .unwrap();
1097 assert_eq!(result.input_levels.len(), expected.len());
1098 for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
1099 assert_eq!(
1100 result.input_levels[i]
1101 .table_infos
1102 .iter()
1103 .map(|s| s.sst_id)
1104 .collect_vec(),
1105 *e
1106 );
1107 }
1108 }
1109 }
1110
1111 #[test]
1112 fn test_ln_to_ln() {
1113 let (levels, levels_handler) = generate_intra_test_levels();
1114 let sst_id_filters = vec![
1116 (1, vec![1], vec![vec![1], vec![]]),
1117 (1, vec![3], vec![vec![3], vec![]]),
1118 (1, vec![4], vec![vec![4], vec![]]),
1119 (1, vec![3, 4], vec![vec![3, 4], vec![]]),
1120 (1, vec![1, 4], vec![vec![1, 2, 3, 4], vec![]]),
1121 (1, vec![2, 4], vec![vec![2, 3, 4], vec![]]),
1122 (1, vec![1, 3], vec![vec![1, 2, 3], vec![]]),
1123 ];
1124 for (input_level, sst_id_filter, expected) in &sst_id_filters {
1125 let option = ManualCompactionOption {
1126 sst_ids: sst_id_filter.iter().cloned().map(Into::into).collect(),
1127 level: *input_level as _,
1128 key_range: KeyRange {
1129 left: Bytes::default(),
1130 right: Bytes::default(),
1131 right_exclusive: false,
1132 },
1133 internal_table_id: HashSet::default(),
1134 };
1135 let mut picker = ManualCompactionPicker::new(
1136 Arc::new(RangeOverlapStrategy::default()),
1137 option.clone(),
1138 *input_level as _,
1139 );
1140 let result = picker
1141 .pick_compaction(
1142 &levels,
1143 &levels_handler,
1144 &mut LocalPickerStatistic::default(),
1145 )
1146 .unwrap();
1147 assert_eq!(result.input_levels.len(), expected.len());
1148 for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
1149 assert_eq!(
1150 result.input_levels[i]
1151 .table_infos
1152 .iter()
1153 .map(|s| s.sst_id)
1154 .collect_vec(),
1155 *e
1156 );
1157 }
1158 }
1159 }
1160
1161 #[test]
1162 fn test_manual_compaction_selector_l0() {
1163 let config = CompactionConfigBuilder::new().max_level(4).build();
1164 let group_config = CompactionGroup::new(1, config);
1165 let l0 = generate_l0_nonoverlapping_sublevels(vec![
1166 generate_table(0, 1, 0, 500, 1),
1167 generate_table(1, 1, 0, 500, 1),
1168 ]);
1169 assert_eq!(l0.sub_levels.len(), 2);
1170 let levels = vec![
1171 generate_level(1, vec![]),
1172 generate_level(2, vec![]),
1173 generate_level(3, vec![]),
1174 Level {
1175 level_idx: 4,
1176 level_type: LevelType::Nonoverlapping,
1177 table_infos: vec![
1178 generate_table(2, 1, 0, 100, 1),
1179 generate_table(3, 1, 101, 200, 1),
1180 generate_table(4, 1, 222, 300, 1),
1181 ],
1182 ..Default::default()
1183 },
1184 ];
1185 assert_eq!(levels.len(), 4);
1186 let levels = Levels {
1187 levels,
1188 l0,
1189 ..Default::default()
1190 };
1191 let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
1192 let mut local_stats = LocalSelectorStatistic::default();
1193
1194 {
1196 let option = ManualCompactionOption {
1197 sst_ids: [0, 1].iter().cloned().map(Into::into).collect(),
1198 key_range: KeyRange {
1199 left: Bytes::default(),
1200 right: Bytes::default(),
1201 right_exclusive: false,
1202 },
1203 internal_table_id: HashSet::default(),
1204 level: 0,
1205 };
1206 let mut selector = ManualCompactionSelector::new(option);
1207 let task = selector
1208 .pick_compaction(
1209 1,
1210 compaction_selector_context(
1211 &group_config,
1212 &levels,
1213 &BTreeSet::new(),
1214 &mut levels_handler,
1215 &mut local_stats,
1216 &HashMap::default(),
1217 Arc::new(CompactionDeveloperConfig::default()),
1218 &Default::default(),
1219 &HummockVersionStateTableInfo::empty(),
1220 ),
1221 )
1222 .unwrap();
1223 assert_compaction_task(&task, &levels_handler);
1224 assert_eq!(task.input.input_levels.len(), 2);
1225 assert_eq!(task.input.input_levels[0].level_idx, 0);
1226 assert_eq!(task.input.input_levels[1].level_idx, 0);
1227 assert_eq!(task.input.target_level, 0);
1228 }
1229
1230 for level_handler in &mut levels_handler {
1231 for pending_task_id in &level_handler.pending_tasks_ids() {
1232 level_handler.remove_task(*pending_task_id);
1233 }
1234 }
1235
1236 {
1238 let option = ManualCompactionOption {
1239 sst_ids: vec![],
1240 key_range: KeyRange {
1241 left: Bytes::default(),
1242 right: Bytes::default(),
1243 right_exclusive: false,
1244 },
1245 internal_table_id: HashSet::default(),
1246 level: 0,
1247 };
1248 let mut selector = ManualCompactionSelector::new(option);
1249 let task = selector
1250 .pick_compaction(
1251 2,
1252 compaction_selector_context(
1253 &group_config,
1254 &levels,
1255 &BTreeSet::new(),
1256 &mut levels_handler,
1257 &mut local_stats,
1258 &HashMap::default(),
1259 Arc::new(CompactionDeveloperConfig::default()),
1260 &Default::default(),
1261 &HummockVersionStateTableInfo::empty(),
1262 ),
1263 )
1264 .unwrap();
1265 assert_compaction_task(&task, &levels_handler);
1266 assert_eq!(task.input.input_levels.len(), 3);
1267 assert_eq!(task.input.input_levels[0].level_idx, 0);
1268 assert_eq!(task.input.input_levels[1].level_idx, 0);
1269 assert_eq!(task.input.input_levels[2].level_idx, 4);
1270 assert_eq!(task.input.target_level, 4);
1271 }
1272 }
1273
1274 #[test]
1276 fn test_manual_compaction_selector() {
1277 let config = CompactionConfigBuilder::new().max_level(4).build();
1278 let group_config = CompactionGroup::new(1, config);
1279 let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
1280 assert_eq!(l0.sub_levels.len(), 0);
1281 let levels = vec![
1282 generate_level(1, vec![]),
1283 generate_level(2, vec![]),
1284 generate_level(
1285 3,
1286 vec![
1287 generate_table(0, 1, 150, 151, 1),
1288 generate_table(1, 1, 250, 251, 1),
1289 ],
1290 ),
1291 Level {
1292 level_idx: 4,
1293 level_type: LevelType::Nonoverlapping,
1294 table_infos: vec![
1295 generate_table(2, 1, 0, 100, 1),
1296 generate_table(3, 1, 101, 200, 1),
1297 generate_table(4, 1, 222, 300, 1),
1298 generate_table(5, 1, 333, 400, 1),
1299 generate_table(6, 1, 444, 500, 1),
1300 generate_table(7, 1, 555, 600, 1),
1301 ],
1302 ..Default::default()
1303 },
1304 ];
1305 assert_eq!(levels.len(), 4);
1306 let levels = Levels {
1307 levels,
1308 l0,
1309 ..Default::default()
1310 };
1311 let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
1312 let mut local_stats = LocalSelectorStatistic::default();
1313
1314 {
1316 let option = ManualCompactionOption {
1317 sst_ids: [0, 1].iter().cloned().map(Into::into).collect(),
1318 key_range: KeyRange {
1319 left: Bytes::default(),
1320 right: Bytes::default(),
1321 right_exclusive: false,
1322 },
1323 internal_table_id: HashSet::default(),
1324 level: 3,
1325 };
1326 let mut selector = ManualCompactionSelector::new(option);
1327 let task = selector
1328 .pick_compaction(
1329 1,
1330 compaction_selector_context(
1331 &group_config,
1332 &levels,
1333 &BTreeSet::new(),
1334 &mut levels_handler,
1335 &mut local_stats,
1336 &HashMap::default(),
1337 Arc::new(CompactionDeveloperConfig::default()),
1338 &Default::default(),
1339 &HummockVersionStateTableInfo::empty(),
1340 ),
1341 )
1342 .unwrap();
1343 assert_compaction_task(&task, &levels_handler);
1344 assert_eq!(task.input.input_levels.len(), 2);
1345 assert_eq!(task.input.input_levels[0].level_idx, 3);
1346 assert_eq!(task.input.input_levels[0].table_infos.len(), 2);
1347 assert_eq!(task.input.input_levels[1].level_idx, 4);
1348 assert_eq!(task.input.input_levels[1].table_infos.len(), 2);
1349 assert_eq!(task.input.target_level, 4);
1350 }
1351
1352 for level_handler in &mut levels_handler {
1353 for pending_task_id in &level_handler.pending_tasks_ids() {
1354 level_handler.remove_task(*pending_task_id);
1355 }
1356 }
1357
1358 {
1360 let option = ManualCompactionOption {
1361 sst_ids: vec![],
1362 key_range: KeyRange {
1363 left: Bytes::default(),
1364 right: Bytes::default(),
1365 right_exclusive: false,
1366 },
1367 internal_table_id: HashSet::default(),
1368 level: 4,
1369 };
1370 let mut selector = ManualCompactionSelector::new(option);
1371 let task = selector
1372 .pick_compaction(
1373 1,
1374 compaction_selector_context(
1375 &group_config,
1376 &levels,
1377 &BTreeSet::new(),
1378 &mut levels_handler,
1379 &mut local_stats,
1380 &HashMap::default(),
1381 Arc::new(CompactionDeveloperConfig::default()),
1382 &Default::default(),
1383 &HummockVersionStateTableInfo::empty(),
1384 ),
1385 )
1386 .unwrap();
1387 assert_compaction_task(&task, &levels_handler);
1388 assert_eq!(task.input.input_levels.len(), 2);
1389 assert_eq!(task.input.input_levels[0].level_idx, 4);
1390 assert_eq!(task.input.input_levels[0].table_infos.len(), 6);
1391 assert_eq!(task.input.input_levels[1].level_idx, 4);
1392 assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
1393 assert_eq!(task.input.target_level, 4);
1394 assert!(matches!(
1395 task.compaction_task_type,
1396 compact_task::TaskType::Manual
1397 ));
1398 }
1399 }
1400}