1use std::collections::HashSet;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_hummock_sdk::HummockSstableId;
20use risingwave_hummock_sdk::level::{InputLevel, Level, Levels, OverlappingLevel};
21use risingwave_hummock_sdk::sstable_info::SstableInfo;
22use risingwave_pb::hummock::LevelType;
23
24use super::{CompactionInput, CompactionPicker, LocalPickerStatistic};
25use crate::hummock::compaction::overlap_strategy::{
26 OverlapInfo, OverlapStrategy, RangeOverlapInfo,
27};
28use crate::hummock::compaction::selector::ManualCompactionOption;
29use crate::hummock::level_handler::LevelHandler;
30
31pub struct ManualCompactionPicker {
32 overlap_strategy: Arc<dyn OverlapStrategy>,
33 option: ManualCompactionOption,
34 target_level: usize,
35}
36
37impl ManualCompactionPicker {
38 pub fn new(
39 overlap_strategy: Arc<dyn OverlapStrategy>,
40 option: ManualCompactionOption,
41 target_level: usize,
42 ) -> Self {
43 Self {
44 overlap_strategy,
45 option,
46 target_level,
47 }
48 }
49
50 fn pick_l0_to_sub_level(
51 &self,
52 l0: &OverlappingLevel,
53 level_handlers: &[LevelHandler],
54 ) -> Option<CompactionInput> {
55 assert_eq!(self.option.level, 0);
56 let mut input_levels = vec![];
57 let mut sub_level_id = 0;
58 let mut start_idx = None;
59 let mut end_idx = None;
60 for (idx, level) in l0.sub_levels.iter().enumerate() {
63 if !self.filter_level_by_option(level) {
64 continue;
65 }
66 if level_handlers[0].is_level_pending_compact(level) {
67 return None;
68 }
69 if start_idx.is_none() {
71 sub_level_id = level.sub_level_id;
72 start_idx = Some(idx as u64);
73 end_idx = start_idx;
74 } else {
75 end_idx = Some(idx as u64);
76 }
77 }
78 let (start_idx, end_idx) = match (start_idx, end_idx) {
79 (Some(start_idx), Some(end_idx)) => (start_idx, end_idx),
80 _ => {
81 return None;
82 }
83 };
84 for level in l0
86 .sub_levels
87 .iter()
88 .skip(start_idx as usize)
89 .take((end_idx - start_idx + 1) as usize)
90 {
91 input_levels.push(InputLevel {
92 level_idx: 0,
93 level_type: level.level_type,
94 table_infos: level.table_infos.clone(),
95 });
96 }
97 if input_levels.is_empty() {
98 return None;
99 }
100 input_levels.reverse();
101 Some(CompactionInput {
102 input_levels,
103 target_level: 0,
104 target_sub_level_id: sub_level_id,
105 ..Default::default()
106 })
107 }
108
109 fn pick_l0_to_base_level(
110 &self,
111 levels: &Levels,
112 level_handlers: &[LevelHandler],
113 ) -> Option<CompactionInput> {
114 assert!(self.option.level == 0 && self.target_level > 0);
115 for l in 1..self.target_level {
116 assert!(levels.levels[l - 1].table_infos.is_empty());
117 }
118 let l0 = &levels.l0;
119 let mut input_levels = vec![];
120 let mut max_sub_level_idx = usize::MAX;
121 let mut info = self.overlap_strategy.create_overlap_info();
122 for (idx, level) in l0.sub_levels.iter().enumerate() {
124 if !self.filter_level_by_option(level) {
125 continue;
126 }
127 if level_handlers[0].is_level_pending_compact(level) {
128 return None;
129 }
130
131 max_sub_level_idx = idx;
133 }
134 if max_sub_level_idx == usize::MAX {
135 return None;
136 }
137 for idx in 0..=max_sub_level_idx {
139 for table in &l0.sub_levels[idx].table_infos {
140 info.update(&table.key_range);
141 }
142 input_levels.push(InputLevel {
143 level_idx: 0,
144 level_type: l0.sub_levels[idx].level_type,
145 table_infos: l0.sub_levels[idx].table_infos.clone(),
146 })
147 }
148 let target_input_ssts_range =
149 info.check_multiple_overlap(&levels.levels[self.target_level - 1].table_infos);
150 let target_input_ssts = if target_input_ssts_range.is_empty() {
151 vec![]
152 } else {
153 levels.levels[self.target_level - 1].table_infos[target_input_ssts_range].to_vec()
154 };
155 if target_input_ssts
156 .iter()
157 .any(|table| level_handlers[self.target_level].is_pending_compact(&table.sst_id))
158 {
159 return None;
160 }
161 if input_levels.is_empty() {
162 return None;
163 }
164 input_levels.reverse();
165 input_levels.push(InputLevel {
166 level_idx: self.target_level as u32,
167 level_type: LevelType::Nonoverlapping,
168 table_infos: target_input_ssts,
169 });
170
171 Some(CompactionInput {
172 input_levels,
173 target_level: self.target_level,
174 target_sub_level_id: 0,
175 ..Default::default()
176 })
177 }
178
179 fn filter_level_by_option(&self, level: &Level) -> bool {
182 let mut hint_sst_ids: HashSet<HummockSstableId> = HashSet::new();
183 hint_sst_ids.extend(self.option.sst_ids.iter());
184 if self
185 .overlap_strategy
186 .check_overlap_with_range(&self.option.key_range, &level.table_infos)
187 .is_empty()
188 {
189 return false;
190 }
191 if !hint_sst_ids.is_empty()
192 && !level
193 .table_infos
194 .iter()
195 .any(|t| hint_sst_ids.contains(&t.sst_id))
196 {
197 return false;
198 }
199 if !self.option.internal_table_id.is_empty()
200 && !level.table_infos.iter().any(|sst_info| {
201 sst_info
202 .table_ids
203 .iter()
204 .any(|t| self.option.internal_table_id.contains(t))
205 })
206 {
207 return false;
208 }
209 true
210 }
211}
212
213impl CompactionPicker for ManualCompactionPicker {
214 fn pick_compaction(
215 &mut self,
216 levels: &Levels,
217 level_handlers: &[LevelHandler],
218 _stats: &mut LocalPickerStatistic,
219 ) -> Option<CompactionInput> {
220 if self.option.level == 0 {
221 if !self.option.sst_ids.is_empty() {
222 return self.pick_l0_to_sub_level(&levels.l0, level_handlers);
223 } else if self.target_level > 0 {
224 return self.pick_l0_to_base_level(levels, level_handlers);
225 } else {
226 return None;
227 }
228 }
229 let mut hint_sst_ids: HashSet<HummockSstableId> = HashSet::new();
230 hint_sst_ids.extend(self.option.sst_ids.iter());
231 let mut range_overlap_info = RangeOverlapInfo::default();
232 range_overlap_info.update(&self.option.key_range);
233 let level = self.option.level;
234 let target_level = self.target_level;
235 assert!(
236 self.option.level == self.target_level || self.option.level + 1 == self.target_level
237 );
238 let mut select_input_ssts: Vec<SstableInfo> = levels
240 .get_level(self.option.level)
241 .table_infos
242 .iter()
243 .filter(|sst_info| hint_sst_ids.is_empty() || hint_sst_ids.contains(&sst_info.sst_id))
244 .filter(|sst_info| range_overlap_info.check_overlap(sst_info))
245 .filter(|sst_info| {
246 if self.option.internal_table_id.is_empty() {
247 return true;
248 }
249
250 for table_id in &sst_info.table_ids {
252 if self.option.internal_table_id.contains(table_id) {
253 return true;
254 }
255 }
256 false
257 })
258 .cloned()
259 .collect();
260 if select_input_ssts.is_empty() {
261 return None;
262 }
263 let target_input_ssts = if target_level == level {
264 let (left, _) = levels
266 .get_level(level)
267 .table_infos
268 .iter()
269 .find_position(|p| p.sst_id == select_input_ssts.first().unwrap().sst_id)
270 .unwrap();
271 let (right, _) = levels
272 .get_level(level)
273 .table_infos
274 .iter()
275 .find_position(|p| p.sst_id == select_input_ssts.last().unwrap().sst_id)
276 .unwrap();
277 select_input_ssts = levels.get_level(level).table_infos[left..=right].to_vec();
278 vec![]
279 } else {
280 self.overlap_strategy.check_base_level_overlap(
281 &select_input_ssts,
282 &levels.get_level(target_level).table_infos,
283 )
284 };
285 if select_input_ssts
286 .iter()
287 .any(|table| level_handlers[level].is_pending_compact(&table.sst_id))
288 {
289 return None;
290 }
291 if target_input_ssts
292 .iter()
293 .any(|table| level_handlers[target_level].is_pending_compact(&table.sst_id))
294 {
295 return None;
296 }
297
298 Some(CompactionInput {
299 select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
300 target_input_size: target_input_ssts.iter().map(|sst| sst.sst_size).sum(),
301 total_file_count: (select_input_ssts.len() + target_input_ssts.len()) as u64,
302 input_levels: vec![
303 InputLevel {
304 level_idx: level as u32,
305 level_type: levels.levels[level - 1].level_type,
306 table_infos: select_input_ssts,
307 },
308 InputLevel {
309 level_idx: target_level as u32,
310 level_type: levels.levels[target_level - 1].level_type,
311 table_infos: target_input_ssts,
312 },
313 ],
314 target_level,
315 ..Default::default()
316 })
317 }
318}
319
320#[cfg(test)]
321pub mod tests {
322 use std::collections::{BTreeSet, HashMap};
323
324 use bytes::Bytes;
325 use risingwave_hummock_sdk::key_range::KeyRange;
326 use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
327 use risingwave_pb::hummock::compact_task;
328
329 use super::*;
330 use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
331 use crate::hummock::compaction::overlap_strategy::RangeOverlapStrategy;
332 use crate::hummock::compaction::selector::tests::{
333 assert_compaction_task, generate_l0_nonoverlapping_sublevels,
334 generate_l0_overlapping_sublevels, generate_level, generate_table,
335 };
336 use crate::hummock::compaction::selector::{CompactionSelector, ManualCompactionSelector};
337 use crate::hummock::compaction::{CompactionDeveloperConfig, LocalSelectorStatistic};
338 use crate::hummock::model::CompactionGroup;
339 use crate::hummock::test_utils::{compaction_selector_context, iterator_test_key_of_epoch};
340
341 fn clean_task_state(level_handler: &mut LevelHandler) {
342 for pending_task_id in &level_handler.pending_tasks_ids() {
343 level_handler.remove_task(*pending_task_id);
344 }
345 }
346
347 fn is_l0_to_lbase(compaction_input: &CompactionInput) -> bool {
348 compaction_input
349 .input_levels
350 .iter()
351 .take(compaction_input.input_levels.len() - 1)
352 .all(|i| i.level_idx == 0)
353 && compaction_input
354 .input_levels
355 .iter()
356 .last()
357 .unwrap()
358 .level_idx as usize
359 == compaction_input.target_level
360 && compaction_input.target_level > 0
361 }
362
363 fn is_l0_to_l0(compaction_input: &CompactionInput) -> bool {
364 compaction_input
365 .input_levels
366 .iter()
367 .all(|i| i.level_idx == 0)
368 && compaction_input.target_level == 0
369 }
370
371 #[test]
372 fn test_manual_compaction_picker() {
373 let levels = vec![
374 Level {
375 level_idx: 1,
376 level_type: LevelType::Nonoverlapping,
377 table_infos: vec![
378 generate_table(0, 1, 0, 100, 1),
379 generate_table(1, 1, 101, 200, 1),
380 generate_table(2, 1, 222, 300, 1),
381 ],
382 ..Default::default()
383 },
384 Level {
385 level_idx: 2,
386 level_type: LevelType::Nonoverlapping,
387 table_infos: vec![
388 generate_table(4, 1, 0, 100, 1),
389 generate_table(5, 1, 101, 150, 1),
390 generate_table(6, 1, 151, 201, 1),
391 generate_table(7, 1, 501, 800, 1),
392 generate_table(8, 2, 301, 400, 1),
393 ],
394 ..Default::default()
395 },
396 ];
397 let mut levels = Levels {
398 levels,
399 l0: generate_l0_nonoverlapping_sublevels(vec![]),
400 ..Default::default()
401 };
402 let mut levels_handler = vec![
403 LevelHandler::new(0),
404 LevelHandler::new(1),
405 LevelHandler::new(2),
406 ];
407 let mut local_stats = LocalPickerStatistic::default();
408
409 {
410 let option = ManualCompactionOption {
412 level: 1,
413 key_range: KeyRange {
414 left: Bytes::from(iterator_test_key_of_epoch(1, 0, 1)),
415 right: Bytes::from(iterator_test_key_of_epoch(1, 201, 1)),
416 right_exclusive: false,
417 },
418 ..Default::default()
419 };
420
421 let target_level = option.level + 1;
422 let mut picker = ManualCompactionPicker::new(
423 Arc::new(RangeOverlapStrategy::default()),
424 option,
425 target_level,
426 );
427 let result = picker
428 .pick_compaction(&levels, &levels_handler, &mut local_stats)
429 .unwrap();
430 result.add_pending_task(0, &mut levels_handler);
431
432 assert_eq!(2, result.input_levels[0].table_infos.len());
433 assert_eq!(3, result.input_levels[1].table_infos.len());
434 }
435
436 {
437 clean_task_state(&mut levels_handler[1]);
438 clean_task_state(&mut levels_handler[2]);
439
440 let option = ManualCompactionOption::default();
442 let target_level = option.level + 1;
443 let mut picker = ManualCompactionPicker::new(
444 Arc::new(RangeOverlapStrategy::default()),
445 option,
446 target_level,
447 );
448 let result = picker
449 .pick_compaction(&levels, &levels_handler, &mut local_stats)
450 .unwrap();
451 result.add_pending_task(0, &mut levels_handler);
452
453 assert_eq!(3, result.input_levels[0].table_infos.len());
454 assert_eq!(3, result.input_levels[1].table_infos.len());
455 }
456
457 {
458 clean_task_state(&mut levels_handler[1]);
459 clean_task_state(&mut levels_handler[2]);
460
461 let level_table_info = &mut levels.levels[0].table_infos;
462 let table_info_1 = &mut level_table_info[1];
463 let mut t_inner = table_info_1.get_inner();
464 t_inner.table_ids.resize(2, 0.into());
465 t_inner.table_ids[0] = 1.into();
466 t_inner.table_ids[1] = 2.into();
467 *table_info_1 = t_inner.into();
468
469 let option = ManualCompactionOption {
471 level: 1,
472 internal_table_id: HashSet::from([2.into()]),
473 ..Default::default()
474 };
475
476 let target_level = option.level + 1;
477 let mut picker = ManualCompactionPicker::new(
478 Arc::new(RangeOverlapStrategy::default()),
479 option,
480 target_level,
481 );
482
483 let result = picker
484 .pick_compaction(&levels, &levels_handler, &mut local_stats)
485 .unwrap();
486 result.add_pending_task(0, &mut levels_handler);
487
488 assert_eq!(1, result.input_levels[0].table_infos.len());
489 assert_eq!(2, result.input_levels[1].table_infos.len());
490 }
491
492 {
493 clean_task_state(&mut levels_handler[1]);
494 clean_task_state(&mut levels_handler[2]);
495
496 let level_table_info = &mut levels.levels[0].table_infos;
498 for table_info in level_table_info {
499 let mut t_inner = table_info.get_inner();
500 t_inner.table_ids.resize(2, 0.into());
501 t_inner.table_ids[0] = 1.into();
502 t_inner.table_ids[1] = 2.into();
503 *table_info = t_inner.into();
504 }
505
506 let option = ManualCompactionOption {
508 sst_ids: vec![],
509 level: 1,
510 key_range: KeyRange {
511 left: Bytes::from(iterator_test_key_of_epoch(1, 101, 1)),
512 right: Bytes::from(iterator_test_key_of_epoch(1, 199, 1)),
513 right_exclusive: false,
514 },
515 internal_table_id: HashSet::from([2.into()]),
516 };
517
518 let target_level = option.level + 1;
519 let mut picker = ManualCompactionPicker::new(
520 Arc::new(RangeOverlapStrategy::default()),
521 option,
522 target_level,
523 );
524
525 let result = picker
526 .pick_compaction(&levels, &levels_handler, &mut local_stats)
527 .unwrap();
528
529 assert_eq!(1, result.input_levels[0].table_infos.len());
530 assert_eq!(2, result.input_levels[1].table_infos.len());
531 }
532 }
533
534 fn generate_test_levels() -> (Levels, Vec<LevelHandler>) {
535 let mut l0 = generate_l0_overlapping_sublevels(vec![
536 vec![
537 generate_table(5, 1, 0, 500, 2),
538 generate_table(6, 2, 600, 1000, 2),
539 ],
540 vec![
541 generate_table(7, 1, 0, 500, 3),
542 generate_table(8, 2, 600, 1000, 3),
543 ],
544 vec![
545 generate_table(9, 1, 300, 500, 4),
546 generate_table(10, 2, 600, 1000, 4),
547 ],
548 ]);
549 l0.sub_levels[1].level_type = LevelType::Nonoverlapping as _;
551 assert_eq!(l0.sub_levels.len(), 3);
552 let mut levels = vec![
553 Level {
554 level_idx: 1,
555 level_type: LevelType::Nonoverlapping,
556 table_infos: vec![
557 generate_table(3, 1, 0, 100, 1),
558 generate_table(4, 2, 2000, 3000, 1),
559 ],
560 ..Default::default()
561 },
562 Level {
563 level_idx: 2,
564 level_type: LevelType::Nonoverlapping,
565 table_infos: vec![
566 generate_table(1, 1, 0, 100, 1),
567 generate_table(2, 2, 2000, 3000, 1),
568 ],
569 ..Default::default()
570 },
571 ];
572 assert_eq!(levels.len(), 2);
574 for iter in [l0.sub_levels.iter_mut(), levels.iter_mut()] {
575 for (idx, l) in iter.enumerate() {
576 for t in &mut l.table_infos {
577 let mut t_inner = t.get_inner();
578 t_inner.table_ids.clear();
579 if idx == 0 {
580 t_inner
581 .table_ids
582 .push((((t.sst_id.inner() % 2) + 1) as u32).into());
583 } else {
584 t_inner.table_ids.push(3.into());
585 }
586 *t = t_inner.into();
587 }
588 }
589 }
590 let levels = Levels {
591 levels,
592 l0,
593 ..Default::default()
594 };
595
596 let levels_handler = vec![
597 LevelHandler::new(0),
598 LevelHandler::new(1),
599 LevelHandler::new(2),
600 ];
601 (levels, levels_handler)
602 }
603
604 fn generate_intra_test_levels() -> (Levels, Vec<LevelHandler>) {
605 let l0 = generate_l0_overlapping_sublevels(vec![]);
606 let levels = vec![Level {
607 level_idx: 1,
608 level_type: LevelType::Nonoverlapping,
609 table_infos: vec![
610 generate_table(1, 1, 0, 100, 1),
611 generate_table(2, 2, 100, 200, 1),
612 generate_table(3, 2, 200, 300, 1),
613 generate_table(4, 2, 300, 400, 1),
614 ],
615 ..Default::default()
616 }];
617 let levels = Levels {
618 levels,
619 l0,
620 ..Default::default()
621 };
622
623 let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
624 (levels, levels_handler)
625 }
626
627 #[test]
628 fn test_l0_empty() {
629 let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
630 let levels = vec![Level {
631 level_idx: 1,
632 level_type: LevelType::Nonoverlapping,
633 table_infos: vec![],
634 total_file_size: 0,
635 sub_level_id: 0,
636 uncompressed_file_size: 0,
637 ..Default::default()
638 }];
639 let levels = Levels {
640 levels,
641 l0,
642 ..Default::default()
643 };
644 let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
645 let option = ManualCompactionOption {
646 sst_ids: vec![1.into()],
647 level: 0,
648 key_range: KeyRange {
649 left: Bytes::default(),
650 right: Bytes::default(),
651 right_exclusive: false,
652 },
653 internal_table_id: HashSet::default(),
654 };
655 let mut picker =
656 ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 0);
657 assert!(
658 picker
659 .pick_compaction(
660 &levels,
661 &levels_handler,
662 &mut LocalPickerStatistic::default()
663 )
664 .is_none()
665 );
666 }
667
668 #[test]
669 fn test_l0_basic() {
670 let (levels, levels_handler) = generate_test_levels();
671
672 let option = ManualCompactionOption {
674 sst_ids: vec![],
675 level: 0,
676 key_range: KeyRange {
677 left: Bytes::default(),
678 right: Bytes::default(),
679 right_exclusive: false,
680 },
681 internal_table_id: HashSet::default(),
682 };
683 let mut picker = ManualCompactionPicker::new(
684 Arc::new(RangeOverlapStrategy::default()),
685 option.clone(),
686 0,
687 );
688 let mut local_stats = LocalPickerStatistic::default();
689 assert!(
690 picker
691 .pick_compaction(&levels, &levels_handler, &mut local_stats)
692 .is_none()
693 );
694
695 let mut picker =
697 ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 1);
698 let mut expected = [vec![5, 6], vec![7, 8], vec![9, 10]];
699 expected.reverse();
700 let result = picker
701 .pick_compaction(&levels, &levels_handler, &mut local_stats)
702 .unwrap();
703 assert_eq!(result.input_levels.len(), 4);
704 assert!(is_l0_to_lbase(&result));
705 assert_eq!(result.target_level, 1);
706 for (l, e) in expected.iter().enumerate().take(3) {
707 assert_eq!(
708 result.input_levels[l]
709 .table_infos
710 .iter()
711 .map(|s| s.sst_id)
712 .collect_vec(),
713 *e
714 );
715 }
716 assert_eq!(
717 result.input_levels[3].table_infos,
718 vec![levels.levels[0].table_infos[0].clone()]
719 );
720
721 let option = ManualCompactionOption {
723 sst_ids: vec![],
724 level: 0,
725 key_range: KeyRange {
726 left: Bytes::from(iterator_test_key_of_epoch(1, 0, 2)),
727 right: Bytes::from(iterator_test_key_of_epoch(1, 200, 2)),
728 right_exclusive: false,
729 },
730 internal_table_id: HashSet::default(),
731 };
732 let mut picker =
733 ManualCompactionPicker::new(Arc::new(RangeOverlapStrategy::default()), option, 1);
734 let mut expected = [vec![5, 6], vec![7, 8]];
735 expected.reverse();
736 let result = picker
737 .pick_compaction(&levels, &levels_handler, &mut local_stats)
738 .unwrap();
739 assert_eq!(result.input_levels.len(), 3);
740 assert!(is_l0_to_lbase(&result));
741 assert_eq!(result.target_level, 1);
742 for (l, e) in expected.iter().enumerate().take(2) {
743 assert_eq!(
744 result.input_levels[l]
745 .table_infos
746 .iter()
747 .map(|s| s.sst_id)
748 .collect_vec(),
749 *e
750 );
751 }
752 assert_eq!(
753 result.input_levels[2].table_infos,
754 vec![levels.levels[0].table_infos[0].clone()]
755 );
756 }
757
758 #[test]
759 fn test_l0_to_l0_option_sst_ids() {
760 let (levels, levels_handler) = generate_test_levels();
761 let sst_id_filters = vec![
763 (0, vec![6], vec![vec![5, 6]]),
764 (0, vec![7], vec![vec![7, 8]]),
765 (0, vec![9], vec![vec![9, 10]]),
766 (0, vec![6, 9], vec![vec![5, 6], vec![7, 8], vec![9, 10]]),
767 (0, vec![8, 9], vec![vec![7, 8], vec![9, 10]]),
768 (0, vec![6, 8, 9], vec![vec![5, 6], vec![7, 8], vec![9, 10]]),
769 ];
770 let mut local_stats = LocalPickerStatistic::default();
771 for (input_level, sst_id_filter, expected) in &sst_id_filters {
772 let expected = expected.iter().rev().cloned().collect_vec();
773 let option = ManualCompactionOption {
774 sst_ids: sst_id_filter.iter().cloned().map(Into::into).collect(),
775 level: *input_level as _,
776 key_range: KeyRange {
777 left: Bytes::default(),
778 right: Bytes::default(),
779 right_exclusive: false,
780 },
781 internal_table_id: HashSet::default(),
782 };
783 let mut picker = ManualCompactionPicker::new(
784 Arc::new(RangeOverlapStrategy::default()),
785 option.clone(),
786 input_level + 1,
788 );
789 let result = picker
790 .pick_compaction(&levels, &levels_handler, &mut local_stats)
791 .unwrap();
792 assert!(is_l0_to_l0(&result));
793 assert_eq!(result.input_levels.len(), expected.len());
794 for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
795 assert_eq!(
796 result.input_levels[i]
797 .table_infos
798 .iter()
799 .map(|s| s.sst_id)
800 .collect_vec(),
801 *e
802 );
803 }
804 }
805 }
806
807 #[test]
808 fn test_l0_to_lbase_option_internal_table() {
809 let (levels, mut levels_handler) = generate_test_levels();
810 let input_level = 0;
811 let target_level = input_level + 1;
812 let mut local_stats = LocalPickerStatistic::default();
813 {
814 let option = ManualCompactionOption {
815 sst_ids: vec![],
816 level: input_level,
817 key_range: KeyRange {
818 left: Bytes::default(),
819 right: Bytes::default(),
820 right_exclusive: false,
821 },
822 internal_table_id: HashSet::from([100.into()]),
824 };
825 let mut picker = ManualCompactionPicker::new(
826 Arc::new(RangeOverlapStrategy::default()),
827 option,
828 target_level,
829 );
830 assert!(
831 picker
832 .pick_compaction(&levels, &levels_handler, &mut local_stats)
833 .is_none()
834 )
835 }
836
837 {
838 let option = ManualCompactionOption {
839 sst_ids: vec![],
840 level: input_level,
841 key_range: KeyRange {
842 left: Bytes::default(),
843 right: Bytes::default(),
844 right_exclusive: false,
845 },
846 internal_table_id: HashSet::from([1.into(), 2.into(), 3.into()]),
848 };
849 let mut picker = ManualCompactionPicker::new(
850 Arc::new(RangeOverlapStrategy::default()),
851 option,
852 target_level,
853 );
854 let result = picker
855 .pick_compaction(&levels, &levels_handler, &mut local_stats)
856 .unwrap();
857 assert_eq!(result.input_levels.len(), 4);
858 assert!(is_l0_to_lbase(&result));
859 assert_eq!(result.target_level, 1);
860 assert!(is_l0_to_lbase(&result));
861 assert_eq!(
862 result
863 .input_levels
864 .iter()
865 .take(3)
866 .flat_map(|s| s.table_infos.clone())
867 .map(|s| s.sst_id)
868 .collect_vec(),
869 vec![9, 10, 7, 8, 5, 6]
870 );
871 assert_eq!(
872 result.input_levels[3]
873 .table_infos
874 .iter()
875 .map(|s| s.sst_id)
876 .collect_vec(),
877 vec![3]
878 );
879 }
880
881 {
882 let option = ManualCompactionOption {
883 sst_ids: vec![],
884 level: input_level,
885 key_range: KeyRange {
886 left: Bytes::default(),
887 right: Bytes::default(),
888 right_exclusive: false,
889 },
890 internal_table_id: HashSet::from([3.into()]),
892 };
893 let mut picker = ManualCompactionPicker::new(
894 Arc::new(RangeOverlapStrategy::default()),
895 option,
896 target_level,
897 );
898 let result = picker
899 .pick_compaction(&levels, &levels_handler, &mut local_stats)
900 .unwrap();
901 assert_eq!(result.input_levels.len(), 4);
902 assert!(is_l0_to_lbase(&result));
903 assert_eq!(
904 result
905 .input_levels
906 .iter()
907 .take(3)
908 .flat_map(|s| s.table_infos.clone())
909 .map(|s| s.sst_id)
910 .collect_vec(),
911 vec![9, 10, 7, 8, 5, 6]
912 );
913 assert_eq!(
914 result.input_levels[3]
915 .table_infos
916 .iter()
917 .map(|s| s.sst_id)
918 .collect_vec(),
919 vec![3]
920 );
921 assert_eq!(result.target_level, 1);
922 }
923
924 {
925 let option = ManualCompactionOption {
926 sst_ids: vec![],
927 level: input_level,
928 key_range: KeyRange {
929 left: Bytes::default(),
930 right: Bytes::default(),
931 right_exclusive: false,
932 },
933 internal_table_id: HashSet::from([1.into()]),
936 };
937 let mut picker = ManualCompactionPicker::new(
938 Arc::new(RangeOverlapStrategy::default()),
939 option,
940 target_level,
941 );
942 let result = picker
943 .pick_compaction(&levels, &levels_handler, &mut local_stats)
944 .unwrap();
945 result.add_pending_task(0, &mut levels_handler);
946 assert_eq!(result.input_levels.len(), 2);
947 assert!(is_l0_to_lbase(&result));
948 assert_eq!(result.target_level, 1);
949 assert_eq!(
950 result
951 .input_levels
952 .iter()
953 .take(1)
954 .flat_map(|s| s.table_infos.clone())
955 .map(|s| s.sst_id)
956 .collect_vec(),
957 vec![5, 6]
958 );
959 assert_eq!(
960 result.input_levels[1]
961 .table_infos
962 .iter()
963 .map(|s| s.sst_id)
964 .collect_vec(),
965 vec![3]
966 );
967
968 let option = ManualCompactionOption {
970 sst_ids: vec![],
971 level: input_level,
972 key_range: KeyRange {
973 left: Bytes::default(),
974 right: Bytes::default(),
975 right_exclusive: false,
976 },
977 internal_table_id: HashSet::from([3.into()]),
979 };
980 let mut picker = ManualCompactionPicker::new(
981 Arc::new(RangeOverlapStrategy::default()),
982 option,
983 target_level,
984 );
985 assert!(
987 picker
988 .pick_compaction(&levels, &levels_handler, &mut local_stats)
989 .is_none()
990 );
991
992 clean_task_state(&mut levels_handler[0]);
993 clean_task_state(&mut levels_handler[1]);
994 }
995 }
996
997 #[test]
998 fn test_ln_to_lnext_option_internal_table() {
999 let (levels, levels_handler) = generate_test_levels();
1000 let input_level = 1;
1001 let target_level = input_level + 1;
1002 let mut local_stats = LocalPickerStatistic::default();
1003 {
1004 let option = ManualCompactionOption {
1005 sst_ids: vec![],
1006 level: input_level,
1007 key_range: KeyRange {
1008 left: Bytes::default(),
1009 right: Bytes::default(),
1010 right_exclusive: false,
1011 },
1012 internal_table_id: HashSet::from([100.into()]),
1014 };
1015 let mut picker = ManualCompactionPicker::new(
1016 Arc::new(RangeOverlapStrategy::default()),
1017 option,
1018 target_level,
1019 );
1020 assert!(
1021 picker
1022 .pick_compaction(&levels, &levels_handler, &mut local_stats)
1023 .is_none()
1024 )
1025 }
1026
1027 {
1028 let expected_input_level_sst_ids = [vec![4], vec![2]];
1029 let option = ManualCompactionOption {
1030 sst_ids: vec![],
1031 level: input_level,
1032 key_range: KeyRange {
1033 left: Bytes::default(),
1034 right: Bytes::default(),
1035 right_exclusive: false,
1036 },
1037 internal_table_id: HashSet::from([1.into()]),
1039 };
1040 let mut picker = ManualCompactionPicker::new(
1041 Arc::new(RangeOverlapStrategy::default()),
1042 option,
1043 target_level,
1044 );
1045 let result = picker
1046 .pick_compaction(&levels, &levels_handler, &mut local_stats)
1047 .unwrap();
1048 assert_eq!(
1049 result.input_levels.len(),
1050 expected_input_level_sst_ids.len()
1051 );
1052 assert_eq!(result.target_level, target_level);
1053 for (l, e) in expected_input_level_sst_ids
1054 .iter()
1055 .enumerate()
1056 .take(result.input_levels.len())
1057 {
1058 assert_eq!(
1059 result.input_levels[l]
1060 .table_infos
1061 .iter()
1062 .map(|s| s.sst_id)
1063 .collect_vec(),
1064 *e
1065 );
1066 }
1067 }
1068 }
1069
1070 #[test]
1071 fn test_ln_to_lnext_option_sst_ids() {
1072 let (levels, levels_handler) = generate_test_levels();
1073 let sst_id_filters = vec![
1075 (1, vec![3], vec![vec![3], vec![1]]),
1076 (1, vec![4], vec![vec![4], vec![2]]),
1077 (1, vec![3, 4], vec![vec![3, 4], vec![1, 2]]),
1078 ];
1079 let mut local_stats = LocalPickerStatistic::default();
1080 for (input_level, sst_id_filter, expected) in &sst_id_filters {
1081 let option = ManualCompactionOption {
1082 sst_ids: sst_id_filter.iter().cloned().map(Into::into).collect(),
1083 level: *input_level as _,
1084 key_range: KeyRange {
1085 left: Bytes::default(),
1086 right: Bytes::default(),
1087 right_exclusive: false,
1088 },
1089 internal_table_id: HashSet::default(),
1090 };
1091 let mut picker = ManualCompactionPicker::new(
1092 Arc::new(RangeOverlapStrategy::default()),
1093 option.clone(),
1094 input_level + 1,
1095 );
1096 let result = picker
1097 .pick_compaction(&levels, &levels_handler, &mut local_stats)
1098 .unwrap();
1099 assert_eq!(result.input_levels.len(), expected.len());
1100 for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
1101 assert_eq!(
1102 result.input_levels[i]
1103 .table_infos
1104 .iter()
1105 .map(|s| s.sst_id)
1106 .collect_vec(),
1107 *e
1108 );
1109 }
1110 }
1111 }
1112
1113 #[test]
1114 fn test_ln_to_ln() {
1115 let (levels, levels_handler) = generate_intra_test_levels();
1116 let sst_id_filters = vec![
1118 (1, vec![1], vec![vec![1], vec![]]),
1119 (1, vec![3], vec![vec![3], vec![]]),
1120 (1, vec![4], vec![vec![4], vec![]]),
1121 (1, vec![3, 4], vec![vec![3, 4], vec![]]),
1122 (1, vec![1, 4], vec![vec![1, 2, 3, 4], vec![]]),
1123 (1, vec![2, 4], vec![vec![2, 3, 4], vec![]]),
1124 (1, vec![1, 3], vec![vec![1, 2, 3], vec![]]),
1125 ];
1126 for (input_level, sst_id_filter, expected) in &sst_id_filters {
1127 let option = ManualCompactionOption {
1128 sst_ids: sst_id_filter.iter().cloned().map(Into::into).collect(),
1129 level: *input_level as _,
1130 key_range: KeyRange {
1131 left: Bytes::default(),
1132 right: Bytes::default(),
1133 right_exclusive: false,
1134 },
1135 internal_table_id: HashSet::default(),
1136 };
1137 let mut picker = ManualCompactionPicker::new(
1138 Arc::new(RangeOverlapStrategy::default()),
1139 option.clone(),
1140 *input_level as _,
1141 );
1142 let result = picker
1143 .pick_compaction(
1144 &levels,
1145 &levels_handler,
1146 &mut LocalPickerStatistic::default(),
1147 )
1148 .unwrap();
1149 assert_eq!(result.input_levels.len(), expected.len());
1150 for (i, e) in expected.iter().enumerate().take(result.input_levels.len()) {
1151 assert_eq!(
1152 result.input_levels[i]
1153 .table_infos
1154 .iter()
1155 .map(|s| s.sst_id)
1156 .collect_vec(),
1157 *e
1158 );
1159 }
1160 }
1161 }
1162
1163 #[test]
1164 fn test_manual_compaction_selector_l0() {
1165 let config = CompactionConfigBuilder::new().max_level(4).build();
1166 let group_config = CompactionGroup::new(1, config);
1167 let l0 = generate_l0_nonoverlapping_sublevels(vec![
1168 generate_table(0, 1, 0, 500, 1),
1169 generate_table(1, 1, 0, 500, 1),
1170 ]);
1171 assert_eq!(l0.sub_levels.len(), 2);
1172 let levels = vec![
1173 generate_level(1, vec![]),
1174 generate_level(2, vec![]),
1175 generate_level(3, vec![]),
1176 Level {
1177 level_idx: 4,
1178 level_type: LevelType::Nonoverlapping,
1179 table_infos: vec![
1180 generate_table(2, 1, 0, 100, 1),
1181 generate_table(3, 1, 101, 200, 1),
1182 generate_table(4, 1, 222, 300, 1),
1183 ],
1184 ..Default::default()
1185 },
1186 ];
1187 assert_eq!(levels.len(), 4);
1188 let levels = Levels {
1189 levels,
1190 l0,
1191 ..Default::default()
1192 };
1193 let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
1194 let mut local_stats = LocalSelectorStatistic::default();
1195
1196 {
1198 let option = ManualCompactionOption {
1199 sst_ids: [0, 1].iter().cloned().map(Into::into).collect(),
1200 key_range: KeyRange {
1201 left: Bytes::default(),
1202 right: Bytes::default(),
1203 right_exclusive: false,
1204 },
1205 internal_table_id: HashSet::default(),
1206 level: 0,
1207 };
1208 let mut selector = ManualCompactionSelector::new(option);
1209 let task = selector
1210 .pick_compaction(
1211 1,
1212 compaction_selector_context(
1213 &group_config,
1214 &levels,
1215 &BTreeSet::new(),
1216 &mut levels_handler,
1217 &mut local_stats,
1218 &HashMap::default(),
1219 Arc::new(CompactionDeveloperConfig::default()),
1220 &Default::default(),
1221 &HummockVersionStateTableInfo::empty(),
1222 ),
1223 )
1224 .unwrap();
1225 assert_compaction_task(&task, &levels_handler);
1226 assert_eq!(task.input.input_levels.len(), 2);
1227 assert_eq!(task.input.input_levels[0].level_idx, 0);
1228 assert_eq!(task.input.input_levels[1].level_idx, 0);
1229 assert_eq!(task.input.target_level, 0);
1230 }
1231
1232 for level_handler in &mut levels_handler {
1233 for pending_task_id in &level_handler.pending_tasks_ids() {
1234 level_handler.remove_task(*pending_task_id);
1235 }
1236 }
1237
1238 {
1240 let option = ManualCompactionOption {
1241 sst_ids: vec![],
1242 key_range: KeyRange {
1243 left: Bytes::default(),
1244 right: Bytes::default(),
1245 right_exclusive: false,
1246 },
1247 internal_table_id: HashSet::default(),
1248 level: 0,
1249 };
1250 let mut selector = ManualCompactionSelector::new(option);
1251 let task = selector
1252 .pick_compaction(
1253 2,
1254 compaction_selector_context(
1255 &group_config,
1256 &levels,
1257 &BTreeSet::new(),
1258 &mut levels_handler,
1259 &mut local_stats,
1260 &HashMap::default(),
1261 Arc::new(CompactionDeveloperConfig::default()),
1262 &Default::default(),
1263 &HummockVersionStateTableInfo::empty(),
1264 ),
1265 )
1266 .unwrap();
1267 assert_compaction_task(&task, &levels_handler);
1268 assert_eq!(task.input.input_levels.len(), 3);
1269 assert_eq!(task.input.input_levels[0].level_idx, 0);
1270 assert_eq!(task.input.input_levels[1].level_idx, 0);
1271 assert_eq!(task.input.input_levels[2].level_idx, 4);
1272 assert_eq!(task.input.target_level, 4);
1273 }
1274 }
1275
1276 #[test]
1278 fn test_manual_compaction_selector() {
1279 let config = CompactionConfigBuilder::new().max_level(4).build();
1280 let group_config = CompactionGroup::new(1, config);
1281 let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
1282 assert_eq!(l0.sub_levels.len(), 0);
1283 let levels = vec![
1284 generate_level(1, vec![]),
1285 generate_level(2, vec![]),
1286 generate_level(
1287 3,
1288 vec![
1289 generate_table(0, 1, 150, 151, 1),
1290 generate_table(1, 1, 250, 251, 1),
1291 ],
1292 ),
1293 Level {
1294 level_idx: 4,
1295 level_type: LevelType::Nonoverlapping,
1296 table_infos: vec![
1297 generate_table(2, 1, 0, 100, 1),
1298 generate_table(3, 1, 101, 200, 1),
1299 generate_table(4, 1, 222, 300, 1),
1300 generate_table(5, 1, 333, 400, 1),
1301 generate_table(6, 1, 444, 500, 1),
1302 generate_table(7, 1, 555, 600, 1),
1303 ],
1304 ..Default::default()
1305 },
1306 ];
1307 assert_eq!(levels.len(), 4);
1308 let levels = Levels {
1309 levels,
1310 l0,
1311 ..Default::default()
1312 };
1313 let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
1314 let mut local_stats = LocalSelectorStatistic::default();
1315
1316 {
1318 let option = ManualCompactionOption {
1319 sst_ids: [0, 1].iter().cloned().map(Into::into).collect(),
1320 key_range: KeyRange {
1321 left: Bytes::default(),
1322 right: Bytes::default(),
1323 right_exclusive: false,
1324 },
1325 internal_table_id: HashSet::default(),
1326 level: 3,
1327 };
1328 let mut selector = ManualCompactionSelector::new(option);
1329 let task = selector
1330 .pick_compaction(
1331 1,
1332 compaction_selector_context(
1333 &group_config,
1334 &levels,
1335 &BTreeSet::new(),
1336 &mut levels_handler,
1337 &mut local_stats,
1338 &HashMap::default(),
1339 Arc::new(CompactionDeveloperConfig::default()),
1340 &Default::default(),
1341 &HummockVersionStateTableInfo::empty(),
1342 ),
1343 )
1344 .unwrap();
1345 assert_compaction_task(&task, &levels_handler);
1346 assert_eq!(task.input.input_levels.len(), 2);
1347 assert_eq!(task.input.input_levels[0].level_idx, 3);
1348 assert_eq!(task.input.input_levels[0].table_infos.len(), 2);
1349 assert_eq!(task.input.input_levels[1].level_idx, 4);
1350 assert_eq!(task.input.input_levels[1].table_infos.len(), 2);
1351 assert_eq!(task.input.target_level, 4);
1352 }
1353
1354 for level_handler in &mut levels_handler {
1355 for pending_task_id in &level_handler.pending_tasks_ids() {
1356 level_handler.remove_task(*pending_task_id);
1357 }
1358 }
1359
1360 {
1362 let option = ManualCompactionOption {
1363 sst_ids: vec![],
1364 key_range: KeyRange {
1365 left: Bytes::default(),
1366 right: Bytes::default(),
1367 right_exclusive: false,
1368 },
1369 internal_table_id: HashSet::default(),
1370 level: 4,
1371 };
1372 let mut selector = ManualCompactionSelector::new(option);
1373 let task = selector
1374 .pick_compaction(
1375 1,
1376 compaction_selector_context(
1377 &group_config,
1378 &levels,
1379 &BTreeSet::new(),
1380 &mut levels_handler,
1381 &mut local_stats,
1382 &HashMap::default(),
1383 Arc::new(CompactionDeveloperConfig::default()),
1384 &Default::default(),
1385 &HummockVersionStateTableInfo::empty(),
1386 ),
1387 )
1388 .unwrap();
1389 assert_compaction_task(&task, &levels_handler);
1390 assert_eq!(task.input.input_levels.len(), 2);
1391 assert_eq!(task.input.input_levels[0].level_idx, 4);
1392 assert_eq!(task.input.input_levels[0].table_infos.len(), 6);
1393 assert_eq!(task.input.input_levels[1].level_idx, 4);
1394 assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
1395 assert_eq!(task.input.target_level, 4);
1396 assert!(matches!(
1397 task.compaction_task_type,
1398 compact_task::TaskType::Manual
1399 ));
1400 }
1401 }
1402}