1use std::sync::Arc;
16
17use risingwave_common::config::meta::default::compaction_config;
18use risingwave_hummock_sdk::level::{InputLevel, Levels, OverlappingLevel};
19use risingwave_pb::hummock::{CompactionConfig, LevelType};
20
21use super::{
22 CompactionInput, CompactionPicker, CompactionTaskValidator, LocalPickerStatistic,
23 ValidationRuleType,
24};
25use crate::hummock::compaction::picker::{NonOverlapSubLevelPicker, TrivialMovePicker};
26use crate::hummock::compaction::{CompactionDeveloperConfig, create_overlap_strategy};
27use crate::hummock::level_handler::LevelHandler;
28
29pub struct IntraCompactionPicker {
30 config: Arc<CompactionConfig>,
31 compaction_task_validator: Arc<CompactionTaskValidator>,
32
33 developer_config: Arc<CompactionDeveloperConfig>,
34}
35
36impl CompactionPicker for IntraCompactionPicker {
37 fn pick_compaction(
38 &mut self,
39 levels: &Levels,
40 level_handlers: &[LevelHandler],
41 stats: &mut LocalPickerStatistic,
42 ) -> Option<CompactionInput> {
43 let l0 = &levels.l0;
44 if l0.sub_levels.is_empty() {
45 return None;
46 }
47
48 if let Some(ret) = self.pick_l0_trivial_move_file(l0, level_handlers, stats) {
49 return Some(ret);
50 }
51
52 let vnode_partition_count = self.config.split_weight_by_vnode;
53
54 if let Some(ret) =
55 self.pick_whole_level(l0, &level_handlers[0], vnode_partition_count, stats)
56 {
57 if ret.input_levels.len() < 2 {
58 tracing::error!(
59 ?ret,
60 vnode_partition_count,
61 "pick_whole_level failed to pick enough levels"
62 );
63 return None;
64 }
65
66 return Some(ret);
67 }
68
69 if let Some(ret) = self.pick_l0_intra(l0, &level_handlers[0], vnode_partition_count, stats)
70 {
71 if ret.input_levels.len() < 2 {
72 tracing::error!(
73 ?ret,
74 vnode_partition_count,
75 "pick_l0_intra failed to pick enough levels"
76 );
77 return None;
78 }
79
80 return Some(ret);
81 }
82
83 None
84 }
85}
86
87impl IntraCompactionPicker {
88 #[cfg(test)]
89 pub fn for_test(
90 config: Arc<CompactionConfig>,
91 developer_config: Arc<CompactionDeveloperConfig>,
92 ) -> IntraCompactionPicker {
93 IntraCompactionPicker {
94 compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())),
95 config,
96 developer_config,
97 }
98 }
99
100 pub fn new_with_validator(
101 config: Arc<CompactionConfig>,
102 compaction_task_validator: Arc<CompactionTaskValidator>,
103 developer_config: Arc<CompactionDeveloperConfig>,
104 ) -> IntraCompactionPicker {
105 assert!(config.level0_sub_level_compact_level_count > 1);
106 IntraCompactionPicker {
107 config,
108 compaction_task_validator,
109 developer_config,
110 }
111 }
112
113 fn pick_whole_level(
114 &self,
115 l0: &OverlappingLevel,
116 level_handler: &LevelHandler,
117 partition_count: u32,
118 stats: &mut LocalPickerStatistic,
119 ) -> Option<CompactionInput> {
120 let picker = WholeLevelCompactionPicker::new(
121 self.config.clone(),
122 self.compaction_task_validator.clone(),
123 );
124 picker.pick_whole_level(l0, level_handler, partition_count, stats)
125 }
126
127 fn pick_l0_intra(
128 &self,
129 l0: &OverlappingLevel,
130 level_handler: &LevelHandler,
131 vnode_partition_count: u32,
132 stats: &mut LocalPickerStatistic,
133 ) -> Option<CompactionInput> {
134 let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
135 let mut max_vnode_partition_idx = 0;
136 for (idx, level) in l0.sub_levels.iter().enumerate() {
137 if level.vnode_partition_count < vnode_partition_count {
138 break;
139 }
140 max_vnode_partition_idx = idx;
141 }
142
143 for (idx, level) in l0.sub_levels.iter().enumerate() {
144 if level.level_type != LevelType::Nonoverlapping
145 || level.total_file_size > self.config.sub_level_max_compaction_bytes
146 {
147 continue;
148 }
149
150 if idx > max_vnode_partition_idx {
151 break;
152 }
153
154 if level_handler.is_level_all_pending_compact(level) {
155 continue;
156 }
157
158 let max_compaction_bytes = std::cmp::min(
159 self.config.max_compaction_bytes,
160 self.config.sub_level_max_compaction_bytes,
161 );
162
163 let non_overlap_sub_level_picker = NonOverlapSubLevelPicker::new(
164 self.config.sub_level_max_compaction_bytes / 2,
165 max_compaction_bytes,
166 self.config.level0_sub_level_compact_level_count as usize,
167 self.config.level0_max_compact_file_number,
168 overlap_strategy.clone(),
169 self.developer_config.enable_check_task_level_overlap,
170 self.config
171 .max_l0_compact_level_count
172 .unwrap_or(compaction_config::max_l0_compact_level_count())
173 as usize,
174 self.config
175 .enable_optimize_l0_interval_selection
176 .unwrap_or(compaction_config::enable_optimize_l0_interval_selection()),
177 );
178
179 let candidate_l0_plans = non_overlap_sub_level_picker.pick_l0_multi_non_overlap_level(
180 &l0.sub_levels[idx..=max_vnode_partition_idx],
181 level_handler,
182 );
183
184 if candidate_l0_plans.is_empty() {
185 continue;
186 }
187
188 for input in candidate_l0_plans {
189 let mut task_input_size = 0;
190 let mut task_file_count = 0;
191 let mut select_level_inputs = Vec::with_capacity(input.sstable_infos.len());
192 let mut target_sub_level_id = None;
193 for (sub_level_id, level_select_sst) in input.sstable_infos {
194 if level_select_sst.is_empty() {
195 continue;
196 }
197
198 if target_sub_level_id.is_none() {
199 target_sub_level_id = Some(sub_level_id);
200 }
201
202 task_input_size += level_select_sst.iter().map(|sst| sst.sst_size).sum::<u64>();
203 task_file_count += level_select_sst.len();
204
205 select_level_inputs.push(InputLevel {
206 level_idx: 0,
207 level_type: LevelType::Nonoverlapping,
208 table_infos: level_select_sst,
209 });
210 }
211 select_level_inputs.reverse();
212
213 let result = CompactionInput {
214 input_levels: select_level_inputs,
215 target_sub_level_id: target_sub_level_id.unwrap(),
216 select_input_size: task_input_size,
217 total_file_count: task_file_count as u64,
218 ..Default::default()
219 };
220
221 if !self.compaction_task_validator.valid_compact_task(
222 &result,
223 ValidationRuleType::Intra,
224 stats,
225 ) {
226 continue;
227 }
228
229 return Some(result);
230 }
231 }
232
233 None
234 }
235
236 fn pick_l0_trivial_move_file(
237 &self,
238 l0: &OverlappingLevel,
239 level_handlers: &[LevelHandler],
240 stats: &mut LocalPickerStatistic,
241 ) -> Option<CompactionInput> {
242 if !self.developer_config.enable_trivial_move {
243 return None;
244 }
245
246 let overlap_strategy = create_overlap_strategy(self.config.compaction_mode());
247
248 for (idx, level) in l0.sub_levels.iter().enumerate() {
249 if level.level_type == LevelType::Overlapping || idx + 1 >= l0.sub_levels.len() {
250 continue;
251 }
252
253 if l0.sub_levels[idx + 1].level_type == LevelType::Overlapping {
254 continue;
255 }
256
257 if level_handlers[0].is_level_pending_compact(level) {
258 continue;
259 }
260
261 if l0.sub_levels[idx + 1].vnode_partition_count
262 != l0.sub_levels[idx].vnode_partition_count
263 {
264 continue;
265 }
266
267 let trivial_move_picker = TrivialMovePicker::new(
268 0,
269 0,
270 overlap_strategy.clone(),
271 0,
272 self.config
273 .sst_allowed_trivial_move_max_count
274 .unwrap_or(compaction_config::sst_allowed_trivial_move_max_count())
275 as usize,
276 );
277
278 if let Some(select_ssts) = trivial_move_picker.pick_multi_trivial_move_ssts(
279 &l0.sub_levels[idx + 1].table_infos,
280 &level.table_infos,
281 level_handlers,
282 stats,
283 ) {
284 let mut overlap = overlap_strategy.create_overlap_info();
285 select_ssts
286 .iter()
287 .for_each(|sst| overlap.update(&sst.key_range));
288
289 assert!(
290 overlap
291 .check_multiple_overlap(&l0.sub_levels[idx].table_infos)
292 .is_empty()
293 );
294
295 let select_input_size = select_ssts.iter().map(|sst| sst.sst_size).sum();
296 let total_file_count = select_ssts.len() as u64;
297 let input_levels = vec![
298 InputLevel {
299 level_idx: 0,
300 level_type: LevelType::Nonoverlapping,
301 table_infos: select_ssts,
302 },
303 InputLevel {
304 level_idx: 0,
305 level_type: LevelType::Nonoverlapping,
306 table_infos: vec![],
307 },
308 ];
309 return Some(CompactionInput {
310 input_levels,
311 target_level: 0,
312 target_sub_level_id: level.sub_level_id,
313 select_input_size,
314 total_file_count,
315 ..Default::default()
316 });
317 }
318 }
319 None
320 }
321}
322
323pub struct WholeLevelCompactionPicker {
324 config: Arc<CompactionConfig>,
325 compaction_task_validator: Arc<CompactionTaskValidator>,
326}
327
328impl WholeLevelCompactionPicker {
329 pub fn new(
330 config: Arc<CompactionConfig>,
331 compaction_task_validator: Arc<CompactionTaskValidator>,
332 ) -> Self {
333 Self {
334 config,
335 compaction_task_validator,
336 }
337 }
338
339 pub fn pick_whole_level(
340 &self,
341 l0: &OverlappingLevel,
342 level_handler: &LevelHandler,
343 partition_count: u32,
344 stats: &mut LocalPickerStatistic,
345 ) -> Option<CompactionInput> {
346 if partition_count == 0 {
347 return None;
348 }
349 for (idx, level) in l0.sub_levels.iter().enumerate() {
350 if level.level_type != LevelType::Nonoverlapping
351 || level.vnode_partition_count == partition_count
352 {
353 continue;
354 }
355
356 let max_compaction_bytes = std::cmp::max(
357 self.config.max_bytes_for_level_base,
358 self.config.sub_level_max_compaction_bytes
359 * (self.config.level0_sub_level_compact_level_count as u64),
360 );
361
362 let mut select_input_size = 0;
363
364 let mut select_level_inputs = vec![];
365 let mut total_file_count = 0;
366 let mut wait_enough = false;
367 for next_level in l0.sub_levels.iter().skip(idx) {
368 if (select_input_size > max_compaction_bytes
369 || total_file_count > self.config.level0_max_compact_file_number
370 || next_level.vnode_partition_count == partition_count)
371 && select_level_inputs.len() > 1
372 {
373 wait_enough = true;
374 break;
375 }
376
377 if level_handler.is_level_pending_compact(next_level) {
378 break;
379 }
380
381 select_input_size += next_level.total_file_size;
382 total_file_count += next_level.table_infos.len() as u64;
383
384 select_level_inputs.push(InputLevel {
385 level_idx: 0,
386 level_type: next_level.level_type,
387 table_infos: next_level.table_infos.clone(),
388 });
389 }
390 if select_level_inputs.len() > 1 {
391 let vnode_partition_count =
392 if select_input_size > self.config.sub_level_max_compaction_bytes / 2 {
393 partition_count
394 } else {
395 0
396 };
397 let result = CompactionInput {
398 input_levels: select_level_inputs,
399 target_sub_level_id: level.sub_level_id,
400 select_input_size,
401 total_file_count,
402 vnode_partition_count,
403 skip_target_range_conflict_check: true,
404 ..Default::default()
405 };
406 if wait_enough
407 || self.compaction_task_validator.valid_compact_task(
408 &result,
409 ValidationRuleType::Intra,
410 stats,
411 )
412 {
413 return Some(result);
414 }
415 }
416 }
417
418 None
419 }
420}
421
422#[cfg(test)]
423pub mod tests {
424 use risingwave_hummock_sdk::level::Level;
425
426 use super::*;
427 use crate::hummock::compaction::TierCompactionPicker;
428 use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
429 use crate::hummock::compaction::selector::tests::{
430 generate_l0_nonoverlapping_multi_sublevels, generate_l0_nonoverlapping_sublevels,
431 generate_l0_overlapping_sublevels, generate_level, generate_table,
432 push_table_level0_overlapping, push_tables_level0_nonoverlapping,
433 };
434
435 fn create_compaction_picker_for_test() -> IntraCompactionPicker {
436 let config = Arc::new(
437 CompactionConfigBuilder::new()
438 .level0_tier_compact_file_number(2)
439 .level0_sub_level_compact_level_count(1)
440 .build(),
441 );
442 IntraCompactionPicker::for_test(config, Arc::new(CompactionDeveloperConfig::default()))
443 }
444
445 #[test]
446 fn test_l0_to_l1_compact_conflict() {
447 let levels = vec![Level {
450 level_idx: 1,
451 level_type: LevelType::Nonoverlapping,
452 table_infos: vec![],
453 ..Default::default()
454 }];
455 let mut levels = Levels {
456 levels,
457 l0: OverlappingLevel {
458 sub_levels: vec![],
459 total_file_size: 0,
460 uncompressed_file_size: 0,
461 },
462 ..Default::default()
463 };
464 push_tables_level0_nonoverlapping(
465 &mut levels,
466 vec![
467 generate_table(1, 1, 100, 300, 2),
468 generate_table(2, 1, 350, 500, 2),
469 ],
470 );
471 let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
472
473 let mut local_stats = LocalPickerStatistic::default();
474 push_tables_level0_nonoverlapping(&mut levels, vec![generate_table(3, 1, 250, 300, 3)]);
475 let config: CompactionConfig = CompactionConfigBuilder::new()
476 .level0_tier_compact_file_number(2)
477 .max_compaction_bytes(1000)
478 .sub_level_max_compaction_bytes(150)
479 .max_bytes_for_level_multiplier(1)
480 .level0_sub_level_compact_level_count(3)
481 .build();
482 let mut picker = TierCompactionPicker::new(Arc::new(config));
483
484 let ret: Option<CompactionInput> =
485 picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
486 assert!(ret.is_none());
487 }
488
489 #[test]
490 fn test_compacting_key_range_overlap_intra_l0() {
491 let mut picker = create_compaction_picker_for_test();
494
495 let mut levels = Levels {
496 levels: vec![Level {
497 level_idx: 1,
498 level_type: LevelType::Nonoverlapping,
499 table_infos: vec![generate_table(3, 1, 200, 300, 2)],
500 ..Default::default()
501 }],
502 l0: generate_l0_nonoverlapping_sublevels(vec![
503 generate_table(1, 1, 100, 210, 2),
504 generate_table(2, 1, 200, 250, 2),
505 ]),
506 ..Default::default()
507 };
508 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
509
510 let mut local_stats = LocalPickerStatistic::default();
511 let ret = picker
512 .pick_compaction(&levels, &levels_handler, &mut local_stats)
513 .unwrap();
514 ret.add_pending_task(0, &mut levels_handler);
515
516 push_table_level0_overlapping(&mut levels, generate_table(4, 1, 170, 180, 3));
517 assert!(
518 picker
519 .pick_compaction(&levels, &levels_handler, &mut local_stats)
520 .is_none()
521 );
522 }
523
524 #[test]
525 fn test_pick_l0_intra() {
526 {
527 let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
528 vec![
529 generate_table(6, 1, 50, 99, 1),
530 generate_table(1, 1, 100, 200, 1),
531 generate_table(2, 1, 250, 300, 1),
532 ],
533 vec![
534 generate_table(3, 1, 10, 90, 1),
535 generate_table(6, 1, 100, 110, 1),
536 ],
537 vec![
538 generate_table(4, 1, 50, 99, 1),
539 generate_table(5, 1, 100, 200, 1),
540 ],
541 ]);
542 let levels = Levels {
543 l0,
544 levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
545 ..Default::default()
546 };
547 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
548 levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
549 let config = Arc::new(
550 CompactionConfigBuilder::new()
551 .level0_sub_level_compact_level_count(1)
552 .level0_overlapping_sub_level_compact_level_count(4)
553 .build(),
554 );
555 let mut picker = IntraCompactionPicker::for_test(
556 config,
557 Arc::new(CompactionDeveloperConfig::default()),
558 );
559 let mut local_stats = LocalPickerStatistic::default();
560 let ret = picker
561 .pick_compaction(&levels, &levels_handler, &mut local_stats)
562 .unwrap();
563 ret.add_pending_task(1, &mut levels_handler);
564 assert_eq!(
565 ret.input_levels
566 .iter()
567 .map(|i| i.table_infos.len())
568 .sum::<usize>(),
569 3
570 );
571 }
572
573 {
574 let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
577 vec![
578 generate_table(1, 1, 100, 200, 1),
579 generate_table(2, 1, 300, 400, 1),
580 ],
581 vec![
582 generate_table(3, 1, 100, 200, 1),
583 generate_table(6, 1, 300, 500, 1),
584 ],
585 vec![
586 generate_table(4, 1, 100, 200, 1),
587 generate_table(5, 1, 300, 400, 1),
588 ],
589 ]);
590 let levels = Levels {
591 l0,
592 levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
593 ..Default::default()
594 };
595 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
596 levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
597 let config = Arc::new(
598 CompactionConfigBuilder::new()
599 .level0_sub_level_compact_level_count(1)
600 .sub_level_max_compaction_bytes(300)
601 .build(),
602 );
603 let mut picker = IntraCompactionPicker::for_test(
604 config,
605 Arc::new(CompactionDeveloperConfig::default()),
606 );
607 let mut local_stats = LocalPickerStatistic::default();
608 let ret = picker
609 .pick_compaction(&levels, &levels_handler, &mut local_stats)
610 .unwrap();
611 ret.add_pending_task(1, &mut levels_handler);
612 assert_eq!(
613 ret.input_levels
614 .iter()
615 .map(|i| i.table_infos.len())
616 .sum::<usize>(),
617 3
618 );
619
620 assert_eq!(4, ret.input_levels[0].table_infos[0].sst_id);
621 assert_eq!(3, ret.input_levels[1].table_infos[0].sst_id);
622 assert_eq!(1, ret.input_levels[2].table_infos[0].sst_id);
623
624 let ret2 = picker
626 .pick_compaction(&levels, &levels_handler, &mut local_stats)
627 .unwrap();
628
629 assert_eq!(
630 ret2.input_levels
631 .iter()
632 .map(|i| i.table_infos.len())
633 .sum::<usize>(),
634 2
635 );
636
637 assert_eq!(6, ret2.input_levels[0].table_infos[0].sst_id);
638 assert_eq!(2, ret2.input_levels[1].table_infos[0].sst_id);
639 }
640
641 {
642 let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
643 vec![
644 generate_table(1, 1, 100, 149, 1),
645 generate_table(6, 1, 150, 199, 1),
646 generate_table(7, 1, 200, 250, 1),
647 generate_table(2, 1, 300, 400, 1),
648 ],
649 vec![
650 generate_table(3, 1, 100, 149, 1),
651 generate_table(8, 1, 150, 199, 1),
652 generate_table(9, 1, 200, 250, 1),
653 generate_table(10, 1, 300, 400, 1),
654 ],
655 vec![
656 generate_table(4, 1, 100, 199, 1),
657 generate_table(11, 1, 200, 250, 1),
658 generate_table(5, 1, 300, 350, 1),
659 ],
660 ]);
661 let levels = Levels {
662 l0,
663 levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
664 ..Default::default()
665 };
666 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
667 levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
668 let config = Arc::new(
669 CompactionConfigBuilder::new()
670 .level0_sub_level_compact_level_count(1)
671 .sub_level_max_compaction_bytes(300)
672 .build(),
673 );
674 let mut picker = IntraCompactionPicker::for_test(
675 config,
676 Arc::new(CompactionDeveloperConfig::default()),
677 );
678 let mut local_stats = LocalPickerStatistic::default();
679 let ret = picker
680 .pick_compaction(&levels, &levels_handler, &mut local_stats)
681 .unwrap();
682 ret.add_pending_task(1, &mut levels_handler);
683 assert_eq!(
684 ret.input_levels
685 .iter()
686 .map(|i| i.table_infos.len())
687 .sum::<usize>(),
688 3
689 );
690
691 assert_eq!(11, ret.input_levels[0].table_infos[0].sst_id);
692 assert_eq!(9, ret.input_levels[1].table_infos[0].sst_id);
693 assert_eq!(7, ret.input_levels[2].table_infos[0].sst_id);
694
695 let ret2 = picker
696 .pick_compaction(&levels, &levels_handler, &mut local_stats)
697 .unwrap();
698
699 assert_eq!(
700 ret2.input_levels
701 .iter()
702 .map(|i| i.table_infos.len())
703 .sum::<usize>(),
704 3
705 );
706
707 assert_eq!(5, ret2.input_levels[0].table_infos[0].sst_id);
708 assert_eq!(10, ret2.input_levels[1].table_infos[0].sst_id);
709 assert_eq!(2, ret2.input_levels[2].table_infos[0].sst_id);
710 }
711
712 {
713 let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![
715 vec![
716 generate_table(1, 1, 0, 50, 1), generate_table(2, 1, 100, 150, 1), ],
719 vec![generate_table(3, 1, 0, 150, 1)], ]);
721 let levels = Levels {
722 l0,
723 levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
724 ..Default::default()
725 };
726 let levels_handler = [LevelHandler::new(0), LevelHandler::new(1)];
727 let config = Arc::new(
728 CompactionConfigBuilder::new()
729 .level0_sub_level_compact_level_count(2)
730 .level0_max_compact_file_number(2)
731 .sub_level_max_compaction_bytes(500)
732 .max_compaction_bytes(300) .build(),
734 );
735 let picker = IntraCompactionPicker::for_test(
736 config,
737 Arc::new(CompactionDeveloperConfig::default()),
738 );
739 let mut local_stats = LocalPickerStatistic::default();
740 let ret = picker
741 .pick_l0_intra(
742 &levels.l0,
743 &levels_handler[0],
744 levels.l0.sub_levels[0].vnode_partition_count,
745 &mut local_stats,
746 )
747 .unwrap();
748
749 assert_eq!(ret.input_levels.len(), 2);
751
752 let actual_file_count: usize = ret
753 .input_levels
754 .iter()
755 .map(|lvl| lvl.table_infos.len())
756 .sum();
757 assert_eq!(ret.total_file_count as usize, actual_file_count);
758
759 let expect_size: u64 = ret
760 .input_levels
761 .iter()
762 .flat_map(|lvl| lvl.table_infos.iter())
763 .map(|sst| sst.sst_size)
764 .sum();
765 assert_eq!(ret.select_input_size, expect_size);
766 }
767 }
768
769 fn is_l0_trivial_move(compaction_input: &CompactionInput) -> bool {
770 compaction_input.input_levels.len() == 2
771 && !compaction_input.input_levels[0].table_infos.is_empty()
772 && compaction_input.input_levels[1].table_infos.is_empty()
773 }
774
775 #[test]
776 fn test_trivial_move() {
777 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
778 let config = Arc::new(
779 CompactionConfigBuilder::new()
780 .level0_tier_compact_file_number(2)
781 .target_file_size_base(30)
782 .level0_sub_level_compact_level_count(20) .build(),
784 );
785 let mut picker =
786 IntraCompactionPicker::for_test(config, Arc::new(CompactionDeveloperConfig::default()));
787
788 let l0 = generate_l0_overlapping_sublevels(vec![vec![
790 generate_table(1, 1, 100, 110, 1),
791 generate_table(2, 1, 150, 250, 1),
792 ]]);
793 let levels = Levels {
794 l0,
795 levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
796 ..Default::default()
797 };
798 levels_handler[1].add_pending_task(100, 1, &levels.levels[0].table_infos);
799 let mut local_stats = LocalPickerStatistic::default();
800 let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
801 assert!(ret.is_none());
802
803 let l0: OverlappingLevel = generate_l0_overlapping_sublevels(vec![
805 vec![
806 generate_table(1, 1, 100, 110, 1),
807 generate_table(2, 1, 150, 250, 1),
808 ],
809 vec![generate_table(3, 1, 10, 90, 1)],
810 vec![generate_table(4, 1, 10, 90, 1)],
811 vec![generate_table(5, 1, 10, 90, 1)],
812 ]);
813 let mut levels = Levels {
814 l0,
815 levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
816 ..Default::default()
817 };
818 assert!(
819 picker
820 .pick_compaction(&levels, &levels_handler, &mut local_stats)
821 .is_none()
822 );
823
824 levels.l0.sub_levels[0].level_type = LevelType::Nonoverlapping;
826 levels.l0.sub_levels[1].level_type = LevelType::Overlapping;
827 let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
828 assert!(ret.is_none());
829
830 levels.l0.sub_levels[0].level_type = LevelType::Overlapping;
832 levels.l0.sub_levels[1].level_type = LevelType::Nonoverlapping;
833 let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
834 assert!(ret.is_none());
835
836 levels.l0.sub_levels[0].level_type = LevelType::Nonoverlapping;
838 levels.l0.sub_levels[1].level_type = LevelType::Nonoverlapping;
839 let ret = picker
840 .pick_compaction(&levels, &levels_handler, &mut local_stats)
841 .unwrap();
842 assert!(is_l0_trivial_move(&ret));
843 assert_eq!(ret.input_levels[0].table_infos.len(), 1);
844 }
845 #[test]
846 fn test_pick_whole_level() {
847 let config = Arc::new(
848 CompactionConfigBuilder::new()
849 .level0_max_compact_file_number(20)
850 .build(),
851 );
852 let mut table_infos = vec![];
853 for epoch in 1..3 {
854 let base = epoch * 100;
855 let mut ssts = vec![];
856 for i in 1..50 {
857 let left = (i as usize) * 100;
858 let right = left + 100;
859 ssts.push(generate_table(base + i, 1, left, right, epoch));
860 }
861 table_infos.push(ssts);
862 }
863
864 let l0 = generate_l0_nonoverlapping_multi_sublevels(table_infos);
865 let compaction_task_validator = Arc::new(CompactionTaskValidator::new(config.clone()));
866 let picker = WholeLevelCompactionPicker::new(config, compaction_task_validator);
867 let level_handler = LevelHandler::new(0);
868 let ret = picker
869 .pick_whole_level(&l0, &level_handler, 4, &mut LocalPickerStatistic::default())
870 .unwrap();
871 assert_eq!(ret.input_levels.len(), 2);
872 }
873
874 #[test]
875 fn test_priority() {
876 let config = Arc::new(
877 CompactionConfigBuilder::new()
878 .level0_max_compact_file_number(20)
879 .sub_level_max_compaction_bytes(1)
880 .level0_sub_level_compact_level_count(2)
881 .build(),
882 );
883 let mut table_infos = vec![];
884 for epoch in 1..3 {
885 let base = epoch * 100;
886 let mut ssts = vec![];
887 for i in 1..50 {
888 let left = (i as usize) * 100;
889 let right = left + 100;
890 ssts.push(generate_table(base + i, 1, left, right, epoch));
891 }
892 table_infos.push(ssts);
893 }
894
895 let mut l0 = generate_l0_nonoverlapping_multi_sublevels(table_infos);
896 l0.sub_levels[1]
898 .table_infos
899 .push(generate_table(9999, 900000000, 0, 100, 1));
900
901 l0.sub_levels[0].total_file_size = 1;
902 l0.sub_levels[1].total_file_size = 1;
903
904 let mut picker = IntraCompactionPicker::new_with_validator(
905 config,
906 Arc::new(CompactionTaskValidator::unused()),
907 Arc::new(CompactionDeveloperConfig::default()),
908 );
909 let mut levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
910 let mut local_stats = LocalPickerStatistic::default();
911
912 let levels = Levels {
913 l0,
914 levels: vec![generate_level(1, vec![generate_table(100, 1, 0, 1000, 1)])],
915 ..Default::default()
916 };
917
918 let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
919 assert!(is_l0_trivial_move(ret.as_ref().unwrap()));
920 ret.as_ref()
921 .unwrap()
922 .add_pending_task(1, &mut levels_handler);
923 let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
924 assert!(ret.is_some());
925 let input = ret.as_ref().unwrap();
926 assert_eq!(input.input_levels.len(), 2);
927 assert_ne!(
928 levels.l0.sub_levels[0].table_infos.len(),
929 input.input_levels[0].table_infos.len()
930 );
931 assert_ne!(
932 levels.l0.sub_levels[1].table_infos.len(),
933 input.input_levels[1].table_infos.len()
934 );
935 }
936}