risingwave_meta/hummock/compaction/selector/level_selector.rs

//  Copyright 2025 RisingWave Labs
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
//
// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
19use std::sync::Arc;
20
21use risingwave_hummock_sdk::HummockCompactionTaskId;
22use risingwave_hummock_sdk::level::Levels;
23use risingwave_pb::hummock::compact_task::PbTaskType;
24use risingwave_pb::hummock::{CompactionConfig, LevelType};
25
26use super::{
27    CompactionSelector, LevelCompactionPicker, TierCompactionPicker, create_compaction_task,
28};
29use crate::hummock::compaction::overlap_strategy::OverlapStrategy;
30use crate::hummock::compaction::picker::{
31    CompactionPicker, CompactionTaskValidator, IntraCompactionPicker, LocalPickerStatistic,
32    MinOverlappingPicker,
33};
34use crate::hummock::compaction::selector::CompactionSelectorContext;
35use crate::hummock::compaction::{
36    CompactionDeveloperConfig, CompactionTask, create_overlap_strategy,
37};
38use crate::hummock::level_handler::LevelHandler;
39
/// Baseline compaction score. Scores are scaled by this factor so they can be
/// kept in integer arithmetic; a level is only considered for compaction when
/// its score strictly exceeds `SCORE_BASE`.
pub const SCORE_BASE: u64 = 100;
41
/// Which concrete compaction picker should handle a scored candidate.
#[derive(Debug, Default, Clone)]
pub enum PickerType {
    /// Compact overlapping L0 sub-levels (driven by L0 file count).
    Tier,
    /// Compact non-overlapping L0 sub-levels within L0 (driven by sub-level count).
    Intra,
    /// Compact L0 data down into the current base level.
    ToBase,
    /// Compact level `n` into level `n + 1` (for levels at or above the base level).
    #[default]
    BottomLevel,
}
50
51impl std::fmt::Display for PickerType {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        f.write_str(match self {
54            PickerType::Tier => "Tier",
55            PickerType::Intra => "Intra",
56            PickerType::ToBase => "ToBase",
57            PickerType::BottomLevel => "BottomLevel",
58        })
59    }
60}
61
/// A scored compaction candidate: which levels to compact and with which picker.
#[derive(Default, Debug)]
pub struct PickerInfo {
    /// Priority score; only candidates with `score > SCORE_BASE` are eligible.
    pub score: u64,
    /// Input level index (0 for L0-based pickers).
    pub select_level: usize,
    /// Output level index.
    pub target_level: usize,
    /// Which picker implementation should handle this candidate.
    pub picker_type: PickerType,
}
69
/// Result of sizing the LSM tree for the current data set, plus the scored
/// compaction candidates derived from it.
#[derive(Default, Debug)]
pub struct SelectContext {
    /// Target maximum size in bytes for each level, indexed by level index.
    /// Entries below `base_level` (including index 0) are left at `u64::MAX`.
    pub level_max_bytes: Vec<u64>,

    // All data will be placed in the last level. When the cluster is empty, the files in L0 will
    // be compact to `max_level`, and the `max_level` would be `base_level`. When the total
    // size of the files in  `base_level` reaches its capacity, we will place data in a higher
    // level, which equals to `base_level -= 1;`.
    pub base_level: usize,
    /// Compaction candidates, sorted by descending score (ties broken by the
    /// smaller target level) in `get_priority_levels`.
    pub score_levels: Vec<PickerInfo>,
}
81
/// Core logic of the dynamic level selector: computes per-level size targets
/// and compaction scores from the compaction configuration.
pub struct DynamicLevelSelectorCore {
    // Compaction configuration used for all sizing and scoring decisions.
    config: Arc<CompactionConfig>,
    // Developer knobs forwarded to the pickers that accept them.
    developer_config: Arc<CompactionDeveloperConfig>,
}
86
/// Stateless `CompactionSelector` that drives dynamic-level compaction by
/// delegating to `DynamicLevelSelectorCore`.
#[derive(Default)]
pub struct DynamicLevelSelector {}
89
impl DynamicLevelSelectorCore {
    /// Creates a selector core over the given compaction and developer configs.
    pub fn new(
        config: Arc<CompactionConfig>,
        developer_config: Arc<CompactionDeveloperConfig>,
    ) -> Self {
        Self {
            config,
            developer_config,
        }
    }

