risingwave_meta/hummock/compaction/selector/
level_selector.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
16// This source code is licensed under both the GPLv2 (found in the
17// COPYING file in the root directory) and Apache 2.0 License
18// (found in the LICENSE.Apache file in the root directory).
19
20use std::sync::Arc;
21
22use risingwave_hummock_sdk::HummockCompactionTaskId;
23use risingwave_hummock_sdk::level::Levels;
24use risingwave_pb::hummock::compact_task::PbTaskType;
25use risingwave_pb::hummock::{CompactionConfig, LevelType};
26
27use super::{
28    CompactionSelector, LevelCompactionPicker, TierCompactionPicker, create_compaction_task,
29};
30use crate::hummock::compaction::overlap_strategy::OverlapStrategy;
31use crate::hummock::compaction::picker::{
32    CompactionPicker, CompactionTaskValidator, IntraCompactionPicker, LocalPickerStatistic,
33    MinOverlappingPicker,
34};
35use crate::hummock::compaction::selector::CompactionSelectorContext;
36use crate::hummock::compaction::{
37    CompactionDeveloperConfig, CompactionTask, create_overlap_strategy,
38};
39use crate::hummock::level_handler::LevelHandler;
40
/// Baseline for compaction scores. Scores are percentages scaled by 100: a
/// score of `SCORE_BASE` means a level is exactly at its target, and only
/// candidates scoring strictly above it are eligible for compaction.
pub const SCORE_BASE: u64 = 100;
42
/// Which picker implementation should handle a scored compaction candidate
/// (see `DynamicLevelSelectorCore::create_compaction_picker`).
#[derive(Debug, Default, Clone)]
pub enum PickerType {
    /// `TierCompactionPicker`: reduce the number of overlapping L0 sub-levels.
    Tier,
    /// `IntraCompactionPicker`: compact L0 sub-levels with each other (target stays 0).
    Intra,
    /// `LevelCompactionPicker`: move L0 data down to the current base level.
    ToBase,
    /// `MinOverlappingPicker`: compact a bottom level `Ln` into `Ln+1`.
    #[default]
    BottomLevel,
}
51
52impl std::fmt::Display for PickerType {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        f.write_str(match self {
55            PickerType::Tier => "Tier",
56            PickerType::Intra => "Intra",
57            PickerType::ToBase => "ToBase",
58            PickerType::BottomLevel => "BottomLevel",
59        })
60    }
61}
62
/// One scored compaction candidate: which levels are involved and which picker
/// kind should build the task.
#[derive(Default, Debug)]
pub struct PickerInfo {
    /// Urgency scaled by `SCORE_BASE` (100 == exactly at target).
    pub score: u64,
    /// Input level index (0 for the L0-based pickers).
    pub select_level: usize,
    /// Output level index.
    pub target_level: usize,
    /// Picker implementation to instantiate for this candidate.
    pub picker_type: PickerType,
}
70
/// Output of `calculate_level_base_size`: the dynamic level layout, plus the
/// scored compaction candidates derived from it by `get_priority_levels`.
#[derive(Default, Debug)]
pub struct SelectContext {
    /// Target size for each level, indexed by 1-based level index.
    pub level_max_bytes: Vec<u64>,

    // All data will be placed in the last level. When the cluster is empty, the files in L0 will
    // be compacted to `max_level`, and the `max_level` would be `base_level`. When the total
    // size of the files in `base_level` reaches its capacity, we will place data in a higher
    // level, which amounts to `base_level -= 1;`.
    pub base_level: usize,
    /// Scored candidates, sorted by descending score in `get_priority_levels`.
    pub score_levels: Vec<PickerInfo>,
}
82
/// Scoring engine for dynamic-level compaction: computes per-level size
/// targets and turns them into prioritized compaction candidates.
pub struct DynamicLevelSelectorCore {
    /// Compaction config of the owning compaction group.
    config: Arc<CompactionConfig>,
    /// Developer-tunable knobs, passed through to the pickers.
    developer_config: Arc<CompactionDeveloperConfig>,
}
87
/// Stateless `CompactionSelector` for dynamic-level compaction; each call to
/// `pick_compaction` builds a fresh `DynamicLevelSelectorCore`.
#[derive(Default)]
pub struct DynamicLevelSelector {}
90
impl DynamicLevelSelectorCore {
    /// Creates a selector core from the group's compaction config and the
    /// developer-tunable compaction options.
    pub fn new(
        config: Arc<CompactionConfig>,
        developer_config: Arc<CompactionDeveloperConfig>,
    ) -> Self {
        Self {
            config,
            developer_config,
        }
    }

