Skip to main content

risingwave_meta/hummock/compaction/selector/
level_selector.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
16// This source code is licensed under both the GPLv2 (found in the
17// COPYING file in the root directory) and Apache 2.0 License
18// (found in the LICENSE.Apache file in the root directory).
19
20use std::sync::Arc;
21
22use risingwave_hummock_sdk::HummockCompactionTaskId;
23use risingwave_hummock_sdk::level::Levels;
24use risingwave_pb::hummock::compact_task::PbTaskType;
25use risingwave_pb::hummock::{CompactionConfig, LevelType};
26
27use super::{
28    CompactionSelector, LevelCompactionPicker, TierCompactionPicker, create_compaction_task,
29};
30use crate::hummock::compaction::overlap_strategy::OverlapStrategy;
31use crate::hummock::compaction::picker::{
32    CompactionPicker, CompactionTaskValidator, IntraCompactionPicker, LocalPickerStatistic,
33    MinOverlappingPicker,
34};
35use crate::hummock::compaction::selector::CompactionSelectorContext;
36use crate::hummock::compaction::{
37    CompactionDeveloperConfig, CompactionTask, create_overlap_strategy,
38};
39use crate::hummock::level_handler::LevelHandler;
40
/// Fixed-point scale for compaction scores: a candidate whose measured size (or file/level
/// count) equals its threshold scores exactly `SCORE_BASE`, and only candidates scoring strictly
/// above it are picked.
pub const SCORE_BASE: u64 = 100;
42
/// Picker implementation that a scored compaction candidate is dispatched to.
#[derive(Clone, Debug, Default)]
pub enum PickerType {
    /// Tier compaction among the overlapping L0 sub-levels (`TierCompactionPicker`).
    Tier,
    /// Compaction within L0 of its non-overlapping sub-levels (`IntraCompactionPicker`).
    Intra,
    /// Compaction from L0 into the base level (`LevelCompactionPicker`).
    ToBase,
    /// Compaction from one level into the level directly below it (`MinOverlappingPicker`).
    #[default]
    BottomLevel,
}

impl std::fmt::Display for PickerType {
    /// Writes the bare variant name, e.g. `ToBase`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Tier => "Tier",
            Self::Intra => "Intra",
            Self::ToBase => "ToBase",
            Self::BottomLevel => "BottomLevel",
        };
        f.write_str(name)
    }
}
62
63#[derive(Default, Debug)]
64pub struct PickerInfo {
65    pub score: u64,
66    pub select_level: usize,
67    pub target_level: usize,
68    pub picker_type: PickerType,
69}
70
71#[derive(Default, Debug)]
72pub struct SelectContext {
73    pub level_max_bytes: Vec<u64>,
74
75    // All data will be placed in the last level. When the cluster is empty, the files in L0 will
76    // be compact to `max_level`, and the `max_level` would be `base_level`. When the total
77    // size of the files in  `base_level` reaches its capacity, we will place data in a higher
78    // level, which equals to `base_level -= 1;`.
79    pub base_level: usize,
80    pub score_levels: Vec<PickerInfo>,
81}
82
83pub struct DynamicLevelSelectorCore {
84    config: Arc<CompactionConfig>,
85    developer_config: Arc<CompactionDeveloperConfig>,
86}
87
/// Stateless compaction selector that scores L0 and every non-bottom level against
/// dynamically computed level targets and runs the best-scoring picker.
#[derive(Default)]
pub struct DynamicLevelSelector {}
90
91impl DynamicLevelSelectorCore {
92    pub fn new(
93        config: Arc<CompactionConfig>,
94        developer_config: Arc<CompactionDeveloperConfig>,
95    ) -> Self {
96        Self {
97            config,
98            developer_config,
99        }
100    }
101
102    pub fn get_config(&self) -> &CompactionConfig {
103        self.config.as_ref()
104    }
105
106    fn create_compaction_picker(
107        &self,
108        picker_info: &PickerInfo,
109        overlap_strategy: Arc<dyn OverlapStrategy>,
110        compaction_task_validator: Arc<CompactionTaskValidator>,
111    ) -> Box<dyn CompactionPicker> {
112        match picker_info.picker_type {
113            PickerType::Tier => Box::new(TierCompactionPicker::new_with_validator(
114                self.config.clone(),
115                compaction_task_validator,
116            )),
117            PickerType::ToBase => Box::new(LevelCompactionPicker::new_with_validator(
118                picker_info.target_level,
119                self.config.clone(),
120                compaction_task_validator,
121                self.developer_config.clone(),
122            )),
123            PickerType::Intra => Box::new(IntraCompactionPicker::new_with_validator(
124                self.config.clone(),
125                compaction_task_validator,
126                self.developer_config.clone(),
127            )),
128            PickerType::BottomLevel => {
129                assert_eq!(picker_info.select_level + 1, picker_info.target_level);
130                Box::new(MinOverlappingPicker::new(
131                    picker_info.select_level,
132                    picker_info.target_level,
133                    self.config.max_bytes_for_level_base / 2,
134                    self.config.split_weight_by_vnode,
135                    overlap_strategy,
136                ))
137            }
138        }
139    }
140
141    // TODO: calculate this scores in apply compact result.
142    /// `calculate_level_base_size` calculate base level and the base size of LSM tree build for
143    /// current dataset. In other words,  `level_max_bytes` is our compaction goal which shall
144    /// reach. This algorithm refers to the implementation in  `https://github.com/facebook/rocksdb/blob/v7.2.2/db/version_set.cc#L3706`
145    pub fn calculate_level_base_size(&self, levels: &Levels) -> SelectContext {
146        let mut first_non_empty_level = 0;
147        let mut max_level_size = 0;
148        let mut ctx = SelectContext::default();
149
150        for level in &levels.levels {
151            if level.total_file_size > 0 && first_non_empty_level == 0 {
152                first_non_empty_level = level.level_idx as usize;
153            }
154            max_level_size = std::cmp::max(max_level_size, level.total_file_size);
155        }
156
157        ctx.level_max_bytes
158            .resize(self.config.max_level as usize + 1, u64::MAX);
159
160        if max_level_size == 0 {
161            // Use the bottommost level.
162            ctx.base_level = self.config.max_level as usize;
163            return ctx;
164        }
165
166        let base_bytes_max = self.config.max_bytes_for_level_base;
167        let base_bytes_min = base_bytes_max / self.config.max_bytes_for_level_multiplier;
168
169        let mut cur_level_size = max_level_size;
170        for _ in first_non_empty_level..self.config.max_level as usize {
171            cur_level_size /= self.config.max_bytes_for_level_multiplier;
172        }
173
174        let base_level_size = if cur_level_size <= base_bytes_min {
175            // Case 1. If we make target size of last level to be max_level_size,
176            // target size of the first non-empty level would be smaller than
177            // base_bytes_min. We set it be base_bytes_min.
178            ctx.base_level = first_non_empty_level;
179            base_bytes_min + 1
180        } else {
181            ctx.base_level = first_non_empty_level;
182            while ctx.base_level > 1 && cur_level_size > base_bytes_max {
183                ctx.base_level -= 1;
184                cur_level_size /= self.config.max_bytes_for_level_multiplier;
185            }
186            std::cmp::min(base_bytes_max, cur_level_size)
187        };
188
189        let level_multiplier = self.config.max_bytes_for_level_multiplier as f64;
190        let mut level_size = base_level_size;
191        for i in ctx.base_level..=self.config.max_level as usize {
192            // Don't set any level below base_bytes_max. Otherwise, the LSM can
193            // assume an hourglass shape where L1+ sizes are smaller than L0. This
194            // causes compaction scoring, which depends on level sizes, to favor L1+
195            // at the expense of L0, which may fill up and stall.
196            ctx.level_max_bytes[i] = std::cmp::max(level_size, base_bytes_max);
197            level_size = (level_size as f64 * level_multiplier) as u64;
198        }
199        ctx
200    }
201
202    pub(crate) fn get_priority_levels(
203        &self,
204        levels: &Levels,
205        handlers: &[LevelHandler],
206    ) -> SelectContext {
207        let mut ctx = self.calculate_level_base_size(levels);
208
209        let l0_file_count = levels
210            .l0
211            .sub_levels
212            .iter()
213            .map(|sub_level| sub_level.table_infos.len())
214            .sum::<usize>();
215
216        let idle_file_count = match l0_file_count.checked_sub(handlers[0].pending_file_count()) {
217            Some(count) => count,
218            None => {
219                // If the number of files in L0 is less than the number of pending files, it means
220                // that may be encountered some issue, we can work around it.
221                tracing::warn!(
222                    "The number of files in L0 {} is less than the number of pending files {} group {} pending_tasks_ids {:?} compacting_files {:?}",
223                    l0_file_count,
224                    handlers[0].pending_file_count(),
225                    levels.group_id,
226                    handlers[0].pending_tasks_ids(),
227                    handlers[0].compacting_files()
228                );
229
230                0
231            }
232        };
233
234        if idle_file_count > 0 {
235            // trigger l0 compaction when the number of files is too large.
236
237            // The read query at the overlapping level needs to merge all the ssts, so the number of
238            // ssts is the most important factor affecting the read performance, we use file count
239            // to calculate the score
240            let overlapping_file_count = levels
241                .l0
242                .sub_levels
243                .iter()
244                .filter(|level| level.level_type == LevelType::Overlapping)
245                .map(|level| level.table_infos.len())
246                .sum::<usize>();
247            if overlapping_file_count > 0 {
248                // FIXME: use overlapping idle file count
249                let l0_overlapping_score =
250                    std::cmp::min(idle_file_count, overlapping_file_count) as u64 * SCORE_BASE
251                        / self.config.level0_tier_compact_file_number;
252                // Reduce the level num of l0 overlapping sub_level
253                ctx.score_levels.push(PickerInfo {
254                    score: std::cmp::max(l0_overlapping_score, SCORE_BASE + 1),
255                    select_level: 0,
256                    target_level: 0,
257                    picker_type: PickerType::Tier,
258                })
259            }
260
261            // The read query at the non-overlapping level only selects ssts that match the query
262            // range at each level, so the number of levels is the most important factor affecting
263            // the read performance. At the same time, the size factor is also added to the score
264            // calculation rule to avoid unbalanced compact task due to large size.
265            let total_size = levels
266                .l0
267                .sub_levels
268                .iter()
269                .filter(|level| {
270                    level.vnode_partition_count == self.config.split_weight_by_vnode
271                        && level.level_type == LevelType::Nonoverlapping
272                })
273                .map(|level| level.total_file_size)
274                .sum::<u64>()
275                .saturating_sub(handlers[0].pending_output_file_size(ctx.base_level as u32));
276            let base_level_size = levels.get_level(ctx.base_level).total_file_size;
277            let base_level_sst_count = levels.get_level(ctx.base_level).table_infos.len() as u64;
278
279            // size limit
280            let non_overlapping_size_score = total_size * SCORE_BASE
281                / std::cmp::max(self.config.max_bytes_for_level_base, base_level_size);
282            // level count limit
283            let non_overlapping_level_count = levels
284                .l0
285                .sub_levels
286                .iter()
287                .filter(|level| level.level_type == LevelType::Nonoverlapping)
288                .count() as u64;
289            let non_overlapping_level_score = non_overlapping_level_count * SCORE_BASE
290                / std::cmp::max(
291                    base_level_sst_count / 16,
292                    self.config.level0_sub_level_compact_level_count as u64,
293                );
294
295            let non_overlapping_score =
296                std::cmp::max(non_overlapping_size_score, non_overlapping_level_score);
297
298            // Reduce the level num of l0 non-overlapping sub_level
299            if non_overlapping_size_score > SCORE_BASE {
300                ctx.score_levels.push(PickerInfo {
301                    score: non_overlapping_score + 1,
302                    select_level: 0,
303                    target_level: ctx.base_level,
304                    picker_type: PickerType::ToBase,
305                });
306            }
307
308            if non_overlapping_level_score > SCORE_BASE {
309                // FIXME: more accurate score calculation algorithm will be introduced (#11903)
310                ctx.score_levels.push(PickerInfo {
311                    score: non_overlapping_score,
312                    select_level: 0,
313                    target_level: 0,
314                    picker_type: PickerType::Intra,
315                });
316            }
317        }
318
319        // The bottommost level can not be input level.
320        for level in &levels.levels {
321            let level_idx = level.level_idx as usize;
322            if level_idx < ctx.base_level || level_idx >= self.config.max_level as usize {
323                continue;
324            }
325            let output_file_size =
326                handlers[level_idx].pending_output_file_size(level.level_idx + 1);
327            let total_size = level.total_file_size.saturating_sub(output_file_size);
328            if total_size == 0 {
329                continue;
330            }
331
332            ctx.score_levels.push({
333                PickerInfo {
334                    score: total_size * SCORE_BASE / ctx.level_max_bytes[level_idx],
335                    select_level: level_idx,
336                    target_level: level_idx + 1,
337                    picker_type: PickerType::BottomLevel,
338                }
339            });
340        }
341
342        // sort reverse to pick the largest one.
343        ctx.score_levels.sort_by(|a, b| {
344            b.score
345                .cmp(&a.score)
346                .then_with(|| a.target_level.cmp(&b.target_level))
347        });
348        ctx
349    }
350
351    /// `compact_pending_bytes_needed` calculates the number of compact bytes needed to balance the
352    /// LSM Tree from the current state of each level in the LSM Tree in combination with
353    /// `compaction_config`
354    /// This algorithm refers to the implementation in  `https://github.com/facebook/rocksdb/blob/main/db/version_set.cc#L3141`
355    pub fn compact_pending_bytes_needed(&self, levels: &Levels) -> u64 {
356        let ctx = self.calculate_level_base_size(levels);
357        self.compact_pending_bytes_needed_with_ctx(levels, &ctx)
358    }
359
360    pub fn compact_pending_bytes_needed_with_ctx(
361        &self,
362        levels: &Levels,
363        ctx: &SelectContext,
364    ) -> u64 {
365        // l0
366        let mut compact_pending_bytes = 0;
367        let mut compact_to_next_level_bytes = 0;
368        let l0_size = levels
369            .l0
370            .sub_levels
371            .iter()
372            .map(|sub_level| sub_level.total_file_size)
373            .sum::<u64>();
374
375        let mut l0_compaction_trigger = false;
376        if l0_size > self.config.max_bytes_for_level_base {
377            compact_pending_bytes = l0_size;
378            compact_to_next_level_bytes = l0_size;
379            l0_compaction_trigger = true;
380        }
381
382        // l1 and up
383        let mut level_bytes;
384        let mut next_level_bytes = 0;
385        for level in &levels.levels[ctx.base_level - 1..levels.levels.len()] {
386            let level_index = level.level_idx as usize;
387
388            if next_level_bytes > 0 {
389                level_bytes = next_level_bytes;
390                next_level_bytes = 0;
391            } else {
392                level_bytes = level.total_file_size;
393            }
394
395            if level_index == ctx.base_level && l0_compaction_trigger {
396                compact_pending_bytes += level_bytes;
397            }
398
399            level_bytes += compact_to_next_level_bytes;
400            compact_to_next_level_bytes = 0;
401            let level_target = ctx.level_max_bytes[level_index];
402            if level_bytes > level_target {
403                compact_to_next_level_bytes = level_bytes - level_target;
404
405                // Estimate the actual compaction fan-out ratio as size ratio between
406                // the two levels.
407                assert_eq!(0, next_level_bytes);
408                if level_index + 1 < ctx.level_max_bytes.len() {
409                    let next_level = level_index + 1;
410                    next_level_bytes = levels.levels[next_level - 1].total_file_size;
411                }
412
413                if next_level_bytes > 0 {
414                    compact_pending_bytes += (compact_to_next_level_bytes as f64
415                        * (next_level_bytes as f64 / level_bytes as f64 + 1.0))
416                        as u64;
417                }
418            }
419        }
420
421        compact_pending_bytes
422    }
423}
424
425impl CompactionSelector for DynamicLevelSelector {
426    fn pick_compaction(
427        &mut self,
428        task_id: HummockCompactionTaskId,
429        context: CompactionSelectorContext<'_>,
430    ) -> Option<CompactionTask> {
431        let CompactionSelectorContext {
432            group: compaction_group,
433            levels,
434            level_handlers,
435            selector_stats,
436            developer_config,
437            in_progress_compactions,
438            ..
439        } = context;
440        let dynamic_level_core = DynamicLevelSelectorCore::new(
441            compaction_group.compaction_config.clone(),
442            developer_config,
443        );
444        let overlap_strategy =
445            create_overlap_strategy(compaction_group.compaction_config.compaction_mode());
446        let ctx = dynamic_level_core.get_priority_levels(levels, level_handlers);
447        // TODO: Determine which rule to enable by write limit
448        let compaction_task_validator = Arc::new(CompactionTaskValidator::new(
449            compaction_group.compaction_config.clone(),
450        ));
451        for picker_info in &ctx.score_levels {
452            if picker_info.score <= SCORE_BASE {
453                return None;
454            }
455            let mut picker = dynamic_level_core.create_compaction_picker(
456                picker_info,
457                overlap_strategy.clone(),
458                compaction_task_validator.clone(),
459            );
460
461            let mut stats = LocalPickerStatistic::default();
462            if let Some(ret) = picker.pick_compaction(levels, level_handlers, &mut stats) {
463                if !ret.skip_target_range_conflict_check
464                    && in_progress_compactions.has_conflict_with_input(&ret)
465                {
466                    stats.skip_by_overlapping += 1;
467                    selector_stats.skip_picker.push((
468                        picker_info.select_level,
469                        picker_info.target_level,
470                        stats,
471                    ));
472                    continue;
473                }
474
475                ret.add_pending_task(task_id, level_handlers);
476                return Some(create_compaction_task(
477                    dynamic_level_core.get_config(),
478                    ret,
479                    ctx.base_level,
480                    self.task_type(),
481                ));
482            }
483            selector_stats.skip_picker.push((
484                picker_info.select_level,
485                picker_info.target_level,
486                stats,
487            ));
488        }
489        None
490    }
491
492    fn name(&self) -> &'static str {
493        "DynamicLevelSelector"
494    }
495
496    fn task_type(&self) -> PbTaskType {
497        PbTaskType::Dynamic
498    }
499}
500
501#[cfg(test)]
502pub mod tests {
503    use std::collections::{BTreeSet, HashMap};
504    use std::sync::Arc;
505
506    use itertools::Itertools;
507    use risingwave_common::constants::hummock::CompactionFilterFlag;
508    use risingwave_hummock_sdk::HummockCompactionTaskId;
509    use risingwave_hummock_sdk::compact_task::{CompactTask, CompactTaskAssignment};
510    use risingwave_hummock_sdk::level::{InputLevel, Levels};
511    use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
512    use risingwave_pb::hummock::LevelType;
513    use risingwave_pb::hummock::compaction_config::CompactionMode;
514
515    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
516    use crate::hummock::compaction::in_progress_compaction::InProgressCompactionView;
517    use crate::hummock::compaction::selector::tests::{
518        assert_compaction_task, generate_l0_nonoverlapping_sublevels, generate_level,
519        generate_table, generate_tables, push_tables_level0_nonoverlapping,
520    };
521    use crate::hummock::compaction::selector::{
522        CompactionSelector, CompactionSelectorContext, DynamicLevelSelector,
523        DynamicLevelSelectorCore, LocalSelectorStatistic,
524    };
525    use crate::hummock::compaction::{CompactionDeveloperConfig, CompactionTask};
526    use crate::hummock::level_handler::LevelHandler;
527    use crate::hummock::model::CompactionGroup;
528    use crate::hummock::test_utils::compaction_selector_context;
529
530    fn pick_compaction_with_in_progress(
531        selector: &mut DynamicLevelSelector,
532        task_id: HummockCompactionTaskId,
533        group: &CompactionGroup,
534        levels: &Levels,
535        level_handlers: &mut [LevelHandler],
536        selector_stats: &mut LocalSelectorStatistic,
537        in_progress_compactions: &InProgressCompactionView,
538    ) -> Option<CompactionTask> {
539        selector.pick_compaction(
540            task_id,
541            CompactionSelectorContext {
542                group,
543                levels,
544                member_table_ids: &BTreeSet::new(),
545                level_handlers,
546                selector_stats,
547                table_id_to_options: &HashMap::default(),
548                developer_config: Arc::new(CompactionDeveloperConfig::default()),
549                table_watermarks: &HashMap::default(),
550                state_table_info: &HummockVersionStateTableInfo::empty(),
551                in_progress_compactions,
552            },
553        )
554    }
555
556    #[test]
557    fn test_dynamic_level() {
558        let config = CompactionConfigBuilder::new()
559            .max_bytes_for_level_base(100)
560            .max_level(4)
561            .max_bytes_for_level_multiplier(5)
562            .max_compaction_bytes(1)
563            .level0_tier_compact_file_number(2)
564            .compaction_mode(CompactionMode::Range as i32)
565            .build();
566        let selector = DynamicLevelSelectorCore::new(
567            Arc::new(config),
568            Arc::new(CompactionDeveloperConfig::default()),
569        );
570        let levels = vec![
571            generate_level(1, vec![]),
572            generate_level(2, generate_tables(0..5, 0..1000, 3, 10)),
573            generate_level(3, generate_tables(5..10, 0..1000, 2, 50)),
574            generate_level(4, generate_tables(10..15, 0..1000, 1, 200)),
575        ];
576        let mut levels = Levels {
577            levels,
578            l0: generate_l0_nonoverlapping_sublevels(vec![]),
579            ..Default::default()
580        };
581        let ctx = selector.calculate_level_base_size(&levels);
582        assert_eq!(ctx.base_level, 2);
583        assert_eq!(ctx.level_max_bytes[2], 100);
584        assert_eq!(ctx.level_max_bytes[3], 200);
585        assert_eq!(ctx.level_max_bytes[4], 1000);
586
587        levels.levels[3]
588            .table_infos
589            .append(&mut generate_tables(15..20, 2000..3000, 1, 400));
590        levels.levels[3].total_file_size = levels.levels[3]
591            .table_infos
592            .iter()
593            .map(|sst| sst.sst_size)
594            .sum::<u64>();
595
596        let ctx = selector.calculate_level_base_size(&levels);
597        // data size increase, so we need increase one level to place more data.
598        assert_eq!(ctx.base_level, 1);
599        assert_eq!(ctx.level_max_bytes[1], 100);
600        assert_eq!(ctx.level_max_bytes[2], 120);
601        assert_eq!(ctx.level_max_bytes[3], 600);
602        assert_eq!(ctx.level_max_bytes[4], 3000);
603
604        // append a large data to L0 but it does not change the base size of LSM tree.
605        push_tables_level0_nonoverlapping(&mut levels, generate_tables(20..26, 0..1000, 1, 100));
606
607        let ctx = selector.calculate_level_base_size(&levels);
608        assert_eq!(ctx.base_level, 1);
609        assert_eq!(ctx.level_max_bytes[1], 100);
610        assert_eq!(ctx.level_max_bytes[2], 120);
611        assert_eq!(ctx.level_max_bytes[3], 600);
612        assert_eq!(ctx.level_max_bytes[4], 3000);
613
614        levels.l0.sub_levels.clear();
615        levels.l0.total_file_size = 0;
616        levels.levels[0].table_infos = generate_tables(26..32, 0..1000, 1, 100);
617        levels.levels[0].total_file_size = levels.levels[0]
618            .table_infos
619            .iter()
620            .map(|sst| sst.sst_size)
621            .sum::<u64>();
622
623        let ctx = selector.calculate_level_base_size(&levels);
624        assert_eq!(ctx.base_level, 1);
625        assert_eq!(ctx.level_max_bytes[1], 100);
626        assert_eq!(ctx.level_max_bytes[2], 120);
627        assert_eq!(ctx.level_max_bytes[3], 600);
628        assert_eq!(ctx.level_max_bytes[4], 3000);
629    }
630
631    #[test]
632    fn test_pick_compaction() {
633        let config = CompactionConfigBuilder::new()
634            .max_bytes_for_level_base(200)
635            .max_level(4)
636            .max_bytes_for_level_multiplier(5)
637            .target_file_size_base(5)
638            .max_compaction_bytes(10000)
639            .level0_tier_compact_file_number(4)
640            .compaction_mode(CompactionMode::Range as i32)
641            .level0_sub_level_compact_level_count(3)
642            .build();
643        let group_config = CompactionGroup::new(1, config.clone());
644        let levels = vec![
645            generate_level(1, vec![]),
646            generate_level(2, generate_tables(0..5, 0..1000, 3, 10)),
647            generate_level(3, generate_tables(5..10, 0..1000, 2, 50)),
648            generate_level(4, generate_tables(10..15, 0..1000, 1, 200)),
649        ];
650        let mut levels = Levels {
651            levels,
652            l0: generate_l0_nonoverlapping_sublevels(generate_tables(15..25, 0..600, 3, 10)),
653            ..Default::default()
654        };
655
656        let mut selector = DynamicLevelSelector::default();
657        let mut levels_handlers = (0..5).map(LevelHandler::new).collect_vec();
658        let mut local_stats = LocalSelectorStatistic::default();
659        let compaction = selector
660            .pick_compaction(
661                1,
662                compaction_selector_context(
663                    &group_config,
664                    &levels,
665                    &BTreeSet::new(),
666                    &mut levels_handlers,
667                    &mut local_stats,
668                    &HashMap::default(),
669                    Arc::new(CompactionDeveloperConfig::default()),
670                    &Default::default(),
671                    &HummockVersionStateTableInfo::empty(),
672                ),
673            )
674            .unwrap();
675        assert_compaction_task(&compaction, &levels_handlers);
676
677        let compaction_filter_flag = CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL;
678        let config = CompactionConfigBuilder::with_config(config)
679            .max_bytes_for_level_base(100)
680            .sub_level_max_compaction_bytes(50)
681            .target_file_size_base(20)
682            .compaction_filter_mask(compaction_filter_flag.into())
683            .build();
684        let group_config = CompactionGroup::new(1, config.clone());
685        let mut selector = DynamicLevelSelector::default();
686
687        levels.l0.sub_levels.clear();
688        levels.l0.total_file_size = 0;
689        push_tables_level0_nonoverlapping(&mut levels, generate_tables(15..25, 0..600, 3, 20));
690        let mut levels_handlers = (0..5).map(LevelHandler::new).collect_vec();
691        let compaction = selector
692            .pick_compaction(
693                1,
694                compaction_selector_context(
695                    &group_config,
696                    &levels,
697                    &BTreeSet::new(),
698                    &mut levels_handlers,
699                    &mut local_stats,
700                    &HashMap::default(),
701                    Arc::new(CompactionDeveloperConfig::default()),
702                    &Default::default(),
703                    &HummockVersionStateTableInfo::empty(),
704                ),
705            )
706            .unwrap();
707        assert_compaction_task(&compaction, &levels_handlers);
708        assert_eq!(compaction.input.input_levels[0].level_idx, 0);
709        assert_eq!(compaction.input.target_level, 2);
710
711        levels_handlers[0].remove_task(1);
712        levels_handlers[2].remove_task(1);
713        levels.l0.sub_levels.clear();
714        levels.levels[1].table_infos = generate_tables(20..30, 0..1000, 3, 10);
715        let compaction = selector
716            .pick_compaction(
717                2,
718                compaction_selector_context(
719                    &group_config,
720                    &levels,
721                    &BTreeSet::new(),
722                    &mut levels_handlers,
723                    &mut local_stats,
724                    &HashMap::default(),
725                    Arc::new(CompactionDeveloperConfig::default()),
726                    &Default::default(),
727                    &HummockVersionStateTableInfo::empty(),
728                ),
729            )
730            .unwrap();
731        assert_compaction_task(&compaction, &levels_handlers);
732        assert_eq!(compaction.input.input_levels[0].level_idx, 3);
733        assert_eq!(compaction.input.target_level, 4);
734        assert_eq!(
735            compaction.input.input_levels[0]
736                .table_infos
737                .iter()
738                .map(|sst| sst.sst_id)
739                .collect_vec(),
740            vec![5]
741        );
742        assert_eq!(
743            compaction.input.input_levels[1]
744                .table_infos
745                .iter()
746                .map(|sst| sst.sst_id)
747                .collect_vec(),
748            vec![10]
749        );
750        assert_eq!(
751            compaction.target_file_size,
752            config.target_file_size_base * 2
753        );
754        assert_eq!(compaction.compression_algorithm.as_str(), "Lz4",);
755        // no compaction need to be scheduled because we do not calculate the size of pending files
756        // to score.
757        let compaction = selector.pick_compaction(
758            2,
759            compaction_selector_context(
760                &group_config,
761                &levels,
762                &BTreeSet::new(),
763                &mut levels_handlers,
764                &mut local_stats,
765                &HashMap::default(),
766                Arc::new(CompactionDeveloperConfig::default()),
767                &Default::default(),
768                &HummockVersionStateTableInfo::empty(),
769            ),
770        );
771        assert!(compaction.is_none());
772    }
773
774    #[test]
775    fn test_trivial_move_skips_in_progress_target_overlap() {
776        let config = CompactionConfigBuilder::new()
777            .max_bytes_for_level_base(1000)
778            .max_bytes_for_level_multiplier(10)
779            .max_level(4)
780            .max_compaction_bytes(10000)
781            .level0_sub_level_compact_level_count(20)
782            .sst_allowed_trivial_move_min_size(Some(0))
783            .sst_allowed_trivial_move_max_count(Some(10))
784            .compaction_mode(CompactionMode::Range as i32)
785            .build();
786        let group_config = CompactionGroup::new(9, config);
787        let levels = Levels {
788            levels: vec![
789                generate_level(1, vec![]),
790                generate_level(2, vec![]),
791                generate_level(3, vec![]),
792                generate_level(4, vec![generate_table(878784, 1, 565, 633, 1)]),
793            ],
794            l0: generate_l0_nonoverlapping_sublevels(vec![
795                generate_table(877832, 1, 706, 718, 1),
796                generate_table(877833, 1, 800, 2000, 1),
797            ]),
798            ..Default::default()
799        };
800        let in_progress = InProgressCompactionView::for_group(
801            &[CompactTaskAssignment {
802                compact_task: CompactTask {
803                    task_id: 15040,
804                    compaction_group_id: 9.into(),
805                    target_level: 4,
806                    input_ssts: vec![
807                        InputLevel {
808                            level_idx: 0,
809                            level_type: LevelType::Nonoverlapping,
810                            table_infos: vec![generate_table(881160, 1, 592, 722, 1)],
811                        },
812                        InputLevel {
813                            level_idx: 4,
814                            level_type: LevelType::Nonoverlapping,
815                            table_infos: vec![generate_table(878784, 1, 565, 633, 1)],
816                        },
817                    ],
818                    ..Default::default()
819                },
820                context_id: 1.into(),
821            }],
822            9.into(),
823        );
824
825        let mut selector = DynamicLevelSelector::default();
826        let mut levels_handlers = (0..5).map(LevelHandler::new).collect_vec();
827        let mut local_stats = LocalSelectorStatistic::default();
828        let empty_in_progress = InProgressCompactionView::default();
829        let compaction = pick_compaction_with_in_progress(
830            &mut selector,
831            1,
832            &group_config,
833            &levels,
834            &mut levels_handlers,
835            &mut local_stats,
836            &empty_in_progress,
837        )
838        .unwrap();
839        assert_eq!(compaction.input.target_level, 4);
840        assert!(compaction.input.input_levels[1].table_infos.is_empty());
841        assert!(
842            compaction.input.input_levels[0]
843                .table_infos
844                .iter()
845                .any(|sst| sst.sst_id.as_raw_id() == 877832)
846        );
847
848        let mut selector = DynamicLevelSelector::default();
849        let mut levels_handlers = (0..5).map(LevelHandler::new).collect_vec();
850        let mut local_stats = LocalSelectorStatistic::default();
851        assert!(
852            pick_compaction_with_in_progress(
853                &mut selector,
854                2,
855                &group_config,
856                &levels,
857                &mut levels_handlers,
858                &mut local_stats,
859                &in_progress,
860            )
861            .is_none()
862        );
863        assert_eq!(local_stats.skip_picker.len(), 1);
864        assert_eq!(local_stats.skip_picker[0].2.skip_by_overlapping, 1);
865    }
866
867    #[test]
868    fn test_compact_pending_bytes() {
869        let config = CompactionConfigBuilder::new()
870            .max_bytes_for_level_base(100)
871            .max_level(4)
872            .max_bytes_for_level_multiplier(5)
873            .compaction_mode(CompactionMode::Range as i32)
874            .build();
875        let levels = vec![
876            generate_level(1, vec![]),
877            generate_level(2, generate_tables(0..50, 0..1000, 3, 500)),
878            generate_level(3, generate_tables(30..60, 0..1000, 2, 500)),
879            generate_level(4, generate_tables(60..70, 0..1000, 1, 1000)),
880        ];
881        let levels = Levels {
882            levels,
883            l0: generate_l0_nonoverlapping_sublevels(generate_tables(15..25, 0..600, 3, 100)),
884            ..Default::default()
885        };
886
887        let dynamic_level_core = DynamicLevelSelectorCore::new(
888            Arc::new(config),
889            Arc::new(CompactionDeveloperConfig::default()),
890        );
891        let ctx = dynamic_level_core.calculate_level_base_size(&levels);
892        assert_eq!(1, ctx.base_level);
893        assert_eq!(1000, levels.l0.total_file_size); // l0
894        assert_eq!(0, levels.levels.first().unwrap().total_file_size); // l1
895        assert_eq!(25000, levels.levels.get(1).unwrap().total_file_size); // l2
896        assert_eq!(15000, levels.levels.get(2).unwrap().total_file_size); // l3
897        assert_eq!(10000, levels.levels.get(3).unwrap().total_file_size); // l4
898
899        assert_eq!(100, ctx.level_max_bytes[1]); // l1
900        assert_eq!(500, ctx.level_max_bytes[2]); // l2
901        assert_eq!(2500, ctx.level_max_bytes[3]); // l3
902        assert_eq!(12500, ctx.level_max_bytes[4]); // l4
903
904        // l1 pending = (0 + 1000 - 100) * ((25000 / 1000) + 1) + 1000 = 24400
905        // l2 pending = (25000 + 900 - 500) * ((15000 / (25000 + 900)) + 1) = 40110
906        // l3 pending = (15000 + 25400 - 2500) * ((10000 / (15000 + 25400) + 1)) = 47281
907
908        let compact_pending_bytes = dynamic_level_core.compact_pending_bytes_needed(&levels);
909        assert_eq!(24400 + 40110 + 47281, compact_pending_bytes);
910    }
911}