    /// Returns the compaction config this core was built with.
    pub fn get_config(&self) -> &CompactionConfig {
        self.config.as_ref()
    }

    /// Instantiates the concrete picker matching `picker_info.picker_type`.
    ///
    /// `Tier`, `ToBase` and `Intra` pickers all take the shared task validator;
    /// the `BottomLevel` picker instead uses the overlap strategy and requires
    /// `select_level + 1 == target_level` (asserted below).
    fn create_compaction_picker(
        &self,
        picker_info: &PickerInfo,
        overlap_strategy: Arc<dyn OverlapStrategy>,
        compaction_task_validator: Arc<CompactionTaskValidator>,
    ) -> Box<dyn CompactionPicker> {
        match picker_info.picker_type {
            PickerType::Tier => Box::new(TierCompactionPicker::new_with_validator(
                self.config.clone(),
                compaction_task_validator,
            )),
            PickerType::ToBase => Box::new(LevelCompactionPicker::new_with_validator(
                picker_info.target_level,
                self.config.clone(),
                compaction_task_validator,
                self.developer_config.clone(),
            )),
            PickerType::Intra => Box::new(IntraCompactionPicker::new_with_validator(
                self.config.clone(),
                compaction_task_validator,
                self.developer_config.clone(),
            )),
            PickerType::BottomLevel => {
                // MinOverlappingPicker only moves one level down at a time.
                assert_eq!(picker_info.select_level + 1, picker_info.target_level);
                Box::new(MinOverlappingPicker::new(
                    picker_info.select_level,
                    picker_info.target_level,
                    self.config.max_bytes_for_level_base / 2,
                    self.config.split_weight_by_vnode,
                    overlap_strategy,
                ))
            }
        }
    }

    // TODO: calculate this scores in apply compact result.
    /// `calculate_level_base_size` calculate base level and the base size of LSM tree build for
    /// current dataset. In other words,  `level_max_bytes` is our compaction goal which shall
    /// reach. This algorithm refers to the implementation in  [`https://github.com/facebook/rocksdb/blob/v7.2.2/db/version_set.cc#L3706`]
    pub fn calculate_level_base_size(&self, levels: &Levels) -> SelectContext {
        let mut first_non_empty_level = 0;
        let mut max_level_size = 0;
        let mut ctx = SelectContext::default();

        // Find the shallowest non-empty level and the largest level size.
        for level in &levels.levels {
            if level.total_file_size > 0 && first_non_empty_level == 0 {
                first_non_empty_level = level.level_idx as usize;
            }
            max_level_size = std::cmp::max(max_level_size, level.total_file_size);
        }

        // Index by level number directly; unset entries stay at u64::MAX.
        ctx.level_max_bytes
            .resize(self.config.max_level as usize + 1, u64::MAX);

        if max_level_size == 0 {
            // Use the bottommost level.
            ctx.base_level = self.config.max_level as usize;
            return ctx;
        }

        let base_bytes_max = self.config.max_bytes_for_level_base;
        let base_bytes_min = base_bytes_max / self.config.max_bytes_for_level_multiplier;

        // Project what the first-non-empty level's target would be if the last
        // level were exactly max_level_size, shrinking by the multiplier per level.
        let mut cur_level_size = max_level_size;
        for _ in first_non_empty_level..self.config.max_level as usize {
            cur_level_size /= self.config.max_bytes_for_level_multiplier;
        }

        let base_level_size = if cur_level_size <= base_bytes_min {
            // Case 1. If we make target size of last level to be max_level_size,
            // target size of the first non-empty level would be smaller than
            // base_bytes_min. We set it be base_bytes_min.
            ctx.base_level = first_non_empty_level;
            base_bytes_min + 1
        } else {
            // Otherwise raise (i.e. decrement) the base level until its target
            // size fits under base_bytes_max.
            ctx.base_level = first_non_empty_level;
            while ctx.base_level > 1 && cur_level_size > base_bytes_max {
                ctx.base_level -= 1;
                cur_level_size /= self.config.max_bytes_for_level_multiplier;
            }
            std::cmp::min(base_bytes_max, cur_level_size)
        };

        // Grow targets geometrically from the base level down to max_level.
        let level_multiplier = self.config.max_bytes_for_level_multiplier as f64;
        let mut level_size = base_level_size;
        for i in ctx.base_level..=self.config.max_level as usize {
            // Don't set any level below base_bytes_max. Otherwise, the LSM can
            // assume an hourglass shape where L1+ sizes are smaller than L0. This
            // causes compaction scoring, which depends on level sizes, to favor L1+
            // at the expense of L0, which may fill up and stall.
            ctx.level_max_bytes[i] = std::cmp::max(level_size, base_bytes_max);
            level_size = (level_size as f64 * level_multiplier) as u64;
        }
        ctx
    }