    /// Returns the compaction config this selector core was built with.
    pub fn get_config(&self) -> &CompactionConfig {
        self.config.as_ref()
    }

    /// Instantiates the concrete picker implementation for a scored candidate
    /// produced by `get_priority_levels`.
    fn create_compaction_picker(
        &self,
        picker_info: &PickerInfo,
        overlap_strategy: Arc<dyn OverlapStrategy>,
        compaction_task_validator: Arc<CompactionTaskValidator>,
    ) -> Box<dyn CompactionPicker> {
        match picker_info.picker_type {
            PickerType::Tier => Box::new(TierCompactionPicker::new_with_validator(
                self.config.clone(),
                compaction_task_validator,
            )),
            PickerType::ToBase => Box::new(LevelCompactionPicker::new_with_validator(
                picker_info.target_level,
                self.config.clone(),
                compaction_task_validator,
                self.developer_config.clone(),
            )),
            PickerType::Intra => Box::new(IntraCompactionPicker::new_with_validator(
                self.config.clone(),
                compaction_task_validator,
                self.developer_config.clone(),
            )),
            PickerType::BottomLevel => {
                // A bottom-level compaction always moves data exactly one level down.
                assert_eq!(picker_info.select_level + 1, picker_info.target_level);
                Box::new(MinOverlappingPicker::new(
                    picker_info.select_level,
                    picker_info.target_level,
                    self.config.max_bytes_for_level_base / 2,
                    self.config.split_weight_by_vnode,
                    overlap_strategy,
                ))
            }
        }
    }

    // TODO: calculate these scores when applying compaction results.
    /// `calculate_level_base_size` calculates the base level and the target size of every
    /// level of the LSM tree for the current dataset. In other words, `level_max_bytes` is
    /// the compaction goal we aim to reach. This algorithm refers to the implementation in
    /// `https://github.com/facebook/rocksdb/blob/v7.2.2/db/version_set.cc#L3706`
    pub fn calculate_level_base_size(&self, levels: &Levels) -> SelectContext {
        let mut first_non_empty_level = 0;
        let mut max_level_size = 0;
        let mut ctx = SelectContext::default();

        // Locate the shallowest non-empty level and the size of the largest level.
        for level in &levels.levels {
            if level.total_file_size > 0 && first_non_empty_level == 0 {
                first_non_empty_level = level.level_idx as usize;
            }
            max_level_size = std::cmp::max(max_level_size, level.total_file_size);
        }

        // One slot per 1-based level index; levels above the base keep `u64::MAX`.
        ctx.level_max_bytes
            .resize(self.config.max_level as usize + 1, u64::MAX);

        if max_level_size == 0 {
            // Use the bottommost level.
            ctx.base_level = self.config.max_level as usize;
            return ctx;
        }

        let base_bytes_max = self.config.max_bytes_for_level_base;
        // NOTE: integer division, matching the RocksDB reference implementation.
        let base_bytes_min = base_bytes_max / self.config.max_bytes_for_level_multiplier;

        // Project `max_level_size` back to the first non-empty level: this is
        // what that level's target would be if the last level held all data.
        let mut cur_level_size = max_level_size;
        for _ in first_non_empty_level..self.config.max_level as usize {
            cur_level_size /= self.config.max_bytes_for_level_multiplier;
        }

        let base_level_size = if cur_level_size <= base_bytes_min {
            // Case 1. If we make target size of last level to be max_level_size,
            // target size of the first non-empty level would be smaller than
            // base_bytes_min. We set it be base_bytes_min.
            ctx.base_level = first_non_empty_level;
            base_bytes_min + 1
        } else {
            // Case 2. The projected base size is too large: raise the base
            // level (toward L1) until the projected size fits `base_bytes_max`.
            ctx.base_level = first_non_empty_level;
            while ctx.base_level > 1 && cur_level_size > base_bytes_max {
                ctx.base_level -= 1;
                cur_level_size /= self.config.max_bytes_for_level_multiplier;
            }
            std::cmp::min(base_bytes_max, cur_level_size)
        };

        // Fill targets from the base level downward, multiplying at each step.
        let level_multiplier = self.config.max_bytes_for_level_multiplier as f64;
        let mut level_size = base_level_size;
        for i in ctx.base_level..=self.config.max_level as usize {
            // Don't set any level below base_bytes_max. Otherwise, the LSM can
            // assume an hourglass shape where L1+ sizes are smaller than L0. This
            // causes compaction scoring, which depends on level sizes, to favor L1+
            // at the expense of L0, which may fill up and stall.
            ctx.level_max_bytes[i] = std::cmp::max(level_size, base_bytes_max);
            level_size = (level_size as f64 * level_multiplier) as u64;
        }
        ctx
    }

