1use std::collections::HashSet;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_hummock_sdk::HummockSstableId;
20use risingwave_hummock_sdk::level::{InputLevel, Level, Levels, OverlappingLevel};
21use risingwave_hummock_sdk::sstable_info::SstableInfo;
22use risingwave_pb::hummock::LevelType;
23
24use super::{CompactionInput, CompactionPicker, LocalPickerStatistic};
25use crate::hummock::compaction::overlap_strategy::{
26 OverlapInfo, OverlapStrategy, RangeOverlapInfo,
27};
28use crate::hummock::compaction::selector::ManualCompactionOption;
29use crate::hummock::level_handler::LevelHandler;
30
31pub struct ManualCompactionPicker {
32 overlap_strategy: Arc<dyn OverlapStrategy>,
33 option: ManualCompactionOption,
34 target_level: usize,
35}
36
37impl ManualCompactionPicker {
38 pub fn new(
39 overlap_strategy: Arc<dyn OverlapStrategy>,
40 option: ManualCompactionOption,
41 target_level: usize,
42 ) -> Self {
43 Self {
44 overlap_strategy,
45 option,
46 target_level,
47 }
48 }
49
50 fn pick_l0_to_sub_level(
51 &self,
52 l0: &OverlappingLevel,
53 level_handlers: &[LevelHandler],
54 ) -> Option<CompactionInput> {
55 assert_eq!(self.option.level, 0);
56 let mut input_levels = vec![];
57 let mut sub_level_id = 0;
58 let mut start_idx = None;
59 let mut end_idx = None;
60 for (idx, level) in l0.sub_levels.iter().enumerate() {
63 if !self.filter_level_by_option(level) {
64 continue;
65 }
66 if level_handlers[0].is_level_pending_compact(level) {
67 return None;
68 }
69 if start_idx.is_none() {
71 sub_level_id = level.sub_level_id;
72 start_idx = Some(idx as u64);
73 end_idx = start_idx;
74 } else {
75 end_idx = Some(idx as u64);
76 }
77 }
78 let (start_idx, end_idx) = match (start_idx, end_idx) {
79 (Some(start_idx), Some(end_idx)) => (start_idx, end_idx),
80 _ => {
81 return None;
82 }
83 };
84 for level in l0
86 .sub_levels
87 .iter()
88 .skip(start_idx as usize)
89 .take((end_idx - start_idx + 1) as usize)
90 {
91 input_levels.push(InputLevel {
92 level_idx: 0,
93 level_type: level.level_type,
94 table_infos: level.table_infos.clone(),
95 });
96 }
97 if input_levels.is_empty() {
98 return None;
99 }
100 input_levels.reverse();
101 Some(CompactionInput {
102 input_levels,
103 target_level: 0,
104 target_sub_level_id: sub_level_id,
105 ..Default::default()
106 })
107 }
108
109 fn pick_l0_to_base_level(
110 &self,
111 levels: &Levels,
112 level_handlers: &[LevelHandler],
113 ) -> Option<CompactionInput> {
114 assert!(self.option.level == 0 && self.target_level > 0);
115 for l in 1..self.target_level {
116 assert!(levels.levels[l - 1].table_infos.is_empty());
117 }
118 let l0 = &levels.l0;
119 let mut input_levels = vec![];
120 let mut max_sub_level_idx = usize::MAX;
121 let mut info = self.overlap_strategy.create_overlap_info();
122 for (idx, level) in l0.sub_levels.iter().enumerate() {
124 if !self.filter_level_by_option(level) {
125 continue;
126 }
127 if level_handlers[0].is_level_pending_compact(level) {
128 return None;
129 }
130
131 max_sub_level_idx = idx;
133 }
134 if max_sub_level_idx == usize::MAX {
135 return None;
136 }
137 for idx in 0..=max_sub_level_idx {
139 for table in &l0.sub_levels[idx].table_infos {
140 info.update(&table.key_range);
141 }
142 input_levels.push(InputLevel {
143 level_idx: 0,
144 level_type: l0.sub_levels[idx].level_type,
145 table_infos: l0.sub_levels[idx].table_infos.clone(),
146 })
147 }
148 let target_input_ssts_range =
149 info.check_multiple_overlap(&levels.levels[self.target_level - 1].table_infos);
150 let target_input_ssts = if target_input_ssts_range.is_empty() {
151 vec![]
152 } else {
153 levels.levels[self.target_level - 1].table_infos[target_input_ssts_range].to_vec()
154 };
155 if target_input_ssts
156 .iter()
157 .any(|table| level_handlers[self.target_level].is_pending_compact(&table.sst_id))
158 {
159 return None;
160 }
161 if input_levels.is_empty() {
162 return None;
163 }
164 input_levels.reverse();
165 input_levels.push(InputLevel {
166 level_idx: self.target_level as u32,
167 level_type: LevelType::Nonoverlapping,
168 table_infos: target_input_ssts,
169 });
170
171 Some(CompactionInput {
172 input_levels,
173 target_level: self.target_level,
174 target_sub_level_id: 0,
175 ..Default::default()
176 })
177 }
178
179 fn filter_level_by_option(&self, level: &Level) -> bool {
182 let mut hint_sst_ids: HashSet<HummockSstableId> = HashSet::new();
183 hint_sst_ids.extend(self.option.sst_ids.iter());
184 if self
185 .overlap_strategy
186 .check_overlap_with_range(&self.option.key_range, &level.table_infos)
187 .is_empty()
188 {
189 return false;
190 }
191 if !hint_sst_ids.is_empty()
192 && !level
193 .table_infos
194 .iter()
195 .any(|t| hint_sst_ids.contains(&t.sst_id))
196 {
197 return false;
198 }
199 if !self.option.internal_table_id.is_empty()
200 && !level.table_infos.iter().any(|sst_info| {
201 sst_info
202 .table_ids
203 .iter()
204 .any(|t| self.option.internal_table_id.contains(t))
205 })
206 {
207 return false;
208 }
209 true
210 }
211}
212
213impl CompactionPicker for ManualCompactionPicker {
214 fn pick_compaction(
215 &mut self,
216 levels: &Levels,
217 level_handlers: &[LevelHandler],
218 _stats: &mut LocalPickerStatistic,
219 ) -> Option<CompactionInput> {
220 if self.option.exclusive
221 && level_handlers
222 .iter()
223 .any(|level_handler| level_handler.pending_file_count() > 0)
224 {
225 return None;
226 }
227 if self.option.level == 0 {
228 if !self.option.sst_ids.is_empty() {
229 return self.pick_l0_to_sub_level(&levels.l0, level_handlers);
230 } else if self.target_level > 0 {
231 return self.pick_l0_to_base_level(levels, level_handlers);
232 } else {
233 return None;
234 }
235 }
236 let mut hint_sst_ids: HashSet<HummockSstableId> = HashSet::new();
237 hint_sst_ids.extend(self.option.sst_ids.iter());
238 let mut range_overlap_info = RangeOverlapInfo::default();
239 range_overlap_info.update(&self.option.key_range);
240 let level = self.option.level;
241 let target_level = self.target_level;
242 assert!(
243 self.option.level == self.target_level || self.option.level + 1 == self.target_level
244 );
245 let mut select_input_ssts: Vec<SstableInfo> = levels
247 .get_level(self.option.level)
248 .table_infos
249 .iter()
250 .filter(|sst_info| hint_sst_ids.is_empty() || hint_sst_ids.contains(&sst_info.sst_id))
251 .filter(|sst_info| range_overlap_info.check_overlap(sst_info))
252 .filter(|sst_info| {
253 if self.option.internal_table_id.is_empty() {
254 return true;
255 }
256
257 for table_id in &sst_info.table_ids {
259 if self.option.internal_table_id.contains(table_id) {
260 return true;
261 }
262 }
263 false
264 })
265 .cloned()
266 .collect();
267 if select_input_ssts.is_empty() {
268 return None;
269 }
270 let target_input_ssts = if target_level == level {
271 let (left, _) = levels
273 .get_level(level)
274 .table_infos
275 .iter()
276 .find_position(|p| p.sst_id == select_input_ssts.first().unwrap().sst_id)
277 .unwrap();
278 let (right, _) = levels
279 .get_level(level)
280 .table_infos
281 .iter()
282 .find_position(|p| p.sst_id == select_input_ssts.last().unwrap().sst_id)
283 .unwrap();
284 select_input_ssts = levels.get_level(level).table_infos[left..=right].to_vec();
285 vec![]
286 } else {
287 self.overlap_strategy.check_base_level_overlap(
288 &select_input_ssts,
289 &levels.get_level(target_level).table_infos,
290 )
291 };
292 if select_input_ssts
293 .iter()
294 .any(|table| level_handlers[level].is_pending_compact(&table.sst_id))
295 {
296 return None;
297 }
298 if target_input_ssts
299 .iter()
300 .any(|table| level_handlers[target_level].is_pending_compact(&table.sst_id))
301 {
302 return None;
303 }
304
305 Some(CompactionInput {
306 select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
307 target_input_size: target_input_ssts.iter().map(|sst| sst.sst_size).sum(),
308 total_file_count: (select_input_ssts.len() + target_input_ssts.len()) as u64,
309 input_levels: vec![
310 InputLevel {
311 level_idx: level as u32,
312 level_type: levels.levels[level - 1].level_type,
313 table_infos: select_input_ssts,
314 },
315 InputLevel {
316 level_idx: target_level as u32,
317 level_type: levels.levels[target_level - 1].level_type,
318 table_infos: target_input_ssts,
319 },
320 ],
321 target_level,
322 ..Default::default()
323 })
324 }
325}
326
327#[cfg(test)]
328pub mod tests {
329 use std::collections::{BTreeSet, HashMap};
330
331 use bytes::Bytes;
332 use risingwave_hummock_sdk::key_range::KeyRange;
333 use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
334 use risingwave_pb::hummock::compact_task;
335
336 use super::*;
337 use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
338 use crate::hummock::compaction::overlap_strategy::RangeOverlapStrategy;
339 use crate::hummock::compaction::selector::tests::{
340 assert_compaction_task, generate_l0_nonoverlapping_sublevels,
341 generate_l0_overlapping_sublevels, generate_level, generate_table,
342 };
343 use crate::hummock::compaction::selector::{CompactionSelector, ManualCompactionSelector};
344 use crate::hummock::compaction::{CompactionDeveloperConfig, LocalSelectorStatistic};
345 use crate::hummock::model::CompactionGroup;
346 use crate::hummock::test_utils::{compaction_selector_context, iterator_test_key_of_epoch};
347
348 fn clean_task_state(level_handler: &mut LevelHandler) {
349 for pending_task_id in &level_handler.pending_tasks_ids() {
350 level_handler.remove_task(*pending_task_id);
351 }
352 }
353
354 fn is_l0_to_lbase(compaction_input: &CompactionInput) -> bool {
355 compaction_input
356 .input_levels
357 .iter()
358 .take(compaction_input.input_levels.len() - 1)
359 .all(|i| i.level_idx == 0)
360 && compaction_input
361 .input_levels
362 .iter()
363 .last()
364 .unwrap()
365 .level_idx as usize
366 == compaction_input.target_level
367 && compaction_input.target_level > 0
368 }
369
370 fn is_l0_to_l0(compaction_input: &CompactionInput) -> bool {
371 compaction_input
372 .input_levels
373 .iter()
374 .all(|i| i.level_idx == 0)
375 && compaction_input.target_level == 0
376 }
377
378 #[test]
379 fn test_manual_compaction_picker() {
380 let levels = vec![
381 Level {
382 level_idx: 1,
383 level_type: LevelType::Nonoverlapping,
384 table_infos: vec![
385 generate_table(0, 1, 0, 100, 1),
386 generate_table(1, 1, 101, 200, 1),
387 generate_table(2, 1, 222, 300, 1),
388 ],
389 ..Default::default()
390 },
391 Level {
392 level_idx: 2,
393 level_type: LevelType::Nonoverlapping,
394 table_infos: vec![
395 generate_table(4, 1, 0, 100, 1),
396 generate_table(5, 1, 101, 150, 1),
397 generate_table(6, 1, 151, 201, 1),
398 generate_table(7, 1, 501, 800, 1),
399 generate_table(8, 2, 301, 400, 1),
400 ],
401 ..Default::default()
402 },
403 ];
404 let mut levels = Levels {
405 levels,
406 l0: generate_l0_nonoverlapping_sublevels(vec![]),
407 ..Default::default()
408 };
409 let mut levels_handler = vec![
410 LevelHandler::new(0),
411 LevelHandler::new(1),
412 LevelHandler::new(2),
413 ];
414 let mut local_stats = LocalPickerStatistic::default();
415
416 {
417 let option = ManualCompactionOption {
419 level: 1,
420 key_range: KeyRange {
421 left: Bytes::from(iterator_test_key_of_epoch(1, 0, 1)),
422 right: Bytes::from(iterator_test_key_of_epoch(1, 201, 1)),
423 right_exclusive: false,
424 },
425 ..Default::default()
426 };
427
428 let target_level = option.level + 1;
429 let mut picker = ManualCompactionPicker::new(
430 Arc::new(RangeOverlapStrategy::default()),
431 option,
432 target_level,
433 );
434 let result = picker
435 .pick_compaction(&levels, &levels_handler, &mut local_stats)
436 .unwrap();
437 result.add_pending_task(0, &mut levels_handler);
438
439 assert_eq!(2, result.input_levels[0].table_infos.len());
440 assert_eq!(3, result.input_levels[1].table_infos.len());
441 }
442
443 {
444 clean_task_state(&mut levels_handler[1]);
445 clean_task_state(&mut levels_handler[2]);
446
447 let option = ManualCompactionOption::default();
449 let target_level = option.level + 1;
450 let mut picker = ManualCompactionPicker::new(
451 Arc::new(RangeOverlapStrategy::default()),
452 option,
453 target_level,
454 );
455 let result = picker
456 .pick_compaction(&levels, &levels_handler, &mut local_stats)
457 .unwrap();
458 result.add_pending_task(0, &mut levels_handler);
459
460 assert_eq!(3, result.input_levels[0].table_infos.len());
461 assert_eq!(3, result.input_levels[1].table_infos.len());
462 }
463
464 {
465 clean_task_state(&mut levels_handler[1]);
466 clean_task_state(&mut levels_handler[2]);
467
468 let level_table_info = &mut levels.levels[0].table_infos;
469 let table_info_1 = &mut level_table_info[1];
470 let mut t_inner = table_info_1.get_inner();
471 t_inner.table_ids.resize(2, 0.into());
472 t_inner.table_ids[0] = 1.into();
473 t_inner.table_ids[1] = 2.into();
474 *table_info_1 = t_inner.into();
475
476 let option = ManualCompactionOption {
478 level: 1,
479 internal_table_id: HashSet::from([2.into()]),
480 ..Default::default()
481 };
482
483 let target_level = option.level + 1;
484 let mut picker = ManualCompactionPicker::new(
485 Arc::new(RangeOverlapStrategy::default()),
486 option,
487 target_level,
488 );
489
490 let result = picker
491 .pick_compaction(&levels, &levels_handler, &mut local_stats)
492 .unwrap();
493 result.add_pending_task(0, &mut levels_handler);
494
495 assert_eq!(1, result.input_levels[0].table_infos.len());
496 assert_eq!(2, result.input_levels[1].table_infos.len());
497 }
498
499 {
500 clean_task_state(&mut levels_handler[1]);
501 clean_task_state(&mut levels_handler[2]);
502
503 let level_table_info = &mut levels.levels[0].table_infos;
505 for table_info in level_table_info {
506 let mut t_inner = table_info.get_inner();
507 t_inner.table_ids.resize(2, 0.into());
508 t_inner.table_ids[0] = 1.into();
509 t_inner.table_ids[1] = 2.into();
510 *table_info = t_inner.into();
511 }
512
513 let option = ManualCompactionOption {
515 sst_ids: vec![],
516 level: 1,
517 key_range: KeyRange {
518 left: Bytes::from(iterator_test_key_of_epoch(1, 101, 1)),
519 right: Bytes::from(iterator_test_key_of_epoch(1, 199, 1)),
520 right_exclusive: false,
521 },
522 internal_table_id: HashSet::from([2.into()]),
523 exclusive: false,
524 };
525
526 let target_level = option.level + 1;
527 let mut picker = ManualCompactionPicker::new(
528 Arc::new(RangeOverlapStrategy::default()),
529 option,
530 target_level,
531 );
532
533 let result = picker
534 .pick_compaction(&levels, &levels_handler, &mut local_stats)
535 .unwrap();
536
537 assert_eq!(1, result.input_levels[0].table_infos.len());
538 assert_eq!(2, result.input_levels[1].table_infos.len());
539 }
540 }
541
542 #[test]
543 fn test_manual_compaction_exclusive_blocked_by_pending() {
544 let (levels, mut levels_handler) = generate_test_levels();
545 let option = ManualCompactionOption {
546 exclusive: true,
547 ..Default::default()
548 };
549 let target_level = option.level + 1;
550 let mut picker = ManualCompactionPicker::new(
551 Arc::new(RangeOverlapStrategy::default()),
552 option,
553 target_level,
554 );
555
556 let pending_sst_id = levels.levels[0].table_infos[0].sst_id;
557 levels_handler[1].test_add_pending_sst(pending_sst_id, 1);
558
559 assert!(
560 picker
561 .pick_compaction(
562 &levels,
563 &levels_handler,
564 &mut LocalPickerStatistic::default()
565 )
566 .is_none()
567 );
568 }
569
570 fn generate_test_levels() -> (Levels, Vec<LevelHandler>) {
571 let mut l0 = generate_l0_overlapping_sublevels(vec![
572 vec![
573 generate_table(5, 1, 0, 500, 2),
574 generate_table(6, 2, 600, 1000, 2),
575 ],
576 vec![
577 generate_table(7, 1, 0, 500, 3),
578 generate_table(8, 2, 600, 1000, 3),
579 ],
580 vec![
581 generate_table(9, 1, 300, 500, 4),
582 generate_table(10, 2, 600, 1000, 4),
583 ],
584 ]);
585 l0.sub_levels[1].level_type = LevelType::Nonoverlapping as _;
587 assert_eq!(l0.sub_levels.len(), 3);
588 let mut levels = vec![
589 Level {
590 level_idx: 1,
591 level_type: LevelType::Nonoverlapping,
592 table_infos: vec![
593 generate_table(3, 1, 0, 100, 1),
594 generate_table(4, 2, 2000, 3000, 1),
595 ],
596 ..Default::default()
597 },
598 Level {
599 level_idx: 2,
600 level_type: LevelType::Nonoverlapping,
601 table_infos: vec![
602 generate_table(1, 1, 0, 100, 1),
603 generate_table(2, 2, 2000, 3000, 1),
604 ],
605 ..Default::default()
606 },
607 ];
608 assert_eq!(levels.len(), 2);
610 for iter in [l0.sub_levels.iter_mut(), levels.iter_mut()] {
611 for (idx, l) in iter.enumerate() {
612 for t in &mut l.table_infos {
613 let mut t_inner = t.get_inner();
614 t_inner.table_ids.clear();
615 if idx == 0 {
616 t_inner
617 .table_ids
618 .push((((t.sst_id.as_raw_id() % 2) + 1) as u32).into());
619 } else {
620 t_inner.table_ids.push(3.into());
621 }
622 *t = t_inner.into();
623 }
624 }
625 }
626 let levels = Levels {
627 levels,
628 l0,
629 ..Default::default()
630 };
631
632 let levels_handler = vec![
633 LevelHandler::new(0),
634 LevelHandler::new(1),
635 LevelHandler::new(2),
636 ];
637 (levels, levels_handler)
638 }
639
640 fn generate_intra_test_levels() -> (Levels, Vec<LevelHandler>) {
641 let l0 = generate_l0_overlapping_sublevels(vec![]);
642 let levels = vec![Level {
643 level_idx: 1,
644 level_type: LevelType::Nonoverlapping,
645 table_infos: vec![
646 generate_table(1, 1, 0, 100, 1),
647 generate_table(2, 2, 100, 200, 1),
648 generate_table(3, 2, 200, 300, 1),
649 generate_table(4, 2, 300, 400, 1),
650 ],
651 ..Default::default()
652 }];
653 let levels = Levels {
654 levels,
655 l0,
656 ..Default::default()
657 };
658
659 let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
660 (levels, levels_handler)
661 }
662
663 #[test]
664 fn test_l0_empty() {
665 let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
666 let levels = vec![Level {
667 level_idx: 1,
668 level_type: LevelType::Nonoverlapping,
669 table_infos: vec![],
670 total_file_size: 0,
671 sub_level_id: 0,
672 uncompressed_file_size: 0,
673 ..Default::default()
674 }];
675 let levels = Levels {
676 levels,
677 l0,
678 ..Default::default()
679 };
680 let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
681 let option = ManualCompactionOption {
682 sst_ids: vec![1.into()],
683 level: 0,
684 key_range: KeyRange {
685 left: Bytes::default(),
686 right: Bytes::default(),
687 right_exclusive: false,
688 },
689 internal_table_id: HashSet::default(),
690 exclusive: false,
691 };
692 let mut picker =
693 ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 0);
694 assert!(
695 picker
696 .pick_compaction(
697 &levels,
698 &levels_handler,
699 &mut LocalPickerStatistic::default()
700 )
701 .is_none()
702 );
703 }
704
705 #[test]
706 fn test_l0_basic() {
707 let (levels, levels_handler) = generate_test_levels();
708
709 let option = ManualCompactionOption {
711 sst_ids: vec![],
712 level: 0,
713 key_range: KeyRange {
714 left: Bytes::default(),
715 right: Bytes::default(),
716 right_exclusive: false,
717 },
718 internal_table_id: HashSet::default(),
719 exclusive: false,
720 };
721 let mut picker = ManualCompactionPicker::new(
722 Arc::new(RangeOverlapStrategy::default()),
723 option.clone(),
724 0,
725 );
726 let mut local_stats = LocalPickerStatistic::default();
727 assert!(
728 picker
729 .pick_compaction(&levels, &levels_handler, &mut local_stats)
730 .is_none()
731 );
732
733 let mut picker =
735 ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 1);
736 let mut expected = [vec![5, 6], vec![7, 8], vec![9, 10]];
737 expected.reverse();
738 let result = picker
739 .pick_compaction(&levels, &levels_handler, &mut local_stats)
740 .unwrap();
741 assert_eq!(result.input_levels.len(), 4);
742 assert!(is_l0_to_lbase(&result));
743 assert_eq!(result.target_level, 1);
744 for (l, e) in expected.iter().enumerate().take(3) {
745 assert_eq!(
746 result.input_levels[l]
747 .table_infos
748 .iter()
749 .map(|s| s.sst_id)
750 .collect_vec(),
751 *e
752 );
753 }
754 assert_eq!(
755 result.input_levels[3].table_infos,
756 vec![levels.levels[0].table_infos[0].clone()]
757 );
758
759 let option = ManualCompactionOption {
761 sst_ids: vec![],
762 level: 0,
763 key_range: KeyRange {
764 left: Bytes::from(iterator_test_key_of_epoch(1, 0, 2)),
765 right: Bytes::from(iterator_test_key_of_epoch(1, 200, 2)),
766 right_exclusive: false,
767 },
768 internal_table_id: HashSet::default(),
769 exclusive: false,
770 };
771 let mut picker =
772 ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 1);
773 let mut expected = [vec![5, 6], vec![7, 8]];
774 expected.reverse();
775 let result = picker
776 .pick_compaction(&levels, &levels_handler, &mut local_stats)
777 .unwrap();
778 assert_eq!(result.input_levels.len(), 3);
779 assert!(is_l0_to_lbase(&result));
780 assert_eq!(result.target_level, 1);
781 for (l, e) in expected.iter().enumerate().take(2) {
782 assert_eq!(
783 result.input_levels[l]
784 .table_infos
785 .iter()
786 .map(|s| s.sst_id)
787 .collect_vec(),
788 *e
789 );
790 }
791 assert_eq!(
792 result.input_levels[2].table_infos,
793 vec![levels.levels[0].table_infos[0].clone()]
794 );
795 }
796
797 #[test]
798 fn test_l0_to_l0_option_sst_ids() {
799 let (levels, levels_handler) = generate_test_levels();
800 let sst_id_filters = vec![
802 (0, vec![6], vec![vec![5, 6]]),
803 (0, vec![7], vec![vec![7, 8]]),
804 (0, vec![9], vec![vec![9, 10]]),
805 (0, vec![6, 9], vec![vec![5, 6], vec![7, 8], vec![9, 10]]),
806 (0, vec![8, 9], vec![vec![7, 8], vec![9, 10]]),
807 (0, vec![6, 8, 9], vec![vec![5, 6], vec![7, 8], vec![9, 10]]),
808 ];
809 let mut local_stats = LocalPickerStatistic::default();
810 for (input_level, sst_id_filter, expected) in &sst_id_filters {
811 let expected = expected.iter().rev().cloned().collect_vec();
812 let option = ManualCompactionOption {
813 sst_ids: sst_id_filter.iter().cloned().map(Into::into).collect(),
814 level: *input_level as _,
815 key_range: KeyRange {
816 left: Bytes::default(),
817 right: Bytes::default(),
818 right_exclusive: false,
819 },
820 internal_table_id: HashSet::default(),
821 exclusive: false,
822 };
823 let mut picker = ManualCompactionPicker::new(
824 Arc::new(RangeOverlapStrategy::default()),
825 option.clone(),
826 input_level + 1,
828 );
829 let result = picker
830 .pick_compaction(&levels, &levels_handler, &mut local_stats)
831 .unwrap();
832 assert!(is_l0_to_l0(&result));
833 assert_eq!(result.input_levels.len(), expected.len());
834 for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
835 assert_eq!(
836 result.input_levels[i]
837 .table_infos
838 .iter()
839 .map(|s| s.sst_id)
840 .collect_vec(),
841 *e
842 );
843 }
844 }
845 }
846
847 #[test]
848 fn test_l0_to_lbase_option_internal_table() {
849 let (levels, mut levels_handler) = generate_test_levels();
850 let input_level = 0;
851 let target_level = input_level + 1;
852 let mut local_stats = LocalPickerStatistic::default();
853 {
854 let option = ManualCompactionOption {
855 sst_ids: vec![],
856 level: input_level,
857 key_range: KeyRange {
858 left: Bytes::default(),
859 right: Bytes::default(),
860 right_exclusive: false,
861 },
862 internal_table_id: HashSet::from([100.into()]),
864 exclusive: false,
865 };
866 let mut picker = ManualCompactionPicker::new(
867 Arc::new(RangeOverlapStrategy::default()),
868 option,
869 target_level,
870 );
871 assert!(
872 picker
873 .pick_compaction(&levels, &levels_handler, &mut local_stats)
874 .is_none()
875 )
876 }
877
878 {
879 let option = ManualCompactionOption {
880 sst_ids: vec![],
881 level: input_level,
882 key_range: KeyRange {
883 left: Bytes::default(),
884 right: Bytes::default(),
885 right_exclusive: false,
886 },
887 internal_table_id: HashSet::from([1.into(), 2.into(), 3.into()]),
889 exclusive: false,
890 };
891 let mut picker = ManualCompactionPicker::new(
892 Arc::new(RangeOverlapStrategy::default()),
893 option,
894 target_level,
895 );
896 let result = picker
897 .pick_compaction(&levels, &levels_handler, &mut local_stats)
898 .unwrap();
899 assert_eq!(result.input_levels.len(), 4);
900 assert!(is_l0_to_lbase(&result));
901 assert_eq!(result.target_level, 1);
902 assert!(is_l0_to_lbase(&result));
903 assert_eq!(
904 result
905 .input_levels
906 .iter()
907 .take(3)
908 .flat_map(|s| s.table_infos.clone())
909 .map(|s| s.sst_id)
910 .collect_vec(),
911 vec![9, 10, 7, 8, 5, 6]
912 );
913 assert_eq!(
914 result.input_levels[3]
915 .table_infos
916 .iter()
917 .map(|s| s.sst_id)
918 .collect_vec(),
919 vec![3]
920 );
921 }
922
923 {
924 let option = ManualCompactionOption {
925 sst_ids: vec![],
926 level: input_level,
927 key_range: KeyRange {
928 left: Bytes::default(),
929 right: Bytes::default(),
930 right_exclusive: false,
931 },
932 internal_table_id: HashSet::from([3.into()]),
934 exclusive: false,
935 };
936 let mut picker = ManualCompactionPicker::new(
937 Arc::new(RangeOverlapStrategy::default()),
938 option,
939 target_level,
940 );
941 let result = picker
942 .pick_compaction(&levels, &levels_handler, &mut local_stats)
943 .unwrap();
944 assert_eq!(result.input_levels.len(), 4);
945 assert!(is_l0_to_lbase(&result));
946 assert_eq!(
947 result
948 .input_levels
949 .iter()
950 .take(3)
951 .flat_map(|s| s.table_infos.clone())
952 .map(|s| s.sst_id)
953 .collect_vec(),
954 vec![9, 10, 7, 8, 5, 6]
955 );
956 assert_eq!(
957 result.input_levels[3]
958 .table_infos
959 .iter()
960 .map(|s| s.sst_id)
961 .collect_vec(),
962 vec![3]
963 );
964 assert_eq!(result.target_level, 1);
965 }
966
967 {
968 let option = ManualCompactionOption {
969 sst_ids: vec![],
970 level: input_level,
971 key_range: KeyRange {
972 left: Bytes::default(),
973 right: Bytes::default(),
974 right_exclusive: false,
975 },
976 internal_table_id: HashSet::from([1.into()]),
979 exclusive: false,
980 };
981 let mut picker = ManualCompactionPicker::new(
982 Arc::new(RangeOverlapStrategy::default()),
983 option,
984 target_level,
985 );
986 let result = picker
987 .pick_compaction(&levels, &levels_handler, &mut local_stats)
988 .unwrap();
989 result.add_pending_task(0, &mut levels_handler);
990 assert_eq!(result.input_levels.len(), 2);
991 assert!(is_l0_to_lbase(&result));
992 assert_eq!(result.target_level, 1);
993 assert_eq!(
994 result
995 .input_levels
996 .iter()
997 .take(1)
998 .flat_map(|s| s.table_infos.clone())
999 .map(|s| s.sst_id)
1000 .collect_vec(),
1001 vec![5, 6]
1002 );
1003 assert_eq!(
1004 result.input_levels[1]
1005 .table_infos
1006 .iter()
1007 .map(|s| s.sst_id)
1008 .collect_vec(),
1009 vec![3]
1010 );
1011
1012 let option = ManualCompactionOption {
1014 sst_ids: vec![],
1015 level: input_level,
1016 key_range: KeyRange {
1017 left: Bytes::default(),
1018 right: Bytes::default(),
1019 right_exclusive: false,
1020 },
1021 internal_table_id: HashSet::from([3.into()]),
1023 exclusive: false,
1024 };
1025 let mut picker = ManualCompactionPicker::new(
1026 Arc::new(RangeOverlapStrategy::default()),
1027 option,
1028 target_level,
1029 );
1030 assert!(
1032 picker
1033 .pick_compaction(&levels, &levels_handler, &mut local_stats)
1034 .is_none()
1035 );
1036
1037 clean_task_state(&mut levels_handler[0]);
1038 clean_task_state(&mut levels_handler[1]);
1039 }
1040 }
1041
1042 #[test]
1043 fn test_ln_to_lnext_option_internal_table() {
1044 let (levels, levels_handler) = generate_test_levels();
1045 let input_level = 1;
1046 let target_level = input_level + 1;
1047 let mut local_stats = LocalPickerStatistic::default();
1048 {
1049 let option = ManualCompactionOption {
1050 sst_ids: vec![],
1051 level: input_level,
1052 key_range: KeyRange {
1053 left: Bytes::default(),
1054 right: Bytes::default(),
1055 right_exclusive: false,
1056 },
1057 internal_table_id: HashSet::from([100.into()]),
1059 exclusive: false,
1060 };
1061 let mut picker = ManualCompactionPicker::new(
1062 Arc::new(RangeOverlapStrategy::default()),
1063 option,
1064 target_level,
1065 );
1066 assert!(
1067 picker
1068 .pick_compaction(&levels, &levels_handler, &mut local_stats)
1069 .is_none()
1070 )
1071 }
1072
1073 {
1074 let expected_input_level_sst_ids = [vec![4], vec![2]];
1075 let option = ManualCompactionOption {
1076 sst_ids: vec![],
1077 level: input_level,
1078 key_range: KeyRange {
1079 left: Bytes::default(),
1080 right: Bytes::default(),
1081 right_exclusive: false,
1082 },
1083 internal_table_id: HashSet::from([1.into()]),
1085 exclusive: false,
1086 };
1087 let mut picker = ManualCompactionPicker::new(
1088 Arc::new(RangeOverlapStrategy::default()),
1089 option,
1090 target_level,
1091 );
1092 let result = picker
1093 .pick_compaction(&levels, &levels_handler, &mut local_stats)
1094 .unwrap();
1095 assert_eq!(
1096 result.input_levels.len(),
1097 expected_input_level_sst_ids.len()
1098 );
1099 assert_eq!(result.target_level, target_level);
1100 for (l, e) in expected_input_level_sst_ids
1101 .iter()
1102 .enumerate()
1103 .take(result.input_levels.len())
1104 {
1105 assert_eq!(
1106 result.input_levels[l]
1107 .table_infos
1108 .iter()
1109 .map(|s| s.sst_id)
1110 .collect_vec(),
1111 *e
1112 );
1113 }
1114 }
1115 }
1116
1117 #[test]
1118 fn test_ln_to_lnext_option_sst_ids() {
1119 let (levels, levels_handler) = generate_test_levels();
1120 let sst_id_filters = vec![
1122 (1, vec![3], vec![vec![3], vec![1]]),
1123 (1, vec![4], vec![vec![4], vec![2]]),
1124 (1, vec![3, 4], vec![vec![3, 4], vec![1, 2]]),
1125 ];
1126 let mut local_stats = LocalPickerStatistic::default();
1127 for (input_level, sst_id_filter, expected) in &sst_id_filters {
1128 let option = ManualCompactionOption {
1129 sst_ids: sst_id_filter.iter().cloned().map(Into::into).collect(),
1130 level: *input_level as _,
1131 key_range: KeyRange {
1132 left: Bytes::default(),
1133 right: Bytes::default(),
1134 right_exclusive: false,
1135 },
1136 internal_table_id: HashSet::default(),
1137 exclusive: false,
1138 };
1139 let mut picker = ManualCompactionPicker::new(
1140 Arc::new(RangeOverlapStrategy::default()),
1141 option.clone(),
1142 input_level + 1,
1143 );
1144 let result = picker
1145 .pick_compaction(&levels, &levels_handler, &mut local_stats)
1146 .unwrap();
1147 assert_eq!(result.input_levels.len(), expected.len());
1148 for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
1149 assert_eq!(
1150 result.input_levels[i]
1151 .table_infos
1152 .iter()
1153 .map(|s| s.sst_id)
1154 .collect_vec(),
1155 *e
1156 );
1157 }
1158 }
1159 }
1160
1161 #[test]
1162 fn test_ln_to_ln() {
1163 let (levels, levels_handler) = generate_intra_test_levels();
1164 let sst_id_filters = vec![
1166 (1, vec![1], vec![vec![1], vec![]]),
1167 (1, vec![3], vec![vec![3], vec![]]),
1168 (1, vec![4], vec![vec![4], vec![]]),
1169 (1, vec![3, 4], vec![vec![3, 4], vec![]]),
1170 (1, vec![1, 4], vec![vec![1, 2, 3, 4], vec![]]),
1171 (1, vec![2, 4], vec![vec![2, 3, 4], vec![]]),
1172 (1, vec![1, 3], vec![vec![1, 2, 3], vec![]]),
1173 ];
1174 for (input_level, sst_id_filter, expected) in &sst_id_filters {
1175 let option = ManualCompactionOption {
1176 sst_ids: sst_id_filter.iter().cloned().map(Into::into).collect(),
1177 level: *input_level as _,
1178 key_range: KeyRange {
1179 left: Bytes::default(),
1180 right: Bytes::default(),
1181 right_exclusive: false,
1182 },
1183 internal_table_id: HashSet::default(),
1184 exclusive: false,
1185 };
1186 let mut picker = ManualCompactionPicker::new(
1187 Arc::new(RangeOverlapStrategy::default()),
1188 option.clone(),
1189 *input_level as _,
1190 );
1191 let result = picker
1192 .pick_compaction(
1193 &levels,
1194 &levels_handler,
1195 &mut LocalPickerStatistic::default(),
1196 )
1197 .unwrap();
1198 assert_eq!(result.input_levels.len(), expected.len());
1199 for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
1200 assert_eq!(
1201 result.input_levels[i]
1202 .table_infos
1203 .iter()
1204 .map(|s| s.sst_id)
1205 .collect_vec(),
1206 *e
1207 );
1208 }
1209 }
1210 }
1211
1212 #[test]
1213 fn test_manual_compaction_selector_l0() {
1214 let config = CompactionConfigBuilder::new().max_level(4).build();
1215 let group_config = CompactionGroup::new(1, config);
1216 let l0 = generate_l0_nonoverlapping_sublevels(vec![
1217 generate_table(0, 1, 0, 500, 1),
1218 generate_table(1, 1, 0, 500, 1),
1219 ]);
1220 assert_eq!(l0.sub_levels.len(), 2);
1221 let levels = vec![
1222 generate_level(1, vec![]),
1223 generate_level(2, vec![]),
1224 generate_level(3, vec![]),
1225 Level {
1226 level_idx: 4,
1227 level_type: LevelType::Nonoverlapping,
1228 table_infos: vec![
1229 generate_table(2, 1, 0, 100, 1),
1230 generate_table(3, 1, 101, 200, 1),
1231 generate_table(4, 1, 222, 300, 1),
1232 ],
1233 ..Default::default()
1234 },
1235 ];
1236 assert_eq!(levels.len(), 4);
1237 let levels = Levels {
1238 levels,
1239 l0,
1240 ..Default::default()
1241 };
1242 let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
1243 let mut local_stats = LocalSelectorStatistic::default();
1244
1245 {
1247 let option = ManualCompactionOption {
1248 sst_ids: [0, 1].iter().cloned().map(Into::into).collect(),
1249 key_range: KeyRange {
1250 left: Bytes::default(),
1251 right: Bytes::default(),
1252 right_exclusive: false,
1253 },
1254 internal_table_id: HashSet::default(),
1255 level: 0,
1256 exclusive: false,
1257 };
1258 let mut selector = ManualCompactionSelector::new(option);
1259 let task = selector
1260 .pick_compaction(
1261 1,
1262 compaction_selector_context(
1263 &group_config,
1264 &levels,
1265 &BTreeSet::new(),
1266 &mut levels_handler,
1267 &mut local_stats,
1268 &HashMap::default(),
1269 Arc::new(CompactionDeveloperConfig::default()),
1270 &Default::default(),
1271 &HummockVersionStateTableInfo::empty(),
1272 ),
1273 )
1274 .unwrap();
1275 assert_compaction_task(&task, &levels_handler);
1276 assert_eq!(task.input.input_levels.len(), 2);
1277 assert_eq!(task.input.input_levels[0].level_idx, 0);
1278 assert_eq!(task.input.input_levels[1].level_idx, 0);
1279 assert_eq!(task.input.target_level, 0);
1280 }
1281
1282 for level_handler in &mut levels_handler {
1283 for pending_task_id in &level_handler.pending_tasks_ids() {
1284 level_handler.remove_task(*pending_task_id);
1285 }
1286 }
1287
1288 {
1290 let option = ManualCompactionOption {
1291 sst_ids: vec![],
1292 key_range: KeyRange {
1293 left: Bytes::default(),
1294 right: Bytes::default(),
1295 right_exclusive: false,
1296 },
1297 internal_table_id: HashSet::default(),
1298 level: 0,
1299 exclusive: false,
1300 };
1301 let mut selector = ManualCompactionSelector::new(option);
1302 let task = selector
1303 .pick_compaction(
1304 2,
1305 compaction_selector_context(
1306 &group_config,
1307 &levels,
1308 &BTreeSet::new(),
1309 &mut levels_handler,
1310 &mut local_stats,
1311 &HashMap::default(),
1312 Arc::new(CompactionDeveloperConfig::default()),
1313 &Default::default(),
1314 &HummockVersionStateTableInfo::empty(),
1315 ),
1316 )
1317 .unwrap();
1318 assert_compaction_task(&task, &levels_handler);
1319 assert_eq!(task.input.input_levels.len(), 3);
1320 assert_eq!(task.input.input_levels[0].level_idx, 0);
1321 assert_eq!(task.input.input_levels[1].level_idx, 0);
1322 assert_eq!(task.input.input_levels[2].level_idx, 4);
1323 assert_eq!(task.input.target_level, 4);
1324 }
1325 }
1326
1327 #[test]
1329 fn test_manual_compaction_selector() {
1330 let config = CompactionConfigBuilder::new().max_level(4).build();
1331 let group_config = CompactionGroup::new(1, config);
1332 let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
1333 assert_eq!(l0.sub_levels.len(), 0);
1334 let levels = vec![
1335 generate_level(1, vec![]),
1336 generate_level(2, vec![]),
1337 generate_level(
1338 3,
1339 vec![
1340 generate_table(0, 1, 150, 151, 1),
1341 generate_table(1, 1, 250, 251, 1),
1342 ],
1343 ),
1344 Level {
1345 level_idx: 4,
1346 level_type: LevelType::Nonoverlapping,
1347 table_infos: vec![
1348 generate_table(2, 1, 0, 100, 1),
1349 generate_table(3, 1, 101, 200, 1),
1350 generate_table(4, 1, 222, 300, 1),
1351 generate_table(5, 1, 333, 400, 1),
1352 generate_table(6, 1, 444, 500, 1),
1353 generate_table(7, 1, 555, 600, 1),
1354 ],
1355 ..Default::default()
1356 },
1357 ];
1358 assert_eq!(levels.len(), 4);
1359 let levels = Levels {
1360 levels,
1361 l0,
1362 ..Default::default()
1363 };
1364 let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
1365 let mut local_stats = LocalSelectorStatistic::default();
1366
1367 {
1369 let option = ManualCompactionOption {
1370 sst_ids: [0, 1].iter().cloned().map(Into::into).collect(),
1371 key_range: KeyRange {
1372 left: Bytes::default(),
1373 right: Bytes::default(),
1374 right_exclusive: false,
1375 },
1376 internal_table_id: HashSet::default(),
1377 level: 3,
1378 exclusive: false,
1379 };
1380 let mut selector = ManualCompactionSelector::new(option);
1381 let task = selector
1382 .pick_compaction(
1383 1,
1384 compaction_selector_context(
1385 &group_config,
1386 &levels,
1387 &BTreeSet::new(),
1388 &mut levels_handler,
1389 &mut local_stats,
1390 &HashMap::default(),
1391 Arc::new(CompactionDeveloperConfig::default()),
1392 &Default::default(),
1393 &HummockVersionStateTableInfo::empty(),
1394 ),
1395 )
1396 .unwrap();
1397 assert_compaction_task(&task, &levels_handler);
1398 assert_eq!(task.input.input_levels.len(), 2);
1399 assert_eq!(task.input.input_levels[0].level_idx, 3);
1400 assert_eq!(task.input.input_levels[0].table_infos.len(), 2);
1401 assert_eq!(task.input.input_levels[1].level_idx, 4);
1402 assert_eq!(task.input.input_levels[1].table_infos.len(), 2);
1403 assert_eq!(task.input.target_level, 4);
1404 }
1405
1406 for level_handler in &mut levels_handler {
1407 for pending_task_id in &level_handler.pending_tasks_ids() {
1408 level_handler.remove_task(*pending_task_id);
1409 }
1410 }
1411
1412 {
1414 let option = ManualCompactionOption {
1415 sst_ids: vec![],
1416 key_range: KeyRange {
1417 left: Bytes::default(),
1418 right: Bytes::default(),
1419 right_exclusive: false,
1420 },
1421 internal_table_id: HashSet::default(),
1422 level: 4,
1423 exclusive: false,
1424 };
1425 let mut selector = ManualCompactionSelector::new(option);
1426 let task = selector
1427 .pick_compaction(
1428 1,
1429 compaction_selector_context(
1430 &group_config,
1431 &levels,
1432 &BTreeSet::new(),
1433 &mut levels_handler,
1434 &mut local_stats,
1435 &HashMap::default(),
1436 Arc::new(CompactionDeveloperConfig::default()),
1437 &Default::default(),
1438 &HummockVersionStateTableInfo::empty(),
1439 ),
1440 )
1441 .unwrap();
1442 assert_compaction_task(&task, &levels_handler);
1443 assert_eq!(task.input.input_levels.len(), 2);
1444 assert_eq!(task.input.input_levels[0].level_idx, 4);
1445 assert_eq!(task.input.input_levels[0].table_infos.len(), 6);
1446 assert_eq!(task.input.input_levels[1].level_idx, 4);
1447 assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
1448 assert_eq!(task.input.target_level, 4);
1449 assert!(matches!(
1450 task.compaction_task_type,
1451 compact_task::TaskType::Manual
1452 ));
1453 }
1454 }
1455}