    /// Builds the list of scored compaction candidates (`score_levels`) for the
    /// given `levels`, using the targets from `calculate_level_base_size` and
    /// subtracting work already pending in `handlers`. Candidates are returned
    /// sorted by descending score (ties favor the smaller target level).
    pub(crate) fn get_priority_levels(
        &self,
        levels: &Levels,
        handlers: &[LevelHandler],
    ) -> SelectContext {
        let mut ctx = self.calculate_level_base_size(levels);

        let l0_file_count = levels
            .l0
            .sub_levels
            .iter()
            .map(|sub_level| sub_level.table_infos.len())
            .sum::<usize>();

        // Files in L0 that are not already part of a pending compaction.
        let idle_file_count = match l0_file_count.checked_sub(handlers[0].pending_file_count()) {
            Some(count) => count,
            None => {
                // If the number of files in L0 is less than the number of pending files, it means
                // that may be encountered some issue, we can work around it.
                tracing::warn!(
                    "The number of files in L0 {} is less than the number of pending files {} group {} pending_tasks_ids {:?} compacting_files {:?}",
                    l0_file_count,
                    handlers[0].pending_file_count(),
                    levels.group_id,
                    handlers[0].pending_tasks_ids(),
                    handlers[0].compacting_files()
                );

                0
            }
        };

        if idle_file_count > 0 {
            // trigger l0 compaction when the number of files is too large.

            // The read query at the overlapping level needs to merge all the ssts, so the number of
            // ssts is the most important factor affecting the read performance, we use file count
            // to calculate the score
            let overlapping_file_count = levels
                .l0
                .sub_levels
                .iter()
                .filter(|level| level.level_type == LevelType::Overlapping)
                .map(|level| level.table_infos.len())
                .sum::<usize>();
            if overlapping_file_count > 0 {
                // FIXME: use overlapping idle file count
                let l0_overlapping_score =
                    std::cmp::min(idle_file_count, overlapping_file_count) as u64 * SCORE_BASE
                        / self.config.level0_tier_compact_file_number;
                // Reduce the level num of l0 overlapping sub_level
                // Any overlapping data always qualifies (score forced above SCORE_BASE).
                ctx.score_levels.push(PickerInfo {
                    score: std::cmp::max(l0_overlapping_score, SCORE_BASE + 1),
                    select_level: 0,
                    target_level: 0,
                    picker_type: PickerType::Tier,
                })
            }

            // The read query at the non-overlapping level only selects ssts that match the query
            // range at each level, so the number of levels is the most important factor affecting
            // the read performance. At the same time, the size factor is also added to the score
            // calculation rule to avoid unbalanced compact task due to large size.
            let total_size = levels
                .l0
                .sub_levels
                .iter()
                .filter(|level| {
                    level.vnode_partition_count == self.config.split_weight_by_vnode
                        && level.level_type == LevelType::Nonoverlapping
                })
                .map(|level| level.total_file_size)
                .sum::<u64>()
                .saturating_sub(handlers[0].pending_output_file_size(ctx.base_level as u32));
            let base_level_size = levels.get_level(ctx.base_level).total_file_size;
            let base_level_sst_count = levels.get_level(ctx.base_level).table_infos.len() as u64;

            // size limit
            let non_overlapping_size_score = total_size * SCORE_BASE
                / std::cmp::max(self.config.max_bytes_for_level_base, base_level_size);
            // level count limit
            let non_overlapping_level_count = levels
                .l0
                .sub_levels
                .iter()
                .filter(|level| level.level_type == LevelType::Nonoverlapping)
                .count() as u64;
            let non_overlapping_level_score = non_overlapping_level_count * SCORE_BASE
                / std::cmp::max(
                    base_level_sst_count / 16,
                    self.config.level0_sub_level_compact_level_count as u64,
                );

            let non_overlapping_score =
                std::cmp::max(non_overlapping_size_score, non_overlapping_level_score);

            // Reduce the level num of l0 non-overlapping sub_level
            // The +1 biases ToBase over the Intra candidate pushed below when
            // both share the same underlying score.
            if non_overlapping_size_score > SCORE_BASE {
                ctx.score_levels.push(PickerInfo {
                    score: non_overlapping_score + 1,
                    select_level: 0,
                    target_level: ctx.base_level,
                    picker_type: PickerType::ToBase,
                });
            }

            if non_overlapping_level_score > SCORE_BASE {
                // FIXME: more accurate score calculation algorithm will be introduced (#11903)
                ctx.score_levels.push(PickerInfo {
                    score: non_overlapping_score,
                    select_level: 0,
                    target_level: 0,
                    picker_type: PickerType::Intra,
                });
            }
        }

        // The bottommost level can not be input level.
        for level in &levels.levels {
            let level_idx = level.level_idx as usize;
            if level_idx < ctx.base_level || level_idx >= self.config.max_level as usize {
                continue;
            }
            // Exclude bytes already scheduled to flow into the next level.
            let output_file_size =
                handlers[level_idx].pending_output_file_size(level.level_idx + 1);
            let total_size = level.total_file_size.saturating_sub(output_file_size);
            if total_size == 0 {
                continue;
            }

            // Score is the level's remaining size relative to its target size.
            ctx.score_levels.push({
                PickerInfo {
                    score: total_size * SCORE_BASE / ctx.level_max_bytes[level_idx],
                    select_level: level_idx,
                    target_level: level_idx + 1,
                    picker_type: PickerType::BottomLevel,
                }
            });
        }

        // sort reverse to pick the largest one.
        ctx.score_levels.sort_by(|a, b| {
            b.score
                .cmp(&a.score)
                .then_with(|| a.target_level.cmp(&b.target_level))
        });
        ctx
    }