    /// Scores every compaction candidate — L0 tier / to-base / intra plus each
    /// bottom level — and returns a `SelectContext` whose `score_levels` are
    /// sorted by descending score (ties broken by smaller target level).
    pub(crate) fn get_priority_levels(
        &self,
        levels: &Levels,
        handlers: &[LevelHandler],
    ) -> SelectContext {
        let mut ctx = self.calculate_level_base_size(levels);

        let l0_file_count = levels
            .l0
            .sub_levels
            .iter()
            .map(|sub_level| sub_level.table_infos.len())
            .sum::<usize>();

        // Files in L0 that are not already claimed by a pending task.
        let idle_file_count = match l0_file_count.checked_sub(handlers[0].pending_file_count()) {
            Some(count) => count,
            None => {
                // If the number of files in L0 is less than the number of pending files, it means
                // that may be encountered some issue, we can work around it.
                tracing::warn!(
                    "The number of files in L0 {} is less than the number of pending files {} group {} pending_tasks_ids {:?} compacting_files {:?}",
                    l0_file_count,
                    handlers[0].pending_file_count(),
                    levels.group_id,
                    handlers[0].pending_tasks_ids(),
                    handlers[0].compacting_files()
                );

                0
            }
        };

        if idle_file_count > 0 {
            // trigger l0 compaction when the number of files is too large.

            // The read query at the overlapping level needs to merge all the ssts, so the number of
            // ssts is the most important factor affecting the read performance, we use file count
            // to calculate the score
            let overlapping_file_count = levels
                .l0
                .sub_levels
                .iter()
                .filter(|level| level.level_type == LevelType::Overlapping)
                .map(|level| level.table_infos.len())
                .sum::<usize>();
            if overlapping_file_count > 0 {
                // FIXME: use overlapping idle file count
                let l0_overlapping_score =
                    std::cmp::min(idle_file_count, overlapping_file_count) as u64 * SCORE_BASE
                        / self.config.level0_tier_compact_file_number;
                // Reduce the level num of l0 overlapping sub_level
                // (clamped so a tier candidate always stays above the cutoff).
                ctx.score_levels.push(PickerInfo {
                    score: std::cmp::max(l0_overlapping_score, SCORE_BASE + 1),
                    select_level: 0,
                    target_level: 0,
                    picker_type: PickerType::Tier,
                })
            }

            // The read query at the non-overlapping level only selects ssts that match the query
            // range at each level, so the number of levels is the most important factor affecting
            // the read performance. At the same time, the size factor is also added to the score
            // calculation rule to avoid unbalanced compact task due to large size.
            let total_size = levels
                .l0
                .sub_levels
                .iter()
                .filter(|level| {
                    level.vnode_partition_count == self.config.split_weight_by_vnode
                        && level.level_type == LevelType::Nonoverlapping
                })
                .map(|level| level.total_file_size)
                .sum::<u64>()
                .saturating_sub(handlers[0].pending_output_file_size(ctx.base_level as u32));
            let base_level_size = levels.get_level(ctx.base_level).total_file_size;
            let base_level_sst_count = levels.get_level(ctx.base_level).table_infos.len() as u64;

            // size limit
            let non_overlapping_size_score = total_size * SCORE_BASE
                / std::cmp::max(self.config.max_bytes_for_level_base, base_level_size);
            // level count limit
            let non_overlapping_level_count = levels
                .l0
                .sub_levels
                .iter()
                .filter(|level| level.level_type == LevelType::Nonoverlapping)
                .count() as u64;
            let non_overlapping_level_score = non_overlapping_level_count * SCORE_BASE
                / std::cmp::max(
                    base_level_sst_count / 16,
                    self.config.level0_sub_level_compact_level_count as u64,
                );

            let non_overlapping_score =
                std::cmp::max(non_overlapping_size_score, non_overlapping_level_score);

            // Reduce the level num of l0 non-overlapping sub_level
            // (the `+ 1` biases ToBase over Intra when both are candidates).
            if non_overlapping_size_score > SCORE_BASE {
                ctx.score_levels.push(PickerInfo {
                    score: non_overlapping_score + 1,
                    select_level: 0,
                    target_level: ctx.base_level,
                    picker_type: PickerType::ToBase,
                });
            }

            if non_overlapping_level_score > SCORE_BASE {
                // FIXME: more accurate score calculation algorithm will be introduced (#11903)
                ctx.score_levels.push(PickerInfo {
                    score: non_overlapping_score,
                    select_level: 0,
                    target_level: 0,
                    picker_type: PickerType::Intra,
                });
            }
        }

        // The bottommost level can not be input level.
        for level in &levels.levels {
            let level_idx = level.level_idx as usize;
            if level_idx < ctx.base_level || level_idx >= self.config.max_level as usize {
                continue;
            }
            // Discount data already scheduled to flow out of this level.
            let output_file_size =
                handlers[level_idx].pending_output_file_size(level.level_idx + 1);
            let total_size = level.total_file_size.saturating_sub(output_file_size);
            if total_size == 0 {
                continue;
            }

            ctx.score_levels.push({
                PickerInfo {
                    score: total_size * SCORE_BASE / ctx.level_max_bytes[level_idx],
                    select_level: level_idx,
                    target_level: level_idx + 1,
                    picker_type: PickerType::BottomLevel,
                }
            });
        }

        // sort reverse to pick the largest one.
        ctx.score_levels.sort_by(|a, b| {
            b.score
                .cmp(&a.score)
                .then_with(|| a.target_level.cmp(&b.target_level))
        });
        ctx
    }

