1use std::sync::Arc;
16
17use risingwave_common::config::default::compaction_config;
18use risingwave_hummock_sdk::level::{InputLevel, Levels, OverlappingLevel};
19use risingwave_pb::hummock::{CompactionConfig, LevelType};
20
21use super::min_overlap_compaction_picker::NonOverlapSubLevelPicker;
22use super::{
23 CompactionInput, CompactionPicker, CompactionTaskValidator, LocalPickerStatistic,
24 ValidationRuleType,
25};
26use crate::hummock::compaction::picker::TrivialMovePicker;
27use crate::hummock::compaction::{CompactionDeveloperConfig, create_overlap_strategy};
28use crate::hummock::level_handler::LevelHandler;
29
30pub struct IntraCompactionPicker {
31 config: Arc<CompactionConfig>,
32 compaction_task_validator: Arc<CompactionTaskValidator>,
33
34 developer_config: Arc<CompactionDeveloperConfig>,
35}
36
37impl CompactionPicker for IntraCompactionPicker {
38 fn pick_compaction(
39 &mut self,
40 levels: &Levels,
41 level_handlers: &[LevelHandler],
42 stats: &mut LocalPickerStatistic,
43 ) -> Option<CompactionInput> {
44 let l0 = &levels.l0;
45 if l0.sub_levels.is_empty() {
46 return None;
47 }
48
49 if let Some(ret) = self.pick_l0_trivial_move_file(l0, level_handlers, stats) {
50 return Some(ret);
51 }
52
53 let vnode_partition_count = self.config.split_weight_by_vnode;
54
55 if let Some(ret) =
56 self.pick_whole_level(l0, &level_handlers[0], vnode_partition_count, stats)
57 {
58 if ret.input_levels.len() < 2 {
59 tracing::error!(
60 ?ret,
61 vnode_partition_count,
62 "pick_whole_level failed to pick enough levels"
63 );
64 return None;
65 }
66
67 return Some(ret);
68 }
69
70 if let Some(ret) = self.pick_l0_intra(l0, &level_handlers[0], vnode_partition_count, stats)
71 {
72 if ret.input_levels.len() < 2 {
73 tracing::error!(
74 ?ret,
75 vnode_partition_count,
76 "pick_l0_intra failed to pick enough levels"
77 );
78 return None;
79 }
80
81 return Some(ret);
82 }
83
84 None
85 }
86}
87
88impl IntraCompactionPicker {
89 #[cfg(test)]
90 pub fn for_test(
91 config: Arc<CompactionConfig>,
92 developer_config: Arc<CompactionDeveloperConfig>,
93 ) -> IntraCompactionPicker {
94 IntraCompactionPicker {
95 compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())),
96 config,
97 developer_config,
98 }
99 }
100
101 pub fn new_with_validator(
102 config: Arc<CompactionConfig>,
103 compaction_task_validator: Arc<CompactionTaskValidator>,
104 developer_config: Arc<CompactionDeveloperConfig>,
105 ) -> IntraCompactionPicker {
106 assert!(config.level0_sub_level_compact_level_count > 1);
107 IntraCompactionPicker {
108 config,
109 compaction_task_validator,
110 developer_config,
111 }
112 }
113
114 fn pick_whole_level(
115 &self,
116 l0: &OverlappingLevel,
117 level_handler: &LevelHandler,
118 partition_count: u32,
119 stats: &mut LocalPickerStatistic,
120 ) -> Option<CompactionInput> {
121 let picker = WholeLevelCompactionPicker::new(
122 self.config.clone(),
123 self.compaction_task_validator.clone(),
124 );
125 picker.pick_whole_level(l0, level_handler, partition_count, stats)
126 }
127
128 fn pick_l0_intra(
129 &self,
130 l0: &OverlappingLevel,
131 level_handler: &LevelHandler,
132 vnode_partition_count: u32,
133 stats: &mut LocalPickerStatistic,
134 ) -> Option<CompactionInput> {
135 let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
136 let mut max_vnode_partition_idx = 0;
137 for (idx, level) in l0.sub_levels.iter().enumerate() {
138 if level.vnode_partition_count < vnode_partition_count {
139 break;
140 }
141 max_vnode_partition_idx = idx;
142 }
143
144 for (idx, level) in l0.sub_levels.iter().enumerate() {
145 if level.level_type != LevelType::Nonoverlapping
146 || level.total_file_size > self.config.sub_level_max_compaction_bytes
147 {
148 continue;
149 }
150
151 if idx > max_vnode_partition_idx {
152 break;
153 }
154
155 if level_handler.is_level_all_pending_compact(level) {
156 continue;
157 }
158
159 let max_compaction_bytes = std::cmp::min(
160 self.config.max_compaction_bytes,
161 self.config.sub_level_max_compaction_bytes,
162 );
163
164 let non_overlap_sub_level_picker = NonOverlapSubLevelPicker::new(
165 self.config.sub_level_max_compaction_bytes / 2,
166 max_compaction_bytes,
167 self.config.level0_sub_level_compact_level_count as usize,
168 self.config.level0_max_compact_file_number,
169 overlap_strategy.clone(),
170 self.developer_config.enable_check_task_level_overlap,
171 self.config
172 .max_l0_compact_level_count
173 .unwrap_or(compaction_config::max_l0_compact_level_count())
174 as usize,
175 self.config
176 .enable_optimize_l0_interval_selection
177 .unwrap_or(compaction_config::enable_optimize_l0_interval_selection()),
178 );
179
180 let l0_select_tables_vec = non_overlap_sub_level_picker
181 .pick_l0_multi_non_overlap_level(
182 &l0.sub_levels[idx..=max_vnode_partition_idx],
183 level_handler,
184 );
185
186 if l0_select_tables_vec.is_empty() {
187 continue;
188 }
189
190 let mut select_input_size = 0;
191 let mut total_file_count = 0;
192 for input in l0_select_tables_vec {
193 let mut select_level_inputs = Vec::with_capacity(input.sstable_infos.len());
194 let mut target_sub_level_id = None;
195 for (sub_level_id, level_select_sst) in input.sstable_infos {
196 if level_select_sst.is_empty() {
197 continue;
198 }
199
200 if target_sub_level_id.is_none() {
201 target_sub_level_id = Some(sub_level_id);
202 }
203
204 select_level_inputs.push(InputLevel {
205 level_idx: 0,
206 level_type: LevelType::Nonoverlapping,
207 table_infos: level_select_sst,
208 });
209
210 select_input_size += input.total_file_size;
211 total_file_count += input.total_file_count;
212 }
213 select_level_inputs.reverse();
214
215 let result = CompactionInput {
216 input_levels: select_level_inputs,
217 target_sub_level_id: target_sub_level_id.unwrap(),
218 select_input_size,
219 total_file_count: total_file_count as u64,
220 ..Default::default()
221 };
222
223 if !self.compaction_task_validator.valid_compact_task(
224 &result,
225 ValidationRuleType::Intra,
226 stats,
227 ) {
228 continue;
229 }
230
231 return Some(result);
232 }
233 }
234
235 None
236 }
237
238 fn pick_l0_trivial_move_file(
239 &self,
240 l0: &OverlappingLevel,
241 level_handlers: &[LevelHandler],
242 stats: &mut LocalPickerStatistic,
243 ) -> Option<CompactionInput> {
244 if !self.developer_config.enable_trivial_move {
245 return None;
246 }
247
248 let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
249
250 for (idx, level) in l0.sub_levels.iter().enumerate() {
251 if level.level_type == LevelType::Overlapping || idx + 1 >= l0.sub_levels.len() {
252 continue;
253 }
254
255 if l0.sub_levels[idx + 1].level_type == LevelType::Overlapping {
256 continue;
257 }
258
259 if level_handlers[0].is_level_pending_compact(level) {
260 continue;
261 }
262
263 if l0.sub_levels[idx + 1].vnode_partition_count
264 != l0.sub_levels[idx].vnode_partition_count
265 {
266 continue;
267 }
268
269 let trivial_move_picker = TrivialMovePicker::new(
270 0,
271 0,
272 overlap_strategy.clone(),
273 0,
274 self.config
275 .sst_allowed_trivial_move_max_count
276 .unwrap_or(compaction_config::sst_allowed_trivial_move_max_count())
277 as usize,
278 );
279
280 if let Some(select_ssts) = trivial_move_picker.pick_multi_trivial_move_ssts(
281 &l0.sub_levels[idx + 1].table_infos,
282 &level.table_infos,
283 level_handlers,
284 stats,
285 ) {
286 let mut overlap = overlap_strategy.create_overlap_info();
287 select_ssts
288 .iter()
289 .for_each(|sst| overlap.update(&sst.key_range));
290
291 assert!(
292 overlap
293 .check_multiple_overlap(&l0.sub_levels[idx].table_infos)
294 .is_empty()
295 );
296
297 let select_input_size = select_ssts.iter().map(|sst| sst.sst_size).sum();
298 let total_file_count = select_ssts.len() as u64;
299 let input_levels = vec![
300 InputLevel {
301 level_idx: 0,
302 level_type: LevelType::Nonoverlapping,
303 table_infos: select_ssts,
304 },
305 InputLevel {
306 level_idx: 0,
307 level_type: LevelType::Nonoverlapping,
308 table_infos: vec![],
309 },
310 ];
311 return Some(CompactionInput {
312 input_levels,
313 target_level: 0,
314 target_sub_level_id: level.sub_level_id,
315 select_input_size,
316 total_file_count,
317 ..Default::default()
318 });
319 }
320 }
321 None
322 }
323}
324
325pub struct WholeLevelCompactionPicker {
326 config: Arc<CompactionConfig>,
327 compaction_task_validator: Arc<CompactionTaskValidator>,
328}
329
330impl WholeLevelCompactionPicker {
331 pub fn new(
332 config: Arc<CompactionConfig>,
333 compaction_task_validator: Arc<CompactionTaskValidator>,
334 ) -> Self {
335 Self {
336 config,
337 compaction_task_validator,
338 }
339 }
340
341 pub fn pick_whole_level(
342 &self,
343 l0: &OverlappingLevel,
344 level_handler: &LevelHandler,
345 partition_count: u32,
346 stats: &mut LocalPickerStatistic,
347 ) -> Option<CompactionInput> {
348 if partition_count == 0 {
349 return None;
350 }
351 for (idx, level) in l0.sub_levels.iter().enumerate() {
352 if level.level_type != LevelType::Nonoverlapping
353 || level.vnode_partition_count == partition_count
354 {
355 continue;
356 }
357
358 let max_compaction_bytes = std::cmp::max(
359 self.config.max_bytes_for_level_base,
360 self.config.sub_level_max_compaction_bytes
361 * (self.config.level0_sub_level_compact_level_count as u64),
362 );
363
364 let mut select_input_size = 0;
365
366 let mut select_level_inputs = vec![];
367 let mut total_file_count = 0;
368 let mut wait_enough = false;
369 for next_level in l0.sub_levels.iter().skip(idx) {
370 if (select_input_size > max_compaction_bytes
371 || total_file_count > self.config.level0_max_compact_file_number
372 || next_level.vnode_partition_count == partition_count)
373 && select_level_inputs.len() > 1
374 {
375 wait_enough = true;
376 break;
377 }
378
379 if level_handler.is_level_pending_compact(next_level) {
380 break;
381 }
382
383 select_input_size += next_level.total_file_size;
384 total_file_count += next_level.table_infos.len() as u64;
385
386 select_level_inputs.push(InputLevel {
387 level_idx: 0,
388 level_type: next_level.level_type,
389 table_infos: next_level.table_infos.clone(),
390 });
391 }
392 if select_level_inputs.len() > 1 {
393 let vnode_partition_count =
394 if select_input_size > self.config.sub_level_max_compaction_bytes / 2 {
395 partition_count
396 } else {
397 0
398 };
399 let result = CompactionInput {
400 input_levels: select_level_inputs,
401 target_sub_level_id: level.sub_level_id,
402 select_input_size,
403 total_file_count,
404 vnode_partition_count,
405 ..Default::default()
406 };
407 if wait_enough
408 || self.compaction_task_validator.valid_compact_task(
409 &result,
410 ValidationRuleType::Intra,
411 stats,
412 )
413 {
414 return Some(result);
415 }
416 }
417 }
418
419 None
420 }
421}
422
423#[cfg(test)]
424pub mod tests {
425 use risingwave_hummock_sdk::level::Level;
426
427 use super::*;
428 use crate::hummock::compaction::TierCompactionPicker;
429 use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
430 use crate::hummock::compaction::selector::tests::{
431 generate_l0_nonoverlapping_multi_sublevels, generate_l0_nonoverlapping_sublevels,
432 generate_l0_overlapping_sublevels, generate_level, generate_table,
433 push_table_level0_overlapping, push_tables_level0_nonoverlapping,
434 };
435
436 fn create_compaction_picker_for_test() -> IntraCompactionPicker {
437 let config = Arc::new(
438 CompactionConfigBuilder::new()
439 .level0_tier_compact_file_number(2)
440 .level0_sub_level_compact_level_count(1)
441 .build(),
442 );
443 IntraCompactionPicker::for_test(config, Arc::new(CompactionDeveloperConfig::default()))
444 }
445
446 #[test]
447 fn test_l0_to_l1_compact_conflict() {
448 let levels = vec![Level {
451 level_idx: 1,
452 level_type: LevelType::Nonoverlapping,
453 table_infos: vec![],
454 ..Default::default()
455 }];
456 let mut levels = Levels {
457 levels,
458 l0: OverlappingLevel {
459 sub_levels: vec![],
460 total_file_size: 0,
461 uncompressed_file_size: 0,
462 },
463 ..Default::default()
464 };
465 push_tables_level0_nonoverlapping(
466 &mut levels,
467 vec![
468 generate_table(1, 1, 100, 300, 2),
469 generate_table(2, 1, 350, 500, 2),
470 ],
471 );
472 let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
473
474 let mut local_stats = LocalPickerStatistic::default();
475 push_tables_level0_nonoverlapping(&mut levels, vec![generate_table(3, 1, 250, 300, 3)]);
476 let config: CompactionConfig = CompactionConfigBuilder::new()
477 .level0_tier_compact_file_number(2)
478 .max_compaction_bytes(1000)
479 .sub_level_max_compaction_bytes(150)
480 .max_bytes_for_level_multiplier(1)
481 .level0_sub_level_compact_level_count(3)
482 .build();
483 let mut picker = TierCompactionPicker::new(Arc::new(config));
484
485 let ret: Option<CompactionInput> =
486 picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
487 assert!(ret.is_none());
488 }
489
490 #[test]
491 fn test_compacting_key_range_overlap_intra_l0() {
492 let mut picker = create_compaction_picker_for_test();
495
496 let mut levels = Levels {
497 levels: vec![Level {
498 level_idx: 1,
499 level_type: LevelType::Nonoverlapping,
500 table_infos: vec![generate_table(3, 1, 200, 300, 2)],
501 ..Default::default()
502 }],
503 l0: generate_l0_nonoverlapping_sublevels(vec![
504 generate_table(1, 1, 100, 210, 2),
505 generate_table(2, 1, 200, 250, 2),
506 ]),
507 ..Default::default()
508 };
509 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
510
511 let mut local_stats = LocalPickerStatistic::default();
512 let ret = picker
513 .pick_compaction(&levels, &levels_handler, &mut local_stats)
514 .unwrap();
515 ret.add_pending_task(0, &mut levels_handler);
516
517 push_table_level0_overlapping(&mut levels, generate_table(4, 1, 170, 180, 3));
518 assert!(
519 picker
520 .pick_compaction(&levels, &levels_handler, &mut local_stats)
521 .is_none()
522 );
523 }
524
525 #[test]
526 fn test_pick_l0_intra() {
527 {
528 let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
529 vec![
530 generate_table(6, 1, 50, 99, 1),
531 generate_table(1, 1, 100, 200, 1),
532 generate_table(2, 1, 250, 300, 1),
533 ],
534 vec![
535 generate_table(3, 1, 10, 90, 1),
536 generate_table(6, 1, 100, 110, 1),
537 ],
538 vec![
539 generate_table(4, 1, 50, 99, 1),
540 generate_table(5, 1, 100, 200, 1),
541 ],
542 ]);
543 let levels = Levels {
544 l0,
545 levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
546 ..Default::default()
547 };
548 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
549 levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
550 let config = Arc::new(
551 CompactionConfigBuilder::new()
552 .level0_sub_level_compact_level_count(1)
553 .level0_overlapping_sub_level_compact_level_count(4)
554 .build(),
555 );
556 let mut picker = IntraCompactionPicker::for_test(
557 config,
558 Arc::new(CompactionDeveloperConfig::default()),
559 );
560 let mut local_stats = LocalPickerStatistic::default();
561 let ret = picker
562 .pick_compaction(&levels, &levels_handler, &mut local_stats)
563 .unwrap();
564 ret.add_pending_task(1, &mut levels_handler);
565 assert_eq!(
566 ret.input_levels
567 .iter()
568 .map(|i| i.table_infos.len())
569 .sum::<usize>(),
570 3
571 );
572 }
573
574 {
575 let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
578 vec![
579 generate_table(1, 1, 100, 200, 1),
580 generate_table(2, 1, 300, 400, 1),
581 ],
582 vec![
583 generate_table(3, 1, 100, 200, 1),
584 generate_table(6, 1, 300, 500, 1),
585 ],
586 vec![
587 generate_table(4, 1, 100, 200, 1),
588 generate_table(5, 1, 300, 400, 1),
589 ],
590 ]);
591 let levels = Levels {
592 l0,
593 levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
594 ..Default::default()
595 };
596 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
597 levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
598 let config = Arc::new(
599 CompactionConfigBuilder::new()
600 .level0_sub_level_compact_level_count(1)
601 .sub_level_max_compaction_bytes(300)
602 .build(),
603 );
604 let mut picker = IntraCompactionPicker::for_test(
605 config,
606 Arc::new(CompactionDeveloperConfig::default()),
607 );
608 let mut local_stats = LocalPickerStatistic::default();
609 let ret = picker
610 .pick_compaction(&levels, &levels_handler, &mut local_stats)
611 .unwrap();
612 ret.add_pending_task(1, &mut levels_handler);
613 assert_eq!(
614 ret.input_levels
615 .iter()
616 .map(|i| i.table_infos.len())
617 .sum::<usize>(),
618 3
619 );
620
621 assert_eq!(4, ret.input_levels[0].table_infos[0].sst_id);
622 assert_eq!(3, ret.input_levels[1].table_infos[0].sst_id);
623 assert_eq!(1, ret.input_levels[2].table_infos[0].sst_id);
624
625 let ret2 = picker
627 .pick_compaction(&levels, &levels_handler, &mut local_stats)
628 .unwrap();
629
630 assert_eq!(
631 ret2.input_levels
632 .iter()
633 .map(|i| i.table_infos.len())
634 .sum::<usize>(),
635 2
636 );
637
638 assert_eq!(6, ret2.input_levels[0].table_infos[0].sst_id);
639 assert_eq!(2, ret2.input_levels[1].table_infos[0].sst_id);
640 }
641
642 {
643 let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
644 vec![
645 generate_table(1, 1, 100, 149, 1),
646 generate_table(6, 1, 150, 199, 1),
647 generate_table(7, 1, 200, 250, 1),
648 generate_table(2, 1, 300, 400, 1),
649 ],
650 vec![
651 generate_table(3, 1, 100, 149, 1),
652 generate_table(8, 1, 150, 199, 1),
653 generate_table(9, 1, 200, 250, 1),
654 generate_table(10, 1, 300, 400, 1),
655 ],
656 vec![
657 generate_table(4, 1, 100, 199, 1),
658 generate_table(11, 1, 200, 250, 1),
659 generate_table(5, 1, 300, 350, 1),
660 ],
661 ]);
662 let levels = Levels {
663 l0,
664 levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
665 ..Default::default()
666 };
667 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
668 levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
669 let config = Arc::new(
670 CompactionConfigBuilder::new()
671 .level0_sub_level_compact_level_count(1)
672 .sub_level_max_compaction_bytes(300)
673 .build(),
674 );
675 let mut picker = IntraCompactionPicker::for_test(
676 config,
677 Arc::new(CompactionDeveloperConfig::default()),
678 );
679 let mut local_stats = LocalPickerStatistic::default();
680 let ret = picker
681 .pick_compaction(&levels, &levels_handler, &mut local_stats)
682 .unwrap();
683 ret.add_pending_task(1, &mut levels_handler);
684 assert_eq!(
685 ret.input_levels
686 .iter()
687 .map(|i| i.table_infos.len())
688 .sum::<usize>(),
689 3
690 );
691
692 assert_eq!(11, ret.input_levels[0].table_infos[0].sst_id);
693 assert_eq!(9, ret.input_levels[1].table_infos[0].sst_id);
694 assert_eq!(7, ret.input_levels[2].table_infos[0].sst_id);
695
696 let ret2 = picker
697 .pick_compaction(&levels, &levels_handler, &mut local_stats)
698 .unwrap();
699
700 assert_eq!(
701 ret2.input_levels
702 .iter()
703 .map(|i| i.table_infos.len())
704 .sum::<usize>(),
705 3
706 );
707
708 assert_eq!(5, ret2.input_levels[0].table_infos[0].sst_id);
709 assert_eq!(10, ret2.input_levels[1].table_infos[0].sst_id);
710 assert_eq!(2, ret2.input_levels[2].table_infos[0].sst_id);
711 }
712 }
713
714 fn is_l0_trivial_move(compaction_input: &CompactionInput) -> bool {
715 compaction_input.input_levels.len() == 2
716 && !compaction_input.input_levels[0].table_infos.is_empty()
717 && compaction_input.input_levels[1].table_infos.is_empty()
718 }
719
720 #[test]
721 fn test_trivial_move() {
722 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
723 let config = Arc::new(
724 CompactionConfigBuilder::new()
725 .level0_tier_compact_file_number(2)
726 .target_file_size_base(30)
727 .level0_sub_level_compact_level_count(20) .build(),
729 );
730 let mut picker =
731 IntraCompactionPicker::for_test(config, Arc::new(CompactionDeveloperConfig::default()));
732
733 let l0 = generate_l0_overlapping_sublevels(vec![vec![
735 generate_table(1, 1, 100, 110, 1),
736 generate_table(2, 1, 150, 250, 1),
737 ]]);
738 let levels = Levels {
739 l0,
740 levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
741 ..Default::default()
742 };
743 levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
744 let mut local_stats = LocalPickerStatistic::default();
745 let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
746 assert!(ret.is_none());
747
748 let l0: OverlappingLevel = generate_l0_overlapping_sublevels(vec![
750 vec![
751 generate_table(1, 1, 100, 110, 1),
752 generate_table(2, 1, 150, 250, 1),
753 ],
754 vec![generate_table(3, 1, 10, 90, 1)],
755 vec![generate_table(4, 1, 10, 90, 1)],
756 vec![generate_table(5, 1, 10, 90, 1)],
757 ]);
758 let mut levels = Levels {
759 l0,
760 levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
761 ..Default::default()
762 };
763 assert!(
764 picker
765 .pick_compaction(&levels, &levels_handler, &mut local_stats)
766 .is_none()
767 );
768
769 levels.l0.sub_levels[0].level_type = LevelType::Nonoverlapping;
771 levels.l0.sub_levels[1].level_type = LevelType::Overlapping;
772 let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
773 assert!(ret.is_none());
774
775 levels.l0.sub_levels[0].level_type = LevelType::Overlapping;
777 levels.l0.sub_levels[1].level_type = LevelType::Nonoverlapping;
778 let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
779 assert!(ret.is_none());
780
781 levels.l0.sub_levels[0].level_type = LevelType::Nonoverlapping;
783 levels.l0.sub_levels[1].level_type = LevelType::Nonoverlapping;
784 let ret = picker
785 .pick_compaction(&levels, &levels_handler, &mut local_stats)
786 .unwrap();
787 assert!(is_l0_trivial_move(&ret));
788 assert_eq!(ret.input_levels[0].table_infos.len(), 1);
789 }
790 #[test]
791 fn test_pick_whole_level() {
792 let config = Arc::new(
793 CompactionConfigBuilder::new()
794 .level0_max_compact_file_number(20)
795 .build(),
796 );
797 let mut table_infos = vec![];
798 for epoch in 1..3 {
799 let base = epoch * 100;
800 let mut ssts = vec![];
801 for i in 1..50 {
802 let left = (i as usize) * 100;
803 let right = left + 100;
804 ssts.push(generate_table(base + i, 1, left, right, epoch));
805 }
806 table_infos.push(ssts);
807 }
808
809 let l0 = generate_l0_nonoverlapping_multi_sublevels(table_infos);
810 let compaction_task_validator = Arc::new(CompactionTaskValidator::new(config.clone()));
811 let picker = WholeLevelCompactionPicker::new(config, compaction_task_validator);
812 let level_handler = LevelHandler::new(0);
813 let ret = picker
814 .pick_whole_level(&l0, &level_handler, 4, &mut LocalPickerStatistic::default())
815 .unwrap();
816 assert_eq!(ret.input_levels.len(), 2);
817 }
818
819 #[test]
820 fn test_priority() {
821 let config = Arc::new(
822 CompactionConfigBuilder::new()
823 .level0_max_compact_file_number(20)
824 .sub_level_max_compaction_bytes(1)
825 .level0_sub_level_compact_level_count(2)
826 .build(),
827 );
828 let mut table_infos = vec![];
829 for epoch in 1..3 {
830 let base = epoch * 100;
831 let mut ssts = vec![];
832 for i in 1..50 {
833 let left = (i as usize) * 100;
834 let right = left + 100;
835 ssts.push(generate_table(base + i, 1, left, right, epoch));
836 }
837 table_infos.push(ssts);
838 }
839
840 let mut l0 = generate_l0_nonoverlapping_multi_sublevels(table_infos);
841 l0.sub_levels[1]
843 .table_infos
844 .push(generate_table(9999, 900000000, 0, 100, 1));
845
846 l0.sub_levels[0].total_file_size = 1;
847 l0.sub_levels[1].total_file_size = 1;
848
849 let mut picker = IntraCompactionPicker::new_with_validator(
850 config,
851 Arc::new(CompactionTaskValidator::unused()),
852 Arc::new(CompactionDeveloperConfig::default()),
853 );
854 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
855 let mut local_stats = LocalPickerStatistic::default();
856
857 let levels = Levels {
858 l0,
859 levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
860 ..Default::default()
861 };
862
863 let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
864 assert!(is_l0_trivial_move(ret.as_ref().unwrap()));
865 ret.as_ref()
866 .unwrap()
867 .add_pending_task(1, &mut levels_handler);
868 let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
869 assert!(ret.is_some());
870 let input = ret.as_ref().unwrap();
871 assert_eq!(input.input_levels.len(), 2);
872 assert_ne!(
873 levels.l0.sub_levels[0].table_infos.len(),
874 input.input_levels[0].table_infos.len()
875 );
876 assert_ne!(
877 levels.l0.sub_levels[1].table_infos.len(),
878 input.input_levels[1].table_infos.len()
879 );
880 }
881}