    /// `compact_pending_bytes_needed` calculates the number of compact bytes needed to balance the
    /// LSM Tree from the current state of each level in the LSM Tree in combination with
    /// `compaction_config`
    /// This algorithm refers to the implementation in  [`https://github.com/facebook/rocksdb/blob/main/db/version_set.cc#L3141`]
    pub fn compact_pending_bytes_needed(&self, levels: &Levels) -> u64 {
        let ctx = self.calculate_level_base_size(levels);
        self.compact_pending_bytes_needed_with_ctx(levels, &ctx)
    }

    /// Same as [`Self::compact_pending_bytes_needed`] but reuses an already
    /// computed `SelectContext` instead of re-deriving the level targets.
    pub fn compact_pending_bytes_needed_with_ctx(
        &self,
        levels: &Levels,
        ctx: &SelectContext,
    ) -> u64 {
        // l0
        let mut compact_pending_bytes = 0;
        let mut compact_to_next_level_bytes = 0;
        let l0_size = levels
            .l0
            .sub_levels
            .iter()
            .map(|sub_level| sub_level.total_file_size)
            .sum::<u64>();

        // If L0 exceeds the base target, all of it is considered pending work
        // that will cascade into the base level.
        let mut l0_compaction_trigger = false;
        if l0_size > self.config.max_bytes_for_level_base {
            compact_pending_bytes = l0_size;
            compact_to_next_level_bytes = l0_size;
            l0_compaction_trigger = true;
        }

        // l1 and up
        let mut level_bytes;
        let mut next_level_bytes = 0;
        for level in &levels.levels[ctx.base_level - 1..levels.levels.len()] {
            let level_index = level.level_idx as usize;

            // Carry over the size estimated for this level in the previous
            // iteration, if any.
            if next_level_bytes > 0 {
                level_bytes = next_level_bytes;
                next_level_bytes = 0;
            } else {
                level_bytes = level.total_file_size;
            }

            if level_index == ctx.base_level && l0_compaction_trigger {
                compact_pending_bytes += level_bytes;
            }

            // Include the overflow cascading down from the level above.
            level_bytes += compact_to_next_level_bytes;
            compact_to_next_level_bytes = 0;
            let level_target = ctx.level_max_bytes[level_index];
            if level_bytes > level_target {
                compact_to_next_level_bytes = level_bytes - level_target;

                // Estimate the actual compaction fan-out ratio as size ratio between
                // the two levels.
                assert_eq!(0, next_level_bytes);
                if level_index + 1 < ctx.level_max_bytes.len() {
                    let next_level = level_index + 1;
                    next_level_bytes = levels.levels[next_level - 1].total_file_size;
                }

                if next_level_bytes > 0 {
                    compact_pending_bytes += (compact_to_next_level_bytes as f64
                        * (next_level_bytes as f64 / level_bytes as f64 + 1.0))
                        as u64;
                }
            }
        }

        compact_pending_bytes
    }
}
423
424impl CompactionSelector for DynamicLevelSelector {
425    fn pick_compaction(
426        &mut self,
427        task_id: HummockCompactionTaskId,
428        context: CompactionSelectorContext<'_>,
429    ) -> Option<CompactionTask> {
430        let CompactionSelectorContext {
431            group: compaction_group,
432            levels,
433            level_handlers,
434            selector_stats,
435            developer_config,
436            ..
437        } = context;
438        let dynamic_level_core = DynamicLevelSelectorCore::new(
439            compaction_group.compaction_config.clone(),
440            developer_config,
441        );
442        let overlap_strategy =
443            create_overlap_strategy(compaction_group.compaction_config.compaction_mode());
444        let ctx = dynamic_level_core.get_priority_levels(levels, level_handlers);
445        // TODO: Determine which rule to enable by write limit
446        let compaction_task_validator = Arc::new(CompactionTaskValidator::new(
447            compaction_group.compaction_config.clone(),
448        ));
449        for picker_info in &ctx.score_levels {
450            if picker_info.score <= SCORE_BASE {
451                return None;
452            }
453            let mut picker = dynamic_level_core.create_compaction_picker(
454                picker_info,
455                overlap_strategy.clone(),
456                compaction_task_validator.clone(),
457            );
458
459            let mut stats = LocalPickerStatistic::default();
460            if let Some(ret) = picker.pick_compaction(levels, level_handlers, &mut stats) {
461                ret.add_pending_task(task_id, level_handlers);
462                return Some(create_compaction_task(
463                    dynamic_level_core.get_config(),
464                    ret,
465                    ctx.base_level,
466                    self.task_type(),
467                ));
468            }
469            selector_stats.skip_picker.push((
470                picker_info.select_level,
471                picker_info.target_level,
472                stats,
473            ));
474        }
475        None
476    }
477
478    fn name(&self) -> &'static str {
479        "DynamicLevelSelector"
480    }
481
482    fn task_type(&self) -> PbTaskType {
483        PbTaskType::Dynamic
484    }
485}
486
#[cfg(test)]
pub mod tests {
    use std::collections::{BTreeSet, HashMap};
    use std::sync::Arc;