    /// `compact_pending_bytes_needed` calculates the number of compact bytes needed to balance the
    /// LSM Tree from the current state of each level in the LSM Tree in combination with
    /// `compaction_config`
    /// This algorithm refers to the implementation in  `https://github.com/facebook/rocksdb/blob/main/db/version_set.cc#L3141`
    pub fn compact_pending_bytes_needed(&self, levels: &Levels) -> u64 {
        let ctx = self.calculate_level_base_size(levels);
        self.compact_pending_bytes_needed_with_ctx(levels, &ctx)
    }

    /// Same as `compact_pending_bytes_needed`, but reuses a precomputed
    /// `SelectContext` instead of recalculating the level layout.
    pub fn compact_pending_bytes_needed_with_ctx(
        &self,
        levels: &Levels,
        ctx: &SelectContext,
    ) -> u64 {
        // l0
        let mut compact_pending_bytes = 0;
        let mut compact_to_next_level_bytes = 0;
        let l0_size = levels
            .l0
            .sub_levels
            .iter()
            .map(|sub_level| sub_level.total_file_size)
            .sum::<u64>();

        // If L0 exceeds the base-level budget, all of it is considered pending
        // and flows into the base level below.
        let mut l0_compaction_trigger = false;
        if l0_size > self.config.max_bytes_for_level_base {
            compact_pending_bytes = l0_size;
            compact_to_next_level_bytes = l0_size;
            l0_compaction_trigger = true;
        }

        // l1 and up
        // NOTE: `levels.levels` is 0-indexed while level ids are 1-based,
        // hence the `ctx.base_level - 1` offset.
        let mut level_bytes;
        let mut next_level_bytes = 0;
        for level in &levels.levels[ctx.base_level - 1..levels.levels.len()] {
            let level_index = level.level_idx as usize;

            if next_level_bytes > 0 {
                // Reuse the size already fetched when processing the previous level.
                level_bytes = next_level_bytes;
                next_level_bytes = 0;
            } else {
                level_bytes = level.total_file_size;
            }

            if level_index == ctx.base_level && l0_compaction_trigger {
                compact_pending_bytes += level_bytes;
            }

            // Add whatever the previous level would spill into this one.
            level_bytes += compact_to_next_level_bytes;
            compact_to_next_level_bytes = 0;
            let level_target = ctx.level_max_bytes[level_index];
            if level_bytes > level_target {
                compact_to_next_level_bytes = level_bytes - level_target;

                // Estimate the actual compaction fan-out ratio as size ratio between
                // the two levels.
                assert_eq!(0, next_level_bytes);
                if level_index + 1 < ctx.level_max_bytes.len() {
                    let next_level = level_index + 1;
                    next_level_bytes = levels.levels[next_level - 1].total_file_size;
                }

                if next_level_bytes > 0 {
                    compact_pending_bytes += (compact_to_next_level_bytes as f64
                        * (next_level_bytes as f64 / level_bytes as f64 + 1.0))
                        as u64;
                }
            }
        }

        compact_pending_bytes
    }
}
424
425impl CompactionSelector for DynamicLevelSelector {
426    fn pick_compaction(
427        &mut self,
428        task_id: HummockCompactionTaskId,
429        context: CompactionSelectorContext<'_>,
430    ) -> Option<CompactionTask> {
431        let CompactionSelectorContext {
432            group: compaction_group,
433            levels,
434            level_handlers,
435            selector_stats,
436            developer_config,
437            ..
438        } = context;
439        let dynamic_level_core = DynamicLevelSelectorCore::new(
440            compaction_group.compaction_config.clone(),
441            developer_config,
442        );
443        let overlap_strategy =
444            create_overlap_strategy(compaction_group.compaction_config.compaction_mode());
445        let ctx = dynamic_level_core.get_priority_levels(levels, level_handlers);
446        // TODO: Determine which rule to enable by write limit
447        let compaction_task_validator = Arc::new(CompactionTaskValidator::new(
448            compaction_group.compaction_config.clone(),
449        ));
450        for picker_info in &ctx.score_levels {
451            if picker_info.score <= SCORE_BASE {
452                return None;
453            }
454            let mut picker = dynamic_level_core.create_compaction_picker(
455                picker_info,
456                overlap_strategy.clone(),
457                compaction_task_validator.clone(),
458            );
459
460            let mut stats = LocalPickerStatistic::default();
461            if let Some(ret) = picker.pick_compaction(levels, level_handlers, &mut stats) {
462                ret.add_pending_task(task_id, level_handlers);
463                return Some(create_compaction_task(
464                    dynamic_level_core.get_config(),
465                    ret,
466                    ctx.base_level,
467                    self.task_type(),
468                ));
469            }
470            selector_stats.skip_picker.push((
471                picker_info.select_level,
472                picker_info.target_level,
473                stats,
474            ));
475        }
476        None
477    }
478
479    fn name(&self) -> &'static str {
480        "DynamicLevelSelector"
481    }
482
483    fn task_type(&self) -> PbTaskType {
484        PbTaskType::Dynamic
485    }
486}
487
// Unit tests covering dynamic level sizing, end-to-end task picking, and
// pending-bytes estimation.
#[cfg(test)]
pub mod tests {
    use std::collections::{BTreeSet, HashMap};
    use std::sync::Arc;

