1use std::sync::Arc;
16
17use risingwave_common::config::default::compaction_config;
18use risingwave_hummock_sdk::level::{InputLevel, Levels, OverlappingLevel};
19use risingwave_pb::hummock::{CompactionConfig, LevelType};
20
21use super::min_overlap_compaction_picker::NonOverlapSubLevelPicker;
22use super::{
23 CompactionInput, CompactionPicker, CompactionTaskValidator, LocalPickerStatistic,
24 ValidationRuleType,
25};
26use crate::hummock::compaction::picker::TrivialMovePicker;
27use crate::hummock::compaction::{CompactionDeveloperConfig, create_overlap_strategy};
28use crate::hummock::level_handler::LevelHandler;
29
30pub struct IntraCompactionPicker {
31 config: Arc<CompactionConfig>,
32 compaction_task_validator: Arc<CompactionTaskValidator>,
33
34 developer_config: Arc<CompactionDeveloperConfig>,
35}
36
37impl CompactionPicker for IntraCompactionPicker {
38 fn pick_compaction(
39 &mut self,
40 levels: &Levels,
41 level_handlers: &[LevelHandler],
42 stats: &mut LocalPickerStatistic,
43 ) -> Option<CompactionInput> {
44 let l0 = &levels.l0;
45 if l0.sub_levels.is_empty() {
46 return None;
47 }
48
49 if let Some(ret) = self.pick_l0_trivial_move_file(l0, level_handlers, stats) {
50 return Some(ret);
51 }
52
53 let vnode_partition_count = self.config.split_weight_by_vnode;
54
55 if let Some(ret) =
56 self.pick_whole_level(l0, &level_handlers[0], vnode_partition_count, stats)
57 {
58 if ret.input_levels.len() < 2 {
59 tracing::error!(
60 ?ret,
61 vnode_partition_count,
62 "pick_whole_level failed to pick enough levels"
63 );
64 return None;
65 }
66
67 return Some(ret);
68 }
69
70 if let Some(ret) = self.pick_l0_intra(l0, &level_handlers[0], vnode_partition_count, stats)
71 {
72 if ret.input_levels.len() < 2 {
73 tracing::error!(
74 ?ret,
75 vnode_partition_count,
76 "pick_l0_intra failed to pick enough levels"
77 );
78 return None;
79 }
80
81 return Some(ret);
82 }
83
84 None
85 }
86}
87
88impl IntraCompactionPicker {
89 #[cfg(test)]
90 pub fn for_test(
91 config: Arc<CompactionConfig>,
92 developer_config: Arc<CompactionDeveloperConfig>,
93 ) -> IntraCompactionPicker {
94 IntraCompactionPicker {
95 compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())),
96 config,
97 developer_config,
98 }
99 }
100
101 pub fn new_with_validator(
102 config: Arc<CompactionConfig>,
103 compaction_task_validator: Arc<CompactionTaskValidator>,
104 developer_config: Arc<CompactionDeveloperConfig>,
105 ) -> IntraCompactionPicker {
106 assert!(config.level0_sub_level_compact_level_count > 1);
107 IntraCompactionPicker {
108 config,
109 compaction_task_validator,
110 developer_config,
111 }
112 }
113
114 fn pick_whole_level(
115 &self,
116 l0: &OverlappingLevel,
117 level_handler: &LevelHandler,
118 partition_count: u32,
119 stats: &mut LocalPickerStatistic,
120 ) -> Option<CompactionInput> {
121 let picker = WholeLevelCompactionPicker::new(
122 self.config.clone(),
123 self.compaction_task_validator.clone(),
124 );
125 picker.pick_whole_level(l0, level_handler, partition_count, stats)
126 }
127
128 fn pick_l0_intra(
129 &self,
130 l0: &OverlappingLevel,
131 level_handler: &LevelHandler,
132 vnode_partition_count: u32,
133 stats: &mut LocalPickerStatistic,
134 ) -> Option<CompactionInput> {
135 let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
136 let mut max_vnode_partition_idx = 0;
137 for (idx, level) in l0.sub_levels.iter().enumerate() {
138 if level.vnode_partition_count < vnode_partition_count {
139 break;
140 }
141 max_vnode_partition_idx = idx;
142 }
143
144 for (idx, level) in l0.sub_levels.iter().enumerate() {
145 if level.level_type != LevelType::Nonoverlapping
146 || level.total_file_size > self.config.sub_level_max_compaction_bytes
147 {
148 continue;
149 }
150
151 if idx > max_vnode_partition_idx {
152 break;
153 }
154
155 if level_handler.is_level_all_pending_compact(level) {
156 continue;
157 }
158
159 let max_compaction_bytes = std::cmp::min(
160 self.config.max_compaction_bytes,
161 self.config.sub_level_max_compaction_bytes,
162 );
163
164 let non_overlap_sub_level_picker = NonOverlapSubLevelPicker::new(
165 self.config.sub_level_max_compaction_bytes / 2,
166 max_compaction_bytes,
167 self.config.level0_sub_level_compact_level_count as usize,
168 self.config.level0_max_compact_file_number,
169 overlap_strategy.clone(),
170 self.developer_config.enable_check_task_level_overlap,
171 self.config
172 .max_l0_compact_level_count
173 .unwrap_or(compaction_config::max_l0_compact_level_count())
174 as usize,
175 );
176
177 let l0_select_tables_vec = non_overlap_sub_level_picker
178 .pick_l0_multi_non_overlap_level(
179 &l0.sub_levels[idx..=max_vnode_partition_idx],
180 level_handler,
181 );
182
183 if l0_select_tables_vec.is_empty() {
184 continue;
185 }
186
187 let mut select_input_size = 0;
188 let mut total_file_count = 0;
189 for input in l0_select_tables_vec {
190 let mut max_level_size = 0;
191 for level_select_table in &input.sstable_infos {
192 let level_select_size = level_select_table
193 .iter()
194 .map(|sst| sst.sst_size)
195 .sum::<u64>();
196
197 max_level_size = std::cmp::max(max_level_size, level_select_size);
198 }
199
200 let mut select_level_inputs = Vec::with_capacity(input.sstable_infos.len());
201 for level_select_sst in input.sstable_infos {
202 if level_select_sst.is_empty() {
203 continue;
204 }
205 select_level_inputs.push(InputLevel {
206 level_idx: 0,
207 level_type: LevelType::Nonoverlapping,
208 table_infos: level_select_sst,
209 });
210
211 select_input_size += input.total_file_size;
212 total_file_count += input.total_file_count;
213 }
214 select_level_inputs.reverse();
215
216 let result = CompactionInput {
217 input_levels: select_level_inputs,
218 target_sub_level_id: level.sub_level_id,
219 select_input_size,
220 total_file_count: total_file_count as u64,
221 ..Default::default()
222 };
223
224 if !self.compaction_task_validator.valid_compact_task(
225 &result,
226 ValidationRuleType::Intra,
227 stats,
228 ) {
229 continue;
230 }
231
232 return Some(result);
233 }
234 }
235
236 None
237 }
238
239 fn pick_l0_trivial_move_file(
240 &self,
241 l0: &OverlappingLevel,
242 level_handlers: &[LevelHandler],
243 stats: &mut LocalPickerStatistic,
244 ) -> Option<CompactionInput> {
245 if !self.developer_config.enable_trivial_move {
246 return None;
247 }
248
249 let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
250
251 for (idx, level) in l0.sub_levels.iter().enumerate() {
252 if level.level_type == LevelType::Overlapping || idx + 1 >= l0.sub_levels.len() {
253 continue;
254 }
255
256 if l0.sub_levels[idx + 1].level_type == LevelType::Overlapping {
257 continue;
258 }
259
260 if level_handlers[0].is_level_pending_compact(level) {
261 continue;
262 }
263
264 if l0.sub_levels[idx + 1].vnode_partition_count
265 != l0.sub_levels[idx].vnode_partition_count
266 {
267 continue;
268 }
269
270 let trivial_move_picker = TrivialMovePicker::new(
271 0,
272 0,
273 overlap_strategy.clone(),
274 0,
275 self.config
276 .sst_allowed_trivial_move_max_count
277 .unwrap_or(compaction_config::sst_allowed_trivial_move_max_count())
278 as usize,
279 );
280
281 if let Some(select_ssts) = trivial_move_picker.pick_multi_trivial_move_ssts(
282 &l0.sub_levels[idx + 1].table_infos,
283 &level.table_infos,
284 level_handlers,
285 stats,
286 ) {
287 let mut overlap = overlap_strategy.create_overlap_info();
288 select_ssts.iter().for_each(|ssts| overlap.update(ssts));
289
290 assert!(
291 overlap
292 .check_multiple_overlap(&l0.sub_levels[idx].table_infos)
293 .is_empty()
294 );
295
296 let select_input_size = select_ssts.iter().map(|sst| sst.sst_size).sum();
297 let total_file_count = select_ssts.len() as u64;
298 let input_levels = vec![
299 InputLevel {
300 level_idx: 0,
301 level_type: LevelType::Nonoverlapping,
302 table_infos: select_ssts,
303 },
304 InputLevel {
305 level_idx: 0,
306 level_type: LevelType::Nonoverlapping,
307 table_infos: vec![],
308 },
309 ];
310 return Some(CompactionInput {
311 input_levels,
312 target_level: 0,
313 target_sub_level_id: level.sub_level_id,
314 select_input_size,
315 total_file_count,
316 ..Default::default()
317 });
318 }
319 }
320 None
321 }
322}
323
324pub struct WholeLevelCompactionPicker {
325 config: Arc<CompactionConfig>,
326 compaction_task_validator: Arc<CompactionTaskValidator>,
327}
328
329impl WholeLevelCompactionPicker {
330 pub fn new(
331 config: Arc<CompactionConfig>,
332 compaction_task_validator: Arc<CompactionTaskValidator>,
333 ) -> Self {
334 Self {
335 config,
336 compaction_task_validator,
337 }
338 }
339
340 pub fn pick_whole_level(
341 &self,
342 l0: &OverlappingLevel,
343 level_handler: &LevelHandler,
344 partition_count: u32,
345 stats: &mut LocalPickerStatistic,
346 ) -> Option<CompactionInput> {
347 if partition_count == 0 {
348 return None;
349 }
350 for (idx, level) in l0.sub_levels.iter().enumerate() {
351 if level.level_type != LevelType::Nonoverlapping
352 || level.vnode_partition_count == partition_count
353 {
354 continue;
355 }
356
357 let max_compaction_bytes = std::cmp::max(
358 self.config.max_bytes_for_level_base,
359 self.config.sub_level_max_compaction_bytes
360 * (self.config.level0_sub_level_compact_level_count as u64),
361 );
362
363 let mut select_input_size = 0;
364
365 let mut select_level_inputs = vec![];
366 let mut total_file_count = 0;
367 let mut wait_enough = false;
368 for next_level in l0.sub_levels.iter().skip(idx) {
369 if (select_input_size > max_compaction_bytes
370 || total_file_count > self.config.level0_max_compact_file_number
371 || next_level.vnode_partition_count == partition_count)
372 && select_level_inputs.len() > 1
373 {
374 wait_enough = true;
375 break;
376 }
377
378 if level_handler.is_level_pending_compact(next_level) {
379 break;
380 }
381
382 select_input_size += next_level.total_file_size;
383 total_file_count += next_level.table_infos.len() as u64;
384
385 select_level_inputs.push(InputLevel {
386 level_idx: 0,
387 level_type: next_level.level_type,
388 table_infos: next_level.table_infos.clone(),
389 });
390 }
391 if select_level_inputs.len() > 1 {
392 let vnode_partition_count =
393 if select_input_size > self.config.sub_level_max_compaction_bytes / 2 {
394 partition_count
395 } else {
396 0
397 };
398 let result = CompactionInput {
399 input_levels: select_level_inputs,
400 target_sub_level_id: level.sub_level_id,
401 select_input_size,
402 total_file_count,
403 vnode_partition_count,
404 ..Default::default()
405 };
406 if wait_enough
407 || self.compaction_task_validator.valid_compact_task(
408 &result,
409 ValidationRuleType::Intra,
410 stats,
411 )
412 {
413 return Some(result);
414 }
415 }
416 }
417
418 None
419 }
420}
421
422#[cfg(test)]
423pub mod tests {
424 use risingwave_hummock_sdk::level::Level;
425
426 use super::*;
427 use crate::hummock::compaction::TierCompactionPicker;
428 use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
429 use crate::hummock::compaction::selector::tests::{
430 generate_l0_nonoverlapping_multi_sublevels, generate_l0_nonoverlapping_sublevels,
431 generate_l0_overlapping_sublevels, generate_level, generate_table,
432 push_table_level0_overlapping, push_tables_level0_nonoverlapping,
433 };
434
435 fn create_compaction_picker_for_test() -> IntraCompactionPicker {
436 let config = Arc::new(
437 CompactionConfigBuilder::new()
438 .level0_tier_compact_file_number(2)
439 .level0_sub_level_compact_level_count(1)
440 .build(),
441 );
442 IntraCompactionPicker::for_test(config, Arc::new(CompactionDeveloperConfig::default()))
443 }
444
445 #[test]
446 fn test_l0_to_l1_compact_conflict() {
447 let levels = vec![Level {
450 level_idx: 1,
451 level_type: LevelType::Nonoverlapping,
452 table_infos: vec![],
453 ..Default::default()
454 }];
455 let mut levels = Levels {
456 levels,
457 l0: OverlappingLevel {
458 sub_levels: vec![],
459 total_file_size: 0,
460 uncompressed_file_size: 0,
461 },
462 ..Default::default()
463 };
464 push_tables_level0_nonoverlapping(
465 &mut levels,
466 vec![
467 generate_table(1, 1, 100, 300, 2),
468 generate_table(2, 1, 350, 500, 2),
469 ],
470 );
471 let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
472
473 let mut local_stats = LocalPickerStatistic::default();
474 push_tables_level0_nonoverlapping(&mut levels, vec![generate_table(3, 1, 250, 300, 3)]);
475 let config: CompactionConfig = CompactionConfigBuilder::new()
476 .level0_tier_compact_file_number(2)
477 .max_compaction_bytes(1000)
478 .sub_level_max_compaction_bytes(150)
479 .max_bytes_for_level_multiplier(1)
480 .level0_sub_level_compact_level_count(3)
481 .build();
482 let mut picker = TierCompactionPicker::new(Arc::new(config));
483
484 let ret: Option<CompactionInput> =
485 picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
486 assert!(ret.is_none());
487 }
488
489 #[test]
490 fn test_compacting_key_range_overlap_intra_l0() {
491 let mut picker = create_compaction_picker_for_test();
494
495 let mut levels = Levels {
496 levels: vec![Level {
497 level_idx: 1,
498 level_type: LevelType::Nonoverlapping,
499 table_infos: vec![generate_table(3, 1, 200, 300, 2)],
500 ..Default::default()
501 }],
502 l0: generate_l0_nonoverlapping_sublevels(vec![
503 generate_table(1, 1, 100, 210, 2),
504 generate_table(2, 1, 200, 250, 2),
505 ]),
506 ..Default::default()
507 };
508 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
509
510 let mut local_stats = LocalPickerStatistic::default();
511 let ret = picker
512 .pick_compaction(&levels, &levels_handler, &mut local_stats)
513 .unwrap();
514 ret.add_pending_task(0, &mut levels_handler);
515
516 push_table_level0_overlapping(&mut levels, generate_table(4, 1, 170, 180, 3));
517 assert!(
518 picker
519 .pick_compaction(&levels, &levels_handler, &mut local_stats)
520 .is_none()
521 );
522 }
523
524 #[test]
525 fn test_pick_l0_intra() {
526 {
527 let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
528 vec![
529 generate_table(6, 1, 50, 99, 1),
530 generate_table(1, 1, 100, 200, 1),
531 generate_table(2, 1, 250, 300, 1),
532 ],
533 vec![
534 generate_table(3, 1, 10, 90, 1),
535 generate_table(6, 1, 100, 110, 1),
536 ],
537 vec![
538 generate_table(4, 1, 50, 99, 1),
539 generate_table(5, 1, 100, 200, 1),
540 ],
541 ]);
542 let levels = Levels {
543 l0,
544 levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
545 ..Default::default()
546 };
547 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
548 levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
549 let config = Arc::new(
550 CompactionConfigBuilder::new()
551 .level0_sub_level_compact_level_count(1)
552 .level0_overlapping_sub_level_compact_level_count(4)
553 .build(),
554 );
555 let mut picker = IntraCompactionPicker::for_test(
556 config,
557 Arc::new(CompactionDeveloperConfig::default()),
558 );
559 let mut local_stats = LocalPickerStatistic::default();
560 let ret = picker
561 .pick_compaction(&levels, &levels_handler, &mut local_stats)
562 .unwrap();
563 ret.add_pending_task(1, &mut levels_handler);
564 assert_eq!(
565 ret.input_levels
566 .iter()
567 .map(|i| i.table_infos.len())
568 .sum::<usize>(),
569 3
570 );
571 }
572
573 {
574 let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
577 vec![
578 generate_table(1, 1, 100, 200, 1),
579 generate_table(2, 1, 300, 400, 1),
580 ],
581 vec![
582 generate_table(3, 1, 100, 200, 1),
583 generate_table(6, 1, 300, 500, 1),
584 ],
585 vec![
586 generate_table(4, 1, 100, 200, 1),
587 generate_table(5, 1, 300, 400, 1),
588 ],
589 ]);
590 let levels = Levels {
591 l0,
592 levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
593 ..Default::default()
594 };
595 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
596 levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
597 let config = Arc::new(
598 CompactionConfigBuilder::new()
599 .level0_sub_level_compact_level_count(1)
600 .sub_level_max_compaction_bytes(300)
601 .build(),
602 );
603 let mut picker = IntraCompactionPicker::for_test(
604 config,
605 Arc::new(CompactionDeveloperConfig::default()),
606 );
607 let mut local_stats = LocalPickerStatistic::default();
608 let ret = picker
609 .pick_compaction(&levels, &levels_handler, &mut local_stats)
610 .unwrap();
611 ret.add_pending_task(1, &mut levels_handler);
612 assert_eq!(
613 ret.input_levels
614 .iter()
615 .map(|i| i.table_infos.len())
616 .sum::<usize>(),
617 3
618 );
619
620 assert_eq!(4, ret.input_levels[0].table_infos[0].sst_id);
621 assert_eq!(3, ret.input_levels[1].table_infos[0].sst_id);
622 assert_eq!(1, ret.input_levels[2].table_infos[0].sst_id);
623
624 let ret2 = picker
626 .pick_compaction(&levels, &levels_handler, &mut local_stats)
627 .unwrap();
628
629 assert_eq!(
630 ret2.input_levels
631 .iter()
632 .map(|i| i.table_infos.len())
633 .sum::<usize>(),
634 2
635 );
636
637 assert_eq!(6, ret2.input_levels[0].table_infos[0].sst_id);
638 assert_eq!(2, ret2.input_levels[1].table_infos[0].sst_id);
639 }
640
641 {
642 let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
643 vec![
644 generate_table(1, 1, 100, 149, 1),
645 generate_table(6, 1, 150, 199, 1),
646 generate_table(7, 1, 200, 250, 1),
647 generate_table(2, 1, 300, 400, 1),
648 ],
649 vec![
650 generate_table(3, 1, 100, 149, 1),
651 generate_table(8, 1, 150, 199, 1),
652 generate_table(9, 1, 200, 250, 1),
653 generate_table(10, 1, 300, 400, 1),
654 ],
655 vec![
656 generate_table(4, 1, 100, 199, 1),
657 generate_table(11, 1, 200, 250, 1),
658 generate_table(5, 1, 300, 350, 1),
659 ],
660 ]);
661 let levels = Levels {
662 l0,
663 levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
664 ..Default::default()
665 };
666 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
667 levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
668 let config = Arc::new(
669 CompactionConfigBuilder::new()
670 .level0_sub_level_compact_level_count(1)
671 .sub_level_max_compaction_bytes(300)
672 .build(),
673 );
674 let mut picker = IntraCompactionPicker::for_test(
675 config,
676 Arc::new(CompactionDeveloperConfig::default()),
677 );
678 let mut local_stats = LocalPickerStatistic::default();
679 let ret = picker
680 .pick_compaction(&levels, &levels_handler, &mut local_stats)
681 .unwrap();
682 ret.add_pending_task(1, &mut levels_handler);
683 assert_eq!(
684 ret.input_levels
685 .iter()
686 .map(|i| i.table_infos.len())
687 .sum::<usize>(),
688 3
689 );
690
691 assert_eq!(11, ret.input_levels[0].table_infos[0].sst_id);
692 assert_eq!(9, ret.input_levels[1].table_infos[0].sst_id);
693 assert_eq!(7, ret.input_levels[2].table_infos[0].sst_id);
694
695 let ret2 = picker
696 .pick_compaction(&levels, &levels_handler, &mut local_stats)
697 .unwrap();
698
699 assert_eq!(
700 ret2.input_levels
701 .iter()
702 .map(|i| i.table_infos.len())
703 .sum::<usize>(),
704 3
705 );
706
707 assert_eq!(5, ret2.input_levels[0].table_infos[0].sst_id);
708 assert_eq!(10, ret2.input_levels[1].table_infos[0].sst_id);
709 assert_eq!(2, ret2.input_levels[2].table_infos[0].sst_id);
710 }
711 }
712
713 fn is_l0_trivial_move(compaction_input: &CompactionInput) -> bool {
714 compaction_input.input_levels.len() == 2
715 && !compaction_input.input_levels[0].table_infos.is_empty()
716 && compaction_input.input_levels[1].table_infos.is_empty()
717 }
718
719 #[test]
720 fn test_trivial_move() {
721 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
722 let config = Arc::new(
723 CompactionConfigBuilder::new()
724 .level0_tier_compact_file_number(2)
725 .target_file_size_base(30)
726 .level0_sub_level_compact_level_count(20) .build(),
728 );
729 let mut picker =
730 IntraCompactionPicker::for_test(config, Arc::new(CompactionDeveloperConfig::default()));
731
732 let l0 = generate_l0_overlapping_sublevels(vec![vec![
734 generate_table(1, 1, 100, 110, 1),
735 generate_table(2, 1, 150, 250, 1),
736 ]]);
737 let levels = Levels {
738 l0,
739 levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
740 ..Default::default()
741 };
742 levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
743 let mut local_stats = LocalPickerStatistic::default();
744 let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
745 assert!(ret.is_none());
746
747 let l0: OverlappingLevel = generate_l0_overlapping_sublevels(vec![
749 vec![
750 generate_table(1, 1, 100, 110, 1),
751 generate_table(2, 1, 150, 250, 1),
752 ],
753 vec![generate_table(3, 1, 10, 90, 1)],
754 vec![generate_table(4, 1, 10, 90, 1)],
755 vec![generate_table(5, 1, 10, 90, 1)],
756 ]);
757 let mut levels = Levels {
758 l0,
759 levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
760 ..Default::default()
761 };
762 assert!(
763 picker
764 .pick_compaction(&levels, &levels_handler, &mut local_stats)
765 .is_none()
766 );
767
768 levels.l0.sub_levels[0].level_type = LevelType::Nonoverlapping;
770 levels.l0.sub_levels[1].level_type = LevelType::Overlapping;
771 let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
772 assert!(ret.is_none());
773
774 levels.l0.sub_levels[0].level_type = LevelType::Overlapping;
776 levels.l0.sub_levels[1].level_type = LevelType::Nonoverlapping;
777 let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
778 assert!(ret.is_none());
779
780 levels.l0.sub_levels[0].level_type = LevelType::Nonoverlapping;
782 levels.l0.sub_levels[1].level_type = LevelType::Nonoverlapping;
783 let ret = picker
784 .pick_compaction(&levels, &levels_handler, &mut local_stats)
785 .unwrap();
786 assert!(is_l0_trivial_move(&ret));
787 assert_eq!(ret.input_levels[0].table_infos.len(), 1);
788 }
789 #[test]
790 fn test_pick_whole_level() {
791 let config = Arc::new(
792 CompactionConfigBuilder::new()
793 .level0_max_compact_file_number(20)
794 .build(),
795 );
796 let mut table_infos = vec![];
797 for epoch in 1..3 {
798 let base = epoch * 100;
799 let mut ssts = vec![];
800 for i in 1..50 {
801 let left = (i as usize) * 100;
802 let right = left + 100;
803 ssts.push(generate_table(base + i, 1, left, right, epoch));
804 }
805 table_infos.push(ssts);
806 }
807
808 let l0 = generate_l0_nonoverlapping_multi_sublevels(table_infos);
809 let compaction_task_validator = Arc::new(CompactionTaskValidator::new(config.clone()));
810 let picker = WholeLevelCompactionPicker::new(config, compaction_task_validator);
811 let level_handler = LevelHandler::new(0);
812 let ret = picker
813 .pick_whole_level(&l0, &level_handler, 4, &mut LocalPickerStatistic::default())
814 .unwrap();
815 assert_eq!(ret.input_levels.len(), 2);
816 }
817
818 #[test]
819 fn test_priority() {
820 let config = Arc::new(
821 CompactionConfigBuilder::new()
822 .level0_max_compact_file_number(20)
823 .sub_level_max_compaction_bytes(1)
824 .level0_sub_level_compact_level_count(2)
825 .build(),
826 );
827 let mut table_infos = vec![];
828 for epoch in 1..3 {
829 let base = epoch * 100;
830 let mut ssts = vec![];
831 for i in 1..50 {
832 let left = (i as usize) * 100;
833 let right = left + 100;
834 ssts.push(generate_table(base + i, 1, left, right, epoch));
835 }
836 table_infos.push(ssts);
837 }
838
839 let mut l0 = generate_l0_nonoverlapping_multi_sublevels(table_infos);
840 l0.sub_levels[1]
842 .table_infos
843 .push(generate_table(9999, 900000000, 0, 100, 1));
844
845 l0.sub_levels[0].total_file_size = 1;
846 l0.sub_levels[1].total_file_size = 1;
847
848 let mut picker = IntraCompactionPicker::new_with_validator(
849 config,
850 Arc::new(CompactionTaskValidator::unused()),
851 Arc::new(CompactionDeveloperConfig::default()),
852 );
853 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
854 let mut local_stats = LocalPickerStatistic::default();
855
856 let levels = Levels {
857 l0,
858 levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
859 ..Default::default()
860 };
861
862 let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
863 assert!(is_l0_trivial_move(ret.as_ref().unwrap()));
864 ret.as_ref()
865 .unwrap()
866 .add_pending_task(1, &mut levels_handler);
867 let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
868 assert!(ret.is_some());
869 let input = ret.as_ref().unwrap();
870 assert_eq!(input.input_levels.len(), 2);
871 assert_ne!(
872 levels.l0.sub_levels[0].table_infos.len(),
873 input.input_levels[0].table_infos.len()
874 );
875 assert_ne!(
876 levels.l0.sub_levels[1].table_infos.len(),
877 input.input_levels[1].table_infos.len()
878 );
879 }
880}