    use itertools::Itertools;
    use risingwave_common::constants::hummock::CompactionFilterFlag;
    use risingwave_hummock_sdk::level::Levels;
    use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
    use risingwave_pb::hummock::compaction_config::CompactionMode;

    use crate::hummock::compaction::CompactionDeveloperConfig;
    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::compaction::selector::tests::{
        assert_compaction_task, generate_l0_nonoverlapping_sublevels, generate_level,
        generate_tables, push_tables_level0_nonoverlapping,
    };
    use crate::hummock::compaction::selector::{
        CompactionSelector, DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic,
    };
    use crate::hummock::level_handler::LevelHandler;
    use crate::hummock::model::CompactionGroup;
    use crate::hummock::test_utils::compaction_selector_context;

    /// Exercises `calculate_level_base_size` across growing data sets,
    /// checking that the base level and per-level targets shift as expected.
    #[test]
    fn test_dynamic_level() {
        let config = CompactionConfigBuilder::new()
            .max_bytes_for_level_base(100)
            .max_level(4)
            .max_bytes_for_level_multiplier(5)
            .max_compaction_bytes(1)
            .level0_tier_compact_file_number(2)
            .compaction_mode(CompactionMode::Range as i32)
            .build();
        let selector = DynamicLevelSelectorCore::new(
            Arc::new(config),
            Arc::new(CompactionDeveloperConfig::default()),
        );
        let levels = vec![
            generate_level(1, vec![]),
            generate_level(2, generate_tables(0..5, 0..1000, 3, 10)),
            generate_level(3, generate_tables(5..10, 0..1000, 2, 50)),
            generate_level(4, generate_tables(10..15, 0..1000, 1, 200)),
        ];
        let mut levels = Levels {
            levels,
            l0: generate_l0_nonoverlapping_sublevels(vec![]),
            ..Default::default()
        };
        let ctx = selector.calculate_level_base_size(&levels);
        assert_eq!(ctx.base_level, 2);
        assert_eq!(ctx.level_max_bytes[2], 100);
        assert_eq!(ctx.level_max_bytes[3], 200);
        assert_eq!(ctx.level_max_bytes[4], 1000);

        // Grow L4 so the projected base-level size no longer fits at L2.
        levels.levels[3]
            .table_infos
            .append(&mut generate_tables(15..20, 2000..3000, 1, 400));
        levels.levels[3].total_file_size = levels.levels[3]
            .table_infos
            .iter()
            .map(|sst| sst.sst_size)
            .sum::<u64>();

        let ctx = selector.calculate_level_base_size(&levels);
        // data size increase, so we need increase one level to place more data.
        assert_eq!(ctx.base_level, 1);
        assert_eq!(ctx.level_max_bytes[1], 100);
        assert_eq!(ctx.level_max_bytes[2], 120);
        assert_eq!(ctx.level_max_bytes[3], 600);
        assert_eq!(ctx.level_max_bytes[4], 3000);

        // append a large data to L0 but it does not change the base size of LSM tree.
        push_tables_level0_nonoverlapping(&mut levels, generate_tables(20..26, 0..1000, 1, 100));

        let ctx = selector.calculate_level_base_size(&levels);
        assert_eq!(ctx.base_level, 1);
        assert_eq!(ctx.level_max_bytes[1], 100);
        assert_eq!(ctx.level_max_bytes[2], 120);
        assert_eq!(ctx.level_max_bytes[3], 600);
        assert_eq!(ctx.level_max_bytes[4], 3000);

        // Move the data into L1 instead; targets should be unchanged.
        levels.l0.sub_levels.clear();
        levels.l0.total_file_size = 0;
        levels.levels[0].table_infos = generate_tables(26..32, 0..1000, 1, 100);
        levels.levels[0].total_file_size = levels.levels[0]
            .table_infos
            .iter()
            .map(|sst| sst.sst_size)
            .sum::<u64>();

        let ctx = selector.calculate_level_base_size(&levels);
        assert_eq!(ctx.base_level, 1);
        assert_eq!(ctx.level_max_bytes[1], 100);
        assert_eq!(ctx.level_max_bytes[2], 120);
        assert_eq!(ctx.level_max_bytes[3], 600);
        assert_eq!(ctx.level_max_bytes[4], 3000);
    }