    use itertools::Itertools;
    use risingwave_common::constants::hummock::CompactionFilterFlag;
    use risingwave_hummock_sdk::level::Levels;
    use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
    use risingwave_pb::hummock::compaction_config::CompactionMode;

    use crate::hummock::compaction::CompactionDeveloperConfig;
    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::compaction::selector::tests::{
        assert_compaction_task, generate_l0_nonoverlapping_sublevels, generate_level,
        generate_tables, push_tables_level0_nonoverlapping,
    };
    use crate::hummock::compaction::selector::{
        CompactionSelector, DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic,
    };
    use crate::hummock::level_handler::LevelHandler;
    use crate::hummock::model::CompactionGroup;
    use crate::hummock::test_utils::compaction_selector_context;

    // Verifies that `calculate_level_base_size` adapts the base level and the
    // per-level size targets as the dataset grows.
    #[test]
    fn test_dynamic_level() {
        let config = CompactionConfigBuilder::new()
            .max_bytes_for_level_base(100)
            .max_level(4)
            .max_bytes_for_level_multiplier(5)
            .max_compaction_bytes(1)
            .level0_tier_compact_file_number(2)
            .compaction_mode(CompactionMode::Range as i32)
            .build();
        let selector = DynamicLevelSelectorCore::new(
            Arc::new(config),
            Arc::new(CompactionDeveloperConfig::default()),
        );
        let levels = vec![
            generate_level(1, vec![]),
            generate_level(2, generate_tables(0..5, 0..1000, 3, 10)),
            generate_level(3, generate_tables(5..10, 0..1000, 2, 50)),
            generate_level(4, generate_tables(10..15, 0..1000, 1, 200)),
        ];
        let mut levels = Levels {
            levels,
            l0: generate_l0_nonoverlapping_sublevels(vec![]),
            ..Default::default()
        };
        let ctx = selector.calculate_level_base_size(&levels);
        assert_eq!(ctx.base_level, 2);
        assert_eq!(ctx.level_max_bytes[2], 100);
        assert_eq!(ctx.level_max_bytes[3], 200);
        assert_eq!(ctx.level_max_bytes[4], 1000);

        levels.levels[3]
            .table_infos
            .append(&mut generate_tables(15..20, 2000..3000, 1, 400));
        levels.levels[3].total_file_size = levels.levels[3]
            .table_infos
            .iter()
            .map(|sst| sst.sst_size)
            .sum::<u64>();

        let ctx = selector.calculate_level_base_size(&levels);
        // data size increase, so we need increase one level to place more data.
        assert_eq!(ctx.base_level, 1);
        assert_eq!(ctx.level_max_bytes[1], 100);
        assert_eq!(ctx.level_max_bytes[2], 120);
        assert_eq!(ctx.level_max_bytes[3], 600);
        assert_eq!(ctx.level_max_bytes[4], 3000);

        // append a large data to L0 but it does not change the base size of LSM tree.
        push_tables_level0_nonoverlapping(&mut levels, generate_tables(20..26, 0..1000, 1, 100));

        let ctx = selector.calculate_level_base_size(&levels);
        assert_eq!(ctx.base_level, 1);
        assert_eq!(ctx.level_max_bytes[1], 100);
        assert_eq!(ctx.level_max_bytes[2], 120);
        assert_eq!(ctx.level_max_bytes[3], 600);
        assert_eq!(ctx.level_max_bytes[4], 3000);

        // Moving the data into L1 directly should not change the targets either.
        levels.l0.sub_levels.clear();
        levels.l0.total_file_size = 0;
        levels.levels[0].table_infos = generate_tables(26..32, 0..1000, 1, 100);
        levels.levels[0].total_file_size = levels.levels[0]
            .table_infos
            .iter()
            .map(|sst| sst.sst_size)
            .sum::<u64>();

        let ctx = selector.calculate_level_base_size(&levels);
        assert_eq!(ctx.base_level, 1);
        assert_eq!(ctx.level_max_bytes[1], 100);
        assert_eq!(ctx.level_max_bytes[2], 120);
        assert_eq!(ctx.level_max_bytes[3], 600);
        assert_eq!(ctx.level_max_bytes[4], 3000);
    }

