1use std::sync::Arc;
21
22use risingwave_hummock_sdk::HummockCompactionTaskId;
23use risingwave_hummock_sdk::level::Levels;
24use risingwave_pb::hummock::compact_task::PbTaskType;
25use risingwave_pb::hummock::{CompactionConfig, LevelType};
26
27use super::{
28 CompactionSelector, LevelCompactionPicker, TierCompactionPicker, create_compaction_task,
29};
30use crate::hummock::compaction::overlap_strategy::OverlapStrategy;
31use crate::hummock::compaction::picker::{
32 CompactionPicker, CompactionTaskValidator, IntraCompactionPicker, LocalPickerStatistic,
33 MinOverlappingPicker,
34};
35use crate::hummock::compaction::selector::CompactionSelectorContext;
36use crate::hummock::compaction::{
37 CompactionDeveloperConfig, CompactionTask, create_overlap_strategy,
38};
39use crate::hummock::level_handler::LevelHandler;
40
41pub const SCORE_BASE: u64 = 100;
42
43#[derive(Debug, Default, Clone)]
44pub enum PickerType {
45 Tier,
46 Intra,
47 ToBase,
48 #[default]
49 BottomLevel,
50}
51
52impl std::fmt::Display for PickerType {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 f.write_str(match self {
55 PickerType::Tier => "Tier",
56 PickerType::Intra => "Intra",
57 PickerType::ToBase => "ToBase",
58 PickerType::BottomLevel => "BottomLevel",
59 })
60 }
61}
62
63#[derive(Default, Debug)]
64pub struct PickerInfo {
65 pub score: u64,
66 pub select_level: usize,
67 pub target_level: usize,
68 pub picker_type: PickerType,
69}
70
71#[derive(Default, Debug)]
72pub struct SelectContext {
73 pub level_max_bytes: Vec<u64>,
74
75 pub base_level: usize,
80 pub score_levels: Vec<PickerInfo>,
81}
82
83pub struct DynamicLevelSelectorCore {
84 config: Arc<CompactionConfig>,
85 developer_config: Arc<CompactionDeveloperConfig>,
86}
87
88#[derive(Default)]
89pub struct DynamicLevelSelector {}
90
91impl DynamicLevelSelectorCore {
92 pub fn new(
93 config: Arc<CompactionConfig>,
94 developer_config: Arc<CompactionDeveloperConfig>,
95 ) -> Self {
96 Self {
97 config,
98 developer_config,
99 }
100 }
101
102 pub fn get_config(&self) -> &CompactionConfig {
103 self.config.as_ref()
104 }
105
106 fn create_compaction_picker(
107 &self,
108 picker_info: &PickerInfo,
109 overlap_strategy: Arc<dyn OverlapStrategy>,
110 compaction_task_validator: Arc<CompactionTaskValidator>,
111 ) -> Box<dyn CompactionPicker> {
112 match picker_info.picker_type {
113 PickerType::Tier => Box::new(TierCompactionPicker::new_with_validator(
114 self.config.clone(),
115 compaction_task_validator,
116 )),
117 PickerType::ToBase => Box::new(LevelCompactionPicker::new_with_validator(
118 picker_info.target_level,
119 self.config.clone(),
120 compaction_task_validator,
121 self.developer_config.clone(),
122 )),
123 PickerType::Intra => Box::new(IntraCompactionPicker::new_with_validator(
124 self.config.clone(),
125 compaction_task_validator,
126 self.developer_config.clone(),
127 )),
128 PickerType::BottomLevel => {
129 assert_eq!(picker_info.select_level + 1, picker_info.target_level);
130 Box::new(MinOverlappingPicker::new(
131 picker_info.select_level,
132 picker_info.target_level,
133 self.config.max_bytes_for_level_base / 2,
134 self.config.split_weight_by_vnode,
135 overlap_strategy,
136 ))
137 }
138 }
139 }
140
141 pub fn calculate_level_base_size(&self, levels: &Levels) -> SelectContext {
146 let mut first_non_empty_level = 0;
147 let mut max_level_size = 0;
148 let mut ctx = SelectContext::default();
149
150 for level in &levels.levels {
151 if level.total_file_size > 0 && first_non_empty_level == 0 {
152 first_non_empty_level = level.level_idx as usize;
153 }
154 max_level_size = std::cmp::max(max_level_size, level.total_file_size);
155 }
156
157 ctx.level_max_bytes
158 .resize(self.config.max_level as usize + 1, u64::MAX);
159
160 if max_level_size == 0 {
161 ctx.base_level = self.config.max_level as usize;
163 return ctx;
164 }
165
166 let base_bytes_max = self.config.max_bytes_for_level_base;
167 let base_bytes_min = base_bytes_max / self.config.max_bytes_for_level_multiplier;
168
169 let mut cur_level_size = max_level_size;
170 for _ in first_non_empty_level..self.config.max_level as usize {
171 cur_level_size /= self.config.max_bytes_for_level_multiplier;
172 }
173
174 let base_level_size = if cur_level_size <= base_bytes_min {
175 ctx.base_level = first_non_empty_level;
179 base_bytes_min + 1
180 } else {
181 ctx.base_level = first_non_empty_level;
182 while ctx.base_level > 1 && cur_level_size > base_bytes_max {
183 ctx.base_level -= 1;
184 cur_level_size /= self.config.max_bytes_for_level_multiplier;
185 }
186 std::cmp::min(base_bytes_max, cur_level_size)
187 };
188
189 let level_multiplier = self.config.max_bytes_for_level_multiplier as f64;
190 let mut level_size = base_level_size;
191 for i in ctx.base_level..=self.config.max_level as usize {
192 ctx.level_max_bytes[i] = std::cmp::max(level_size, base_bytes_max);
197 level_size = (level_size as f64 * level_multiplier) as u64;
198 }
199 ctx
200 }
201
202 pub(crate) fn get_priority_levels(
203 &self,
204 levels: &Levels,
205 handlers: &[LevelHandler],
206 ) -> SelectContext {
207 let mut ctx = self.calculate_level_base_size(levels);
208
209 let l0_file_count = levels
210 .l0
211 .sub_levels
212 .iter()
213 .map(|sub_level| sub_level.table_infos.len())
214 .sum::<usize>();
215
216 let idle_file_count = match l0_file_count.checked_sub(handlers[0].pending_file_count()) {
217 Some(count) => count,
218 None => {
219 tracing::warn!(
222 "The number of files in L0 {} is less than the number of pending files {} group {} pending_tasks_ids {:?} compacting_files {:?}",
223 l0_file_count,
224 handlers[0].pending_file_count(),
225 levels.group_id,
226 handlers[0].pending_tasks_ids(),
227 handlers[0].compacting_files()
228 );
229
230 0
231 }
232 };
233
234 if idle_file_count > 0 {
235 let overlapping_file_count = levels
241 .l0
242 .sub_levels
243 .iter()
244 .filter(|level| level.level_type == LevelType::Overlapping)
245 .map(|level| level.table_infos.len())
246 .sum::<usize>();
247 if overlapping_file_count > 0 {
248 let l0_overlapping_score =
250 std::cmp::min(idle_file_count, overlapping_file_count) as u64 * SCORE_BASE
251 / self.config.level0_tier_compact_file_number;
252 ctx.score_levels.push(PickerInfo {
254 score: std::cmp::max(l0_overlapping_score, SCORE_BASE + 1),
255 select_level: 0,
256 target_level: 0,
257 picker_type: PickerType::Tier,
258 })
259 }
260
261 let total_size = levels
266 .l0
267 .sub_levels
268 .iter()
269 .filter(|level| {
270 level.vnode_partition_count == self.config.split_weight_by_vnode
271 && level.level_type == LevelType::Nonoverlapping
272 })
273 .map(|level| level.total_file_size)
274 .sum::<u64>()
275 .saturating_sub(handlers[0].pending_output_file_size(ctx.base_level as u32));
276 let base_level_size = levels.get_level(ctx.base_level).total_file_size;
277 let base_level_sst_count = levels.get_level(ctx.base_level).table_infos.len() as u64;
278
279 let non_overlapping_size_score = total_size * SCORE_BASE
281 / std::cmp::max(self.config.max_bytes_for_level_base, base_level_size);
282 let non_overlapping_level_count = levels
284 .l0
285 .sub_levels
286 .iter()
287 .filter(|level| level.level_type == LevelType::Nonoverlapping)
288 .count() as u64;
289 let non_overlapping_level_score = non_overlapping_level_count * SCORE_BASE
290 / std::cmp::max(
291 base_level_sst_count / 16,
292 self.config.level0_sub_level_compact_level_count as u64,
293 );
294
295 let non_overlapping_score =
296 std::cmp::max(non_overlapping_size_score, non_overlapping_level_score);
297
298 if non_overlapping_size_score > SCORE_BASE {
300 ctx.score_levels.push(PickerInfo {
301 score: non_overlapping_score + 1,
302 select_level: 0,
303 target_level: ctx.base_level,
304 picker_type: PickerType::ToBase,
305 });
306 }
307
308 if non_overlapping_level_score > SCORE_BASE {
309 ctx.score_levels.push(PickerInfo {
311 score: non_overlapping_score,
312 select_level: 0,
313 target_level: 0,
314 picker_type: PickerType::Intra,
315 });
316 }
317 }
318
319 for level in &levels.levels {
321 let level_idx = level.level_idx as usize;
322 if level_idx < ctx.base_level || level_idx >= self.config.max_level as usize {
323 continue;
324 }
325 let output_file_size =
326 handlers[level_idx].pending_output_file_size(level.level_idx + 1);
327 let total_size = level.total_file_size.saturating_sub(output_file_size);
328 if total_size == 0 {
329 continue;
330 }
331
332 ctx.score_levels.push({
333 PickerInfo {
334 score: total_size * SCORE_BASE / ctx.level_max_bytes[level_idx],
335 select_level: level_idx,
336 target_level: level_idx + 1,
337 picker_type: PickerType::BottomLevel,
338 }
339 });
340 }
341
342 ctx.score_levels.sort_by(|a, b| {
344 b.score
345 .cmp(&a.score)
346 .then_with(|| a.target_level.cmp(&b.target_level))
347 });
348 ctx
349 }
350
351 pub fn compact_pending_bytes_needed(&self, levels: &Levels) -> u64 {
356 let ctx = self.calculate_level_base_size(levels);
357 self.compact_pending_bytes_needed_with_ctx(levels, &ctx)
358 }
359
360 pub fn compact_pending_bytes_needed_with_ctx(
361 &self,
362 levels: &Levels,
363 ctx: &SelectContext,
364 ) -> u64 {
365 let mut compact_pending_bytes = 0;
367 let mut compact_to_next_level_bytes = 0;
368 let l0_size = levels
369 .l0
370 .sub_levels
371 .iter()
372 .map(|sub_level| sub_level.total_file_size)
373 .sum::<u64>();
374
375 let mut l0_compaction_trigger = false;
376 if l0_size > self.config.max_bytes_for_level_base {
377 compact_pending_bytes = l0_size;
378 compact_to_next_level_bytes = l0_size;
379 l0_compaction_trigger = true;
380 }
381
382 let mut level_bytes;
384 let mut next_level_bytes = 0;
385 for level in &levels.levels[ctx.base_level - 1..levels.levels.len()] {
386 let level_index = level.level_idx as usize;
387
388 if next_level_bytes > 0 {
389 level_bytes = next_level_bytes;
390 next_level_bytes = 0;
391 } else {
392 level_bytes = level.total_file_size;
393 }
394
395 if level_index == ctx.base_level && l0_compaction_trigger {
396 compact_pending_bytes += level_bytes;
397 }
398
399 level_bytes += compact_to_next_level_bytes;
400 compact_to_next_level_bytes = 0;
401 let level_target = ctx.level_max_bytes[level_index];
402 if level_bytes > level_target {
403 compact_to_next_level_bytes = level_bytes - level_target;
404
405 assert_eq!(0, next_level_bytes);
408 if level_index + 1 < ctx.level_max_bytes.len() {
409 let next_level = level_index + 1;
410 next_level_bytes = levels.levels[next_level - 1].total_file_size;
411 }
412
413 if next_level_bytes > 0 {
414 compact_pending_bytes += (compact_to_next_level_bytes as f64
415 * (next_level_bytes as f64 / level_bytes as f64 + 1.0))
416 as u64;
417 }
418 }
419 }
420
421 compact_pending_bytes
422 }
423}
424
425impl CompactionSelector for DynamicLevelSelector {
426 fn pick_compaction(
427 &mut self,
428 task_id: HummockCompactionTaskId,
429 context: CompactionSelectorContext<'_>,
430 ) -> Option<CompactionTask> {
431 let CompactionSelectorContext {
432 group: compaction_group,
433 levels,
434 level_handlers,
435 selector_stats,
436 developer_config,
437 in_progress_compactions,
438 ..
439 } = context;
440 let dynamic_level_core = DynamicLevelSelectorCore::new(
441 compaction_group.compaction_config.clone(),
442 developer_config,
443 );
444 let overlap_strategy =
445 create_overlap_strategy(compaction_group.compaction_config.compaction_mode());
446 let ctx = dynamic_level_core.get_priority_levels(levels, level_handlers);
447 let compaction_task_validator = Arc::new(CompactionTaskValidator::new(
449 compaction_group.compaction_config.clone(),
450 ));
451 for picker_info in &ctx.score_levels {
452 if picker_info.score <= SCORE_BASE {
453 return None;
454 }
455 let mut picker = dynamic_level_core.create_compaction_picker(
456 picker_info,
457 overlap_strategy.clone(),
458 compaction_task_validator.clone(),
459 );
460
461 let mut stats = LocalPickerStatistic::default();
462 if let Some(ret) = picker.pick_compaction(levels, level_handlers, &mut stats) {
463 if !ret.skip_target_range_conflict_check
464 && in_progress_compactions.has_conflict_with_input(&ret)
465 {
466 stats.skip_by_overlapping += 1;
467 selector_stats.skip_picker.push((
468 picker_info.select_level,
469 picker_info.target_level,
470 stats,
471 ));
472 continue;
473 }
474
475 ret.add_pending_task(task_id, level_handlers);
476 return Some(create_compaction_task(
477 dynamic_level_core.get_config(),
478 ret,
479 ctx.base_level,
480 self.task_type(),
481 ));
482 }
483 selector_stats.skip_picker.push((
484 picker_info.select_level,
485 picker_info.target_level,
486 stats,
487 ));
488 }
489 None
490 }
491
492 fn name(&self) -> &'static str {
493 "DynamicLevelSelector"
494 }
495
496 fn task_type(&self) -> PbTaskType {
497 PbTaskType::Dynamic
498 }
499}
500
501#[cfg(test)]
502pub mod tests {
503 use std::collections::{BTreeSet, HashMap};
504 use std::sync::Arc;
505
506 use itertools::Itertools;
507 use risingwave_common::constants::hummock::CompactionFilterFlag;
508 use risingwave_hummock_sdk::HummockCompactionTaskId;
509 use risingwave_hummock_sdk::compact_task::{CompactTask, CompactTaskAssignment};
510 use risingwave_hummock_sdk::level::{InputLevel, Levels};
511 use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
512 use risingwave_pb::hummock::LevelType;
513 use risingwave_pb::hummock::compaction_config::CompactionMode;
514
515 use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
516 use crate::hummock::compaction::in_progress_compaction::InProgressCompactionView;
517 use crate::hummock::compaction::selector::tests::{
518 assert_compaction_task, generate_l0_nonoverlapping_sublevels, generate_level,
519 generate_table, generate_tables, push_tables_level0_nonoverlapping,
520 };
521 use crate::hummock::compaction::selector::{
522 CompactionSelector, CompactionSelectorContext, DynamicLevelSelector,
523 DynamicLevelSelectorCore, LocalSelectorStatistic,
524 };
525 use crate::hummock::compaction::{CompactionDeveloperConfig, CompactionTask};
526 use crate::hummock::level_handler::LevelHandler;
527 use crate::hummock::model::CompactionGroup;
528 use crate::hummock::test_utils::compaction_selector_context;
529
530 fn pick_compaction_with_in_progress(
531 selector: &mut DynamicLevelSelector,
532 task_id: HummockCompactionTaskId,
533 group: &CompactionGroup,
534 levels: &Levels,
535 level_handlers: &mut [LevelHandler],
536 selector_stats: &mut LocalSelectorStatistic,
537 in_progress_compactions: &InProgressCompactionView,
538 ) -> Option<CompactionTask> {
539 selector.pick_compaction(
540 task_id,
541 CompactionSelectorContext {
542 group,
543 levels,
544 member_table_ids: &BTreeSet::new(),
545 level_handlers,
546 selector_stats,
547 table_id_to_options: &HashMap::default(),
548 developer_config: Arc::new(CompactionDeveloperConfig::default()),
549 table_watermarks: &HashMap::default(),
550 state_table_info: &HummockVersionStateTableInfo::empty(),
551 in_progress_compactions,
552 },
553 )
554 }
555
556 #[test]
557 fn test_dynamic_level() {
558 let config = CompactionConfigBuilder::new()
559 .max_bytes_for_level_base(100)
560 .max_level(4)
561 .max_bytes_for_level_multiplier(5)
562 .max_compaction_bytes(1)
563 .level0_tier_compact_file_number(2)
564 .compaction_mode(CompactionMode::Range as i32)
565 .build();
566 let selector = DynamicLevelSelectorCore::new(
567 Arc::new(config),
568 Arc::new(CompactionDeveloperConfig::default()),
569 );
570 let levels = vec![
571 generate_level(1, vec![]),
572 generate_level(2, generate_tables(0..5, 0..1000, 3, 10)),
573 generate_level(3, generate_tables(5..10, 0..1000, 2, 50)),
574 generate_level(4, generate_tables(10..15, 0..1000, 1, 200)),
575 ];
576 let mut levels = Levels {
577 levels,
578 l0: generate_l0_nonoverlapping_sublevels(vec![]),
579 ..Default::default()
580 };
581 let ctx = selector.calculate_level_base_size(&levels);
582 assert_eq!(ctx.base_level, 2);
583 assert_eq!(ctx.level_max_bytes[2], 100);
584 assert_eq!(ctx.level_max_bytes[3], 200);
585 assert_eq!(ctx.level_max_bytes[4], 1000);
586
587 levels.levels[3]
588 .table_infos
589 .append(&mut generate_tables(15..20, 2000..3000, 1, 400));
590 levels.levels[3].total_file_size = levels.levels[3]
591 .table_infos
592 .iter()
593 .map(|sst| sst.sst_size)
594 .sum::<u64>();
595
596 let ctx = selector.calculate_level_base_size(&levels);
597 assert_eq!(ctx.base_level, 1);
599 assert_eq!(ctx.level_max_bytes[1], 100);
600 assert_eq!(ctx.level_max_bytes[2], 120);
601 assert_eq!(ctx.level_max_bytes[3], 600);
602 assert_eq!(ctx.level_max_bytes[4], 3000);
603
604 push_tables_level0_nonoverlapping(&mut levels, generate_tables(20..26, 0..1000, 1, 100));
606
607 let ctx = selector.calculate_level_base_size(&levels);
608 assert_eq!(ctx.base_level, 1);
609 assert_eq!(ctx.level_max_bytes[1], 100);
610 assert_eq!(ctx.level_max_bytes[2], 120);
611 assert_eq!(ctx.level_max_bytes[3], 600);
612 assert_eq!(ctx.level_max_bytes[4], 3000);
613
614 levels.l0.sub_levels.clear();
615 levels.l0.total_file_size = 0;
616 levels.levels[0].table_infos = generate_tables(26..32, 0..1000, 1, 100);
617 levels.levels[0].total_file_size = levels.levels[0]
618 .table_infos
619 .iter()
620 .map(|sst| sst.sst_size)
621 .sum::<u64>();
622
623 let ctx = selector.calculate_level_base_size(&levels);
624 assert_eq!(ctx.base_level, 1);
625 assert_eq!(ctx.level_max_bytes[1], 100);
626 assert_eq!(ctx.level_max_bytes[2], 120);
627 assert_eq!(ctx.level_max_bytes[3], 600);
628 assert_eq!(ctx.level_max_bytes[4], 3000);
629 }
630
631 #[test]
632 fn test_pick_compaction() {
633 let config = CompactionConfigBuilder::new()
634 .max_bytes_for_level_base(200)
635 .max_level(4)
636 .max_bytes_for_level_multiplier(5)
637 .target_file_size_base(5)
638 .max_compaction_bytes(10000)
639 .level0_tier_compact_file_number(4)
640 .compaction_mode(CompactionMode::Range as i32)
641 .level0_sub_level_compact_level_count(3)
642 .build();
643 let group_config = CompactionGroup::new(1, config.clone());
644 let levels = vec![
645 generate_level(1, vec![]),
646 generate_level(2, generate_tables(0..5, 0..1000, 3, 10)),
647 generate_level(3, generate_tables(5..10, 0..1000, 2, 50)),
648 generate_level(4, generate_tables(10..15, 0..1000, 1, 200)),
649 ];
650 let mut levels = Levels {
651 levels,
652 l0: generate_l0_nonoverlapping_sublevels(generate_tables(15..25, 0..600, 3, 10)),
653 ..Default::default()
654 };
655
656 let mut selector = DynamicLevelSelector::default();
657 let mut levels_handlers = (0..5).map(LevelHandler::new).collect_vec();
658 let mut local_stats = LocalSelectorStatistic::default();
659 let compaction = selector
660 .pick_compaction(
661 1,
662 compaction_selector_context(
663 &group_config,
664 &levels,
665 &BTreeSet::new(),
666 &mut levels_handlers,
667 &mut local_stats,
668 &HashMap::default(),
669 Arc::new(CompactionDeveloperConfig::default()),
670 &Default::default(),
671 &HummockVersionStateTableInfo::empty(),
672 ),
673 )
674 .unwrap();
675 assert_compaction_task(&compaction, &levels_handlers);
676
677 let compaction_filter_flag = CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL;
678 let config = CompactionConfigBuilder::with_config(config)
679 .max_bytes_for_level_base(100)
680 .sub_level_max_compaction_bytes(50)
681 .target_file_size_base(20)
682 .compaction_filter_mask(compaction_filter_flag.into())
683 .build();
684 let group_config = CompactionGroup::new(1, config.clone());
685 let mut selector = DynamicLevelSelector::default();
686
687 levels.l0.sub_levels.clear();
688 levels.l0.total_file_size = 0;
689 push_tables_level0_nonoverlapping(&mut levels, generate_tables(15..25, 0..600, 3, 20));
690 let mut levels_handlers = (0..5).map(LevelHandler::new).collect_vec();
691 let compaction = selector
692 .pick_compaction(
693 1,
694 compaction_selector_context(
695 &group_config,
696 &levels,
697 &BTreeSet::new(),
698 &mut levels_handlers,
699 &mut local_stats,
700 &HashMap::default(),
701 Arc::new(CompactionDeveloperConfig::default()),
702 &Default::default(),
703 &HummockVersionStateTableInfo::empty(),
704 ),
705 )
706 .unwrap();
707 assert_compaction_task(&compaction, &levels_handlers);
708 assert_eq!(compaction.input.input_levels[0].level_idx, 0);
709 assert_eq!(compaction.input.target_level, 2);
710
711 levels_handlers[0].remove_task(1);
712 levels_handlers[2].remove_task(1);
713 levels.l0.sub_levels.clear();
714 levels.levels[1].table_infos = generate_tables(20..30, 0..1000, 3, 10);
715 let compaction = selector
716 .pick_compaction(
717 2,
718 compaction_selector_context(
719 &group_config,
720 &levels,
721 &BTreeSet::new(),
722 &mut levels_handlers,
723 &mut local_stats,
724 &HashMap::default(),
725 Arc::new(CompactionDeveloperConfig::default()),
726 &Default::default(),
727 &HummockVersionStateTableInfo::empty(),
728 ),
729 )
730 .unwrap();
731 assert_compaction_task(&compaction, &levels_handlers);
732 assert_eq!(compaction.input.input_levels[0].level_idx, 3);
733 assert_eq!(compaction.input.target_level, 4);
734 assert_eq!(
735 compaction.input.input_levels[0]
736 .table_infos
737 .iter()
738 .map(|sst| sst.sst_id)
739 .collect_vec(),
740 vec![5]
741 );
742 assert_eq!(
743 compaction.input.input_levels[1]
744 .table_infos
745 .iter()
746 .map(|sst| sst.sst_id)
747 .collect_vec(),
748 vec![10]
749 );
750 assert_eq!(
751 compaction.target_file_size,
752 config.target_file_size_base * 2
753 );
754 assert_eq!(compaction.compression_algorithm.as_str(), "Lz4",);
755 let compaction = selector.pick_compaction(
758 2,
759 compaction_selector_context(
760 &group_config,
761 &levels,
762 &BTreeSet::new(),
763 &mut levels_handlers,
764 &mut local_stats,
765 &HashMap::default(),
766 Arc::new(CompactionDeveloperConfig::default()),
767 &Default::default(),
768 &HummockVersionStateTableInfo::empty(),
769 ),
770 );
771 assert!(compaction.is_none());
772 }
773
774 #[test]
775 fn test_trivial_move_skips_in_progress_target_overlap() {
776 let config = CompactionConfigBuilder::new()
777 .max_bytes_for_level_base(1000)
778 .max_bytes_for_level_multiplier(10)
779 .max_level(4)
780 .max_compaction_bytes(10000)
781 .level0_sub_level_compact_level_count(20)
782 .sst_allowed_trivial_move_min_size(Some(0))
783 .sst_allowed_trivial_move_max_count(Some(10))
784 .compaction_mode(CompactionMode::Range as i32)
785 .build();
786 let group_config = CompactionGroup::new(9, config);
787 let levels = Levels {
788 levels: vec![
789 generate_level(1, vec![]),
790 generate_level(2, vec![]),
791 generate_level(3, vec![]),
792 generate_level(4, vec![generate_table(878784, 1, 565, 633, 1)]),
793 ],
794 l0: generate_l0_nonoverlapping_sublevels(vec![
795 generate_table(877832, 1, 706, 718, 1),
796 generate_table(877833, 1, 800, 2000, 1),
797 ]),
798 ..Default::default()
799 };
800 let in_progress = InProgressCompactionView::for_group(
801 &[CompactTaskAssignment {
802 compact_task: CompactTask {
803 task_id: 15040,
804 compaction_group_id: 9.into(),
805 target_level: 4,
806 input_ssts: vec![
807 InputLevel {
808 level_idx: 0,
809 level_type: LevelType::Nonoverlapping,
810 table_infos: vec![generate_table(881160, 1, 592, 722, 1)],
811 },
812 InputLevel {
813 level_idx: 4,
814 level_type: LevelType::Nonoverlapping,
815 table_infos: vec![generate_table(878784, 1, 565, 633, 1)],
816 },
817 ],
818 ..Default::default()
819 },
820 context_id: 1.into(),
821 }],
822 9.into(),
823 );
824
825 let mut selector = DynamicLevelSelector::default();
826 let mut levels_handlers = (0..5).map(LevelHandler::new).collect_vec();
827 let mut local_stats = LocalSelectorStatistic::default();
828 let empty_in_progress = InProgressCompactionView::default();
829 let compaction = pick_compaction_with_in_progress(
830 &mut selector,
831 1,
832 &group_config,
833 &levels,
834 &mut levels_handlers,
835 &mut local_stats,
836 &empty_in_progress,
837 )
838 .unwrap();
839 assert_eq!(compaction.input.target_level, 4);
840 assert!(compaction.input.input_levels[1].table_infos.is_empty());
841 assert!(
842 compaction.input.input_levels[0]
843 .table_infos
844 .iter()
845 .any(|sst| sst.sst_id.as_raw_id() == 877832)
846 );
847
848 let mut selector = DynamicLevelSelector::default();
849 let mut levels_handlers = (0..5).map(LevelHandler::new).collect_vec();
850 let mut local_stats = LocalSelectorStatistic::default();
851 assert!(
852 pick_compaction_with_in_progress(
853 &mut selector,
854 2,
855 &group_config,
856 &levels,
857 &mut levels_handlers,
858 &mut local_stats,
859 &in_progress,
860 )
861 .is_none()
862 );
863 assert_eq!(local_stats.skip_picker.len(), 1);
864 assert_eq!(local_stats.skip_picker[0].2.skip_by_overlapping, 1);
865 }
866
867 #[test]
868 fn test_compact_pending_bytes() {
869 let config = CompactionConfigBuilder::new()
870 .max_bytes_for_level_base(100)
871 .max_level(4)
872 .max_bytes_for_level_multiplier(5)
873 .compaction_mode(CompactionMode::Range as i32)
874 .build();
875 let levels = vec![
876 generate_level(1, vec![]),
877 generate_level(2, generate_tables(0..50, 0..1000, 3, 500)),
878 generate_level(3, generate_tables(30..60, 0..1000, 2, 500)),
879 generate_level(4, generate_tables(60..70, 0..1000, 1, 1000)),
880 ];
881 let levels = Levels {
882 levels,
883 l0: generate_l0_nonoverlapping_sublevels(generate_tables(15..25, 0..600, 3, 100)),
884 ..Default::default()
885 };
886
887 let dynamic_level_core = DynamicLevelSelectorCore::new(
888 Arc::new(config),
889 Arc::new(CompactionDeveloperConfig::default()),
890 );
891 let ctx = dynamic_level_core.calculate_level_base_size(&levels);
892 assert_eq!(1, ctx.base_level);
893 assert_eq!(1000, levels.l0.total_file_size); assert_eq!(0, levels.levels.first().unwrap().total_file_size); assert_eq!(25000, levels.levels.get(1).unwrap().total_file_size); assert_eq!(15000, levels.levels.get(2).unwrap().total_file_size); assert_eq!(10000, levels.levels.get(3).unwrap().total_file_size); assert_eq!(100, ctx.level_max_bytes[1]); assert_eq!(500, ctx.level_max_bytes[2]); assert_eq!(2500, ctx.level_max_bytes[3]); assert_eq!(12500, ctx.level_max_bytes[4]); let compact_pending_bytes = dynamic_level_core.compact_pending_bytes_needed(&levels);
909 assert_eq!(24400 + 40110 + 47281, compact_pending_bytes);
910 }
911}