    /// End-to-end picking: verifies which levels get selected as input/target
    /// across successive picks and that pending tasks suppress re-selection.
    #[test]
    fn test_pick_compaction() {
        let config = CompactionConfigBuilder::new()
            .max_bytes_for_level_base(200)
            .max_level(4)
            .max_bytes_for_level_multiplier(5)
            .target_file_size_base(5)
            .max_compaction_bytes(10000)
            .level0_tier_compact_file_number(4)
            .compaction_mode(CompactionMode::Range as i32)
            .level0_sub_level_compact_level_count(3)
            .build();
        let group_config = CompactionGroup::new(1, config.clone());
        let levels = vec![
            generate_level(1, vec![]),
            generate_level(2, generate_tables(0..5, 0..1000, 3, 10)),
            generate_level(3, generate_tables(5..10, 0..1000, 2, 50)),
            generate_level(4, generate_tables(10..15, 0..1000, 1, 200)),
        ];
        let mut levels = Levels {
            levels,
            l0: generate_l0_nonoverlapping_sublevels(generate_tables(15..25, 0..600, 3, 10)),
            ..Default::default()
        };

        let mut selector = DynamicLevelSelector::default();
        let mut levels_handlers = (0..5).map(LevelHandler::new).collect_vec();
        let mut local_stats = LocalSelectorStatistic::default();
        let compaction = selector
            .pick_compaction(
                1,
                compaction_selector_context(
                    &group_config,
                    &levels,
                    &BTreeSet::new(),
                    &mut levels_handlers,
                    &mut local_stats,
                    &HashMap::default(),
                    Arc::new(CompactionDeveloperConfig::default()),
                    &Default::default(),
                    &HummockVersionStateTableInfo::empty(),
                ),
            )
            .unwrap();
        assert_compaction_task(&compaction, &levels_handlers);

        // Rebuild with a smaller base so L0 -> base-level compaction triggers.
        let compaction_filter_flag = CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL;
        let config = CompactionConfigBuilder::with_config(config)
            .max_bytes_for_level_base(100)
            .sub_level_max_compaction_bytes(50)
            .target_file_size_base(20)
            .compaction_filter_mask(compaction_filter_flag.into())
            .build();
        let group_config = CompactionGroup::new(1, config.clone());
        let mut selector = DynamicLevelSelector::default();

        levels.l0.sub_levels.clear();
        levels.l0.total_file_size = 0;
        push_tables_level0_nonoverlapping(&mut levels, generate_tables(15..25, 0..600, 3, 20));
        let mut levels_handlers = (0..5).map(LevelHandler::new).collect_vec();
        let compaction = selector
            .pick_compaction(
                1,
                compaction_selector_context(
                    &group_config,
                    &levels,
                    &BTreeSet::new(),
                    &mut levels_handlers,
                    &mut local_stats,
                    &HashMap::default(),
                    Arc::new(CompactionDeveloperConfig::default()),
                    &Default::default(),
                    &HummockVersionStateTableInfo::empty(),
                ),
            )
            .unwrap();
        assert_compaction_task(&compaction, &levels_handlers);
        assert_eq!(compaction.input.input_levels[0].level_idx, 0);
        assert_eq!(compaction.input.target_level, 2);

        // Clear the pending task and let a bottom-level (L3 -> L4) pick win.
        levels_handlers[0].remove_task(1);
        levels_handlers[2].remove_task(1);
        levels.l0.sub_levels.clear();
        levels.levels[1].table_infos = generate_tables(20..30, 0..1000, 3, 10);
        let compaction = selector
            .pick_compaction(
                2,
                compaction_selector_context(
                    &group_config,
                    &levels,
                    &BTreeSet::new(),
                    &mut levels_handlers,
                    &mut local_stats,
                    &HashMap::default(),
                    Arc::new(CompactionDeveloperConfig::default()),
                    &Default::default(),
                    &HummockVersionStateTableInfo::empty(),
                ),
            )
            .unwrap();
        assert_compaction_task(&compaction, &levels_handlers);
        assert_eq!(compaction.input.input_levels[0].level_idx, 3);
        assert_eq!(compaction.input.target_level, 4);
        assert_eq!(
            compaction.input.input_levels[0]
                .table_infos
                .iter()
                .map(|sst| sst.sst_id)
                .collect_vec(),
            vec![5]
        );
        assert_eq!(
            compaction.input.input_levels[1]
                .table_infos
                .iter()
                .map(|sst| sst.sst_id)
                .collect_vec(),
            vec![10]
        );
        assert_eq!(
            compaction.target_file_size,
            config.target_file_size_base * 2
        );
        assert_eq!(compaction.compression_algorithm.as_str(), "Lz4",);
        // no compaction need to be scheduled because we do not calculate the size of pending files
        // to score.
        let compaction = selector.pick_compaction(
            2,
            compaction_selector_context(
                &group_config,
                &levels,
                &BTreeSet::new(),
                &mut levels_handlers,
                &mut local_stats,
                &HashMap::default(),
                Arc::new(CompactionDeveloperConfig::default()),
                &Default::default(),
                &HummockVersionStateTableInfo::empty(),
            ),
        );
        assert!(compaction.is_none());
    }