    // Exercises `pick_compaction` end to end across config changes; the later
    // assertions pin the exact input/target levels of each picked task.
    #[test]
    fn test_pick_compaction() {
        let config = CompactionConfigBuilder::new()
            .max_bytes_for_level_base(200)
            .max_level(4)
            .max_bytes_for_level_multiplier(5)
            .target_file_size_base(5)
            .max_compaction_bytes(10000)
            .level0_tier_compact_file_number(4)
            .compaction_mode(CompactionMode::Range as i32)
            .level0_sub_level_compact_level_count(3)
            .build();
        let group_config = CompactionGroup::new(1, config.clone());
        let levels = vec![
            generate_level(1, vec![]),
            generate_level(2, generate_tables(0..5, 0..1000, 3, 10)),
            generate_level(3, generate_tables(5..10, 0..1000, 2, 50)),
            generate_level(4, generate_tables(10..15, 0..1000, 1, 200)),
        ];
        let mut levels = Levels {
            levels,
            l0: generate_l0_nonoverlapping_sublevels(generate_tables(15..25, 0..600, 3, 10)),
            ..Default::default()
        };

        let mut selector = DynamicLevelSelector::default();
        let mut levels_handlers = (0..5).map(LevelHandler::new).collect_vec();
        let mut local_stats = LocalSelectorStatistic::default();
        let compaction = selector
            .pick_compaction(
                1,
                compaction_selector_context(
                    &group_config,
                    &levels,
                    &BTreeSet::new(),
                    &mut levels_handlers,
                    &mut local_stats,
                    &HashMap::default(),
                    Arc::new(CompactionDeveloperConfig::default()),
                    &Default::default(),
                    &HummockVersionStateTableInfo::empty(),
                ),
            )
            .unwrap();
        assert_compaction_task(&compaction, &levels_handlers);

        // Tighten the config and refill L0: now an L0 -> base-level task is expected.
        let compaction_filter_flag = CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL;
        let config = CompactionConfigBuilder::with_config(config)
            .max_bytes_for_level_base(100)
            .sub_level_max_compaction_bytes(50)
            .target_file_size_base(20)
            .compaction_filter_mask(compaction_filter_flag.into())
            .build();
        let group_config = CompactionGroup::new(1, config.clone());
        let mut selector = DynamicLevelSelector::default();

        levels.l0.sub_levels.clear();
        levels.l0.total_file_size = 0;
        push_tables_level0_nonoverlapping(&mut levels, generate_tables(15..25, 0..600, 3, 20));
        let mut levels_handlers = (0..5).map(LevelHandler::new).collect_vec();
        let compaction = selector
            .pick_compaction(
                1,
                compaction_selector_context(
                    &group_config,
                    &levels,
                    &BTreeSet::new(),
                    &mut levels_handlers,
                    &mut local_stats,
                    &HashMap::default(),
                    Arc::new(CompactionDeveloperConfig::default()),
                    &Default::default(),
                    &HummockVersionStateTableInfo::empty(),
                ),
            )
            .unwrap();
        assert_compaction_task(&compaction, &levels_handlers);
        assert_eq!(compaction.input.input_levels[0].level_idx, 0);
        assert_eq!(compaction.input.target_level, 2);

        // With L0 cleared, the next pick should be a bottom-level task (L3 -> L4).
        levels_handlers[0].remove_task(1);
        levels_handlers[2].remove_task(1);
        levels.l0.sub_levels.clear();
        levels.levels[1].table_infos = generate_tables(20..30, 0..1000, 3, 10);
        let compaction = selector
            .pick_compaction(
                2,
                compaction_selector_context(
                    &group_config,
                    &levels,
                    &BTreeSet::new(),
                    &mut levels_handlers,
                    &mut local_stats,
                    &HashMap::default(),
                    Arc::new(CompactionDeveloperConfig::default()),
                    &Default::default(),
                    &HummockVersionStateTableInfo::empty(),
                ),
            )
            .unwrap();
        assert_compaction_task(&compaction, &levels_handlers);
        assert_eq!(compaction.input.input_levels[0].level_idx, 3);
        assert_eq!(compaction.input.target_level, 4);
        assert_eq!(
            compaction.input.input_levels[0]
                .table_infos
                .iter()
                .map(|sst| sst.sst_id)
                .collect_vec(),
            vec![5]
        );
        assert_eq!(
            compaction.input.input_levels[1]
                .table_infos
                .iter()
                .map(|sst| sst.sst_id)
                .collect_vec(),
            vec![10]
        );
        assert_eq!(
            compaction.target_file_size,
            config.target_file_size_base * 2
        );
        assert_eq!(compaction.compression_algorithm.as_str(), "Lz4",);
        // no compaction need to be scheduled because we do not calculate the size of pending files
        // to score.
        let compaction = selector.pick_compaction(
            2,
            compaction_selector_context(
                &group_config,
                &levels,
                &BTreeSet::new(),
                &mut levels_handlers,
                &mut local_stats,
                &HashMap::default(),
                Arc::new(CompactionDeveloperConfig::default()),
                &Default::default(),
                &HummockVersionStateTableInfo::empty(),
            ),
        );
        assert!(compaction.is_none());
    }

    // Validates `compact_pending_bytes_needed` against hand-computed values
    // (see the per-level pending formulas in the inline comments below).
    #[test]
    fn test_compact_pending_bytes() {
        let config = CompactionConfigBuilder::new()
            .max_bytes_for_level_base(100)
            .max_level(4)
            .max_bytes_for_level_multiplier(5)
            .compaction_mode(CompactionMode::Range as i32)
            .build();
        let levels = vec![
            generate_level(1, vec![]),
            generate_level(2, generate_tables(0..50, 0..1000, 3, 500)),
            generate_level(3, generate_tables(30..60, 0..1000, 2, 500)),
            generate_level(4, generate_tables(60..70, 0..1000, 1, 1000)),
        ];
        let levels = Levels {
            levels,
            l0: generate_l0_nonoverlapping_sublevels(generate_tables(15..25, 0..600, 3, 100)),
            ..Default::default()
        };

        let dynamic_level_core = DynamicLevelSelectorCore::new(
            Arc::new(config),
            Arc::new(CompactionDeveloperConfig::default()),
        );
        let ctx = dynamic_level_core.calculate_level_base_size(&levels);
        assert_eq!(1, ctx.base_level);
        assert_eq!(1000, levels.l0.total_file_size); // l0
        assert_eq!(0, levels.levels.first().unwrap().total_file_size); // l1
        assert_eq!(25000, levels.levels.get(1).unwrap().total_file_size); // l2
        assert_eq!(15000, levels.levels.get(2).unwrap().total_file_size); // l3
        assert_eq!(10000, levels.levels.get(3).unwrap().total_file_size); // l4

        assert_eq!(100, ctx.level_max_bytes[1]); // l1
        assert_eq!(500, ctx.level_max_bytes[2]); // l2
        assert_eq!(2500, ctx.level_max_bytes[3]); // l3
        assert_eq!(12500, ctx.level_max_bytes[4]); // l4

        // l1 pending = (0 + 1000 - 100) * ((25000 / 1000) + 1) + 1000 = 24400
        // l2 pending = (25000 + 900 - 500) * ((15000 / (25000 + 900)) + 1) = 40110
        // l3 pending = (15000 + 25400 - 2500) * ((10000 / (15000 + 25400) + 1)) = 47281

        let compact_pending_bytes = dynamic_level_core.compact_pending_bytes_needed(&levels);
        assert_eq!(24400 + 40110 + 47281, compact_pending_bytes);
    }
}
774}