risingwave_meta/hummock/compaction/picker/
emergency_compaction_picker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_hummock_sdk::level::Levels;
18use risingwave_pb::hummock::{CompactionConfig, LevelType};
19
20use super::{
21    CompactionInput, CompactionPicker, CompactionTaskValidator, LevelCompactionPicker,
22    LocalPickerStatistic, TierCompactionPicker,
23};
24use crate::hummock::compaction::CompactionDeveloperConfig;
25use crate::hummock::compaction::picker::intra_compaction_picker::WholeLevelCompactionPicker;
26use crate::hummock::level_handler::LevelHandler;
27
/// Compaction picker that tries several L0 picking strategies in sequence.
///
/// It is meant to be triggered only when some unexpected condition makes the number of L0
/// sub-levels grow and the regular strategy cannot compact that data to a lower level.
pub struct EmergencyCompactionPicker {
    /// Level handed to `LevelCompactionPicker` as its target when the base-level strategy runs.
    target_level: usize,
    /// Compaction configuration. `split_weight_by_vnode` is read to choose between strategies,
    /// and the whole config is forwarded to every delegate picker.
    config: Arc<CompactionConfig>,
    /// Developer configuration forwarded to `LevelCompactionPicker`.
    developer_config: Arc<CompactionDeveloperConfig>,
}
33
34impl EmergencyCompactionPicker {
35    pub fn new(
36        target_level: usize,
37        config: Arc<CompactionConfig>,
38        developer_config: Arc<CompactionDeveloperConfig>,
39    ) -> Self {
40        Self {
41            target_level,
42            config,
43            developer_config,
44        }
45    }
46
47    pub fn pick_compaction(
48        &self,
49        levels: &Levels,
50        level_handlers: &[LevelHandler],
51        stats: &mut LocalPickerStatistic,
52    ) -> Option<CompactionInput> {
53        let unused_validator = Arc::new(CompactionTaskValidator::unused());
54        let l0 = &levels.l0;
55        let overlapping_count = l0
56            .sub_levels
57            .iter()
58            .filter(|level| level.level_type == LevelType::Overlapping)
59            .count();
60        let no_overlap_count = l0
61            .sub_levels
62            .iter()
63            .filter(|level| {
64                level.level_type == LevelType::Nonoverlapping && level.vnode_partition_count == 0
65            })
66            .count();
67        let partitioned_count = l0
68            .sub_levels
69            .iter()
70            .filter(|level| {
71                level.level_type == LevelType::Nonoverlapping && level.vnode_partition_count > 0
72            })
73            .count();
74        // We trigger `EmergencyCompactionPicker` only when some unexpected condition cause the number of l0 levels increase and the origin strategy
75        // can not compact those data to lower level. But if most of these levels are overlapping level, it is dangerous to compact small data of non-overlapping sub level
76        // to base level, it will cost a lot of compactor resource because of large write-amplification.
77        if (self.config.split_weight_by_vnode == 0 && no_overlap_count > overlapping_count)
78            || (self.config.split_weight_by_vnode > 0
79                && partitioned_count > no_overlap_count
80                && partitioned_count > overlapping_count)
81        {
82            let mut base_level_compaction_picker = LevelCompactionPicker::new_with_validator(
83                self.target_level,
84                self.config.clone(),
85                unused_validator.clone(),
86                self.developer_config.clone(),
87            );
88
89            if let Some(ret) =
90                base_level_compaction_picker.pick_compaction(levels, level_handlers, stats)
91            {
92                return Some(ret);
93            }
94        }
95        if self.config.split_weight_by_vnode > 0
96            && no_overlap_count > partitioned_count
97            && no_overlap_count > overlapping_count
98        {
99            let intral_level_compaction_picker =
100                WholeLevelCompactionPicker::new(self.config.clone(), unused_validator.clone());
101
102            if let Some(ret) = intral_level_compaction_picker.pick_whole_level(
103                &levels.l0,
104                &level_handlers[0],
105                self.config.split_weight_by_vnode,
106                stats,
107            ) {
108                return Some(ret);
109            }
110        }
111        let mut tier_compaction_picker =
112            TierCompactionPicker::new_with_validator(self.config.clone(), unused_validator);
113
114        tier_compaction_picker.pick_compaction(levels, level_handlers, stats)
115    }
116}