    /// Checks `compact_pending_bytes_needed` against hand-computed per-level
    /// pending-byte estimates (see the inline derivation below).
    #[test]
    fn test_compact_pending_bytes() {
        let config = CompactionConfigBuilder::new()
            .max_bytes_for_level_base(100)
            .max_level(4)
            .max_bytes_for_level_multiplier(5)
            .compaction_mode(CompactionMode::Range as i32)
            .build();
        let levels = vec![
            generate_level(1, vec![]),
            generate_level(2, generate_tables(0..50, 0..1000, 3, 500)),
            generate_level(3, generate_tables(30..60, 0..1000, 2, 500)),
            generate_level(4, generate_tables(60..70, 0..1000, 1, 1000)),
        ];
        let levels = Levels {
            levels,
            l0: generate_l0_nonoverlapping_sublevels(generate_tables(15..25, 0..600, 3, 100)),
            ..Default::default()
        };

        let dynamic_level_core = DynamicLevelSelectorCore::new(
            Arc::new(config),
            Arc::new(CompactionDeveloperConfig::default()),
        );
        let ctx = dynamic_level_core.calculate_level_base_size(&levels);
        assert_eq!(1, ctx.base_level);
        assert_eq!(1000, levels.l0.total_file_size); // l0
        assert_eq!(0, levels.levels.first().unwrap().total_file_size); // l1
        assert_eq!(25000, levels.levels.get(1).unwrap().total_file_size); // l2
        assert_eq!(15000, levels.levels.get(2).unwrap().total_file_size); // l3
        assert_eq!(10000, levels.levels.get(3).unwrap().total_file_size); // l4

        assert_eq!(100, ctx.level_max_bytes[1]); // l1
        assert_eq!(500, ctx.level_max_bytes[2]); // l2
        assert_eq!(2500, ctx.level_max_bytes[3]); // l3
        assert_eq!(12500, ctx.level_max_bytes[4]); // l4

        // l1 pending = (0 + 1000 - 100) * ((25000 / 1000) + 1) + 1000 = 24400
        // l2 pending = (25000 + 900 - 500) * ((15000 / (25000 + 900)) + 1) = 40110
        // l3 pending = (15000 + 25400 - 2500) * ((10000 / (15000 + 25400) + 1)) = 47281

        let compact_pending_bytes = dynamic_level_core.compact_pending_bytes_needed(&levels);
        assert_eq!(24400 + 40110 + 47281, compact_pending_bytes);
    }
}
773}