risingwave_meta/hummock/compaction/picker/
compaction_task_validator.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use risingwave_common::config::default::compaction_config;
19use risingwave_pb::hummock::CompactionConfig;
20
21use super::{CompactionInput, LocalPickerStatistic};
22
/// Identifies which validation rule a compaction task should be checked against.
///
/// The value is used as the lookup key in [`CompactionTaskValidator`]. It is a plain
/// fieldless enum, so it is `Copy` and can be passed around by value freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ValidationRuleType {
    /// Selects the tier validation rule.
    Tier = 0,
    /// Selects the intra validation rule.
    Intra = 1,
    /// Selects the to-base validation rule.
    ToBase = 2,
}
29
30pub struct CompactionTaskValidator {
31    validation_rules: HashMap<ValidationRuleType, Box<dyn CompactionTaskValidationRule>>,
32}
33
34impl CompactionTaskValidator {
35    pub fn new(config: Arc<CompactionConfig>) -> Self {
36        let mut validation_rules: HashMap<
37            ValidationRuleType,
38            Box<dyn CompactionTaskValidationRule>,
39        > = HashMap::default();
40
41        validation_rules.insert(
42            ValidationRuleType::Tier,
43            Box::new(TierCompactionTaskValidationRule {
44                config: config.clone(),
45            }),
46        );
47
48        validation_rules.insert(
49            ValidationRuleType::Intra,
50            Box::new(IntraCompactionTaskValidationRule {
51                config: config.clone(),
52            }),
53        );
54
55        validation_rules.insert(
56            ValidationRuleType::ToBase,
57            Box::new(BaseCompactionTaskValidationRule { config }),
58        );
59
60        CompactionTaskValidator { validation_rules }
61    }
62
63    pub fn unused() -> Self {
64        CompactionTaskValidator {
65            validation_rules: HashMap::default(),
66        }
67    }
68
69    pub fn valid_compact_task(
70        &self,
71        input: &CompactionInput,
72        picker_type: ValidationRuleType,
73        stats: &mut LocalPickerStatistic,
74    ) -> bool {
75        if let Some(validation_rule) = self.validation_rules.get(&picker_type) {
76            validation_rule.validate(input, stats)
77        } else {
78            true
79        }
80    }
81
82    pub fn is_enable(&self) -> bool {
83        !self.validation_rules.is_empty()
84    }
85}
86
87pub trait CompactionTaskValidationRule {
88    fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool;
89}
90
91struct TierCompactionTaskValidationRule {
92    config: Arc<CompactionConfig>,
93}
94
95impl CompactionTaskValidationRule for TierCompactionTaskValidationRule {
96    fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool {
97        if input.total_file_count >= self.config.level0_max_compact_file_number
98            || input.input_levels.len()
99                >= self
100                    .config
101                    .max_l0_compact_level_count
102                    .unwrap_or(compaction_config::max_l0_compact_level_count())
103                    as usize
104        {
105            return true;
106        }
107
108        // so the design here wants to merge multiple overlapping-levels in one compaction
109        let max_compaction_bytes = std::cmp::min(
110            self.config.max_compaction_bytes,
111            self.config.sub_level_max_compaction_bytes
112                * self.config.level0_overlapping_sub_level_compact_level_count as u64,
113        );
114
115        // If waiting_enough_files is not satisfied, we will raise the priority of the number of
116        // levels to ensure that we can merge as many sub_levels as possible
117        let tier_sub_level_compact_level_count =
118            self.config.level0_overlapping_sub_level_compact_level_count as usize;
119        if input.input_levels.len() < tier_sub_level_compact_level_count
120            && input.select_input_size < max_compaction_bytes
121        {
122            stats.skip_by_count_limit += 1;
123            return false;
124        }
125        true
126    }
127}
128
129struct IntraCompactionTaskValidationRule {
130    config: Arc<CompactionConfig>,
131}
132
133impl CompactionTaskValidationRule for IntraCompactionTaskValidationRule {
134    fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool {
135        if (input.total_file_count >= self.config.level0_max_compact_file_number
136            && input.input_levels.len() > 1)
137            || input.input_levels.len()
138                >= self
139                    .config
140                    .max_l0_compact_level_count
141                    .unwrap_or(compaction_config::max_l0_compact_level_count())
142                    as usize
143        {
144            return true;
145        }
146
147        let intra_sub_level_compact_level_count =
148            self.config.level0_sub_level_compact_level_count as usize;
149
150        if input.input_levels.len() < intra_sub_level_compact_level_count {
151            stats.skip_by_count_limit += 1;
152            return false;
153        }
154
155        let mut max_level_size = 0;
156        for select_level in &input.input_levels {
157            let level_select_size = select_level
158                .table_infos
159                .iter()
160                .map(|sst| sst.sst_size)
161                .sum::<u64>();
162
163            max_level_size = std::cmp::max(max_level_size, level_select_size);
164        }
165
166        // This limitation would keep our write-amplification no more than
167        // ln(max_compaction_bytes/flush_level_bytes) /
168        // ln(self.config.level0_sub_level_compact_level_count/2) Here we only use half
169        // of level0_sub_level_compact_level_count just for convenient.
170        let is_write_amp_large =
171            max_level_size * self.config.level0_sub_level_compact_level_count as u64 / 2
172                >= input.select_input_size;
173
174        if is_write_amp_large {
175            stats.skip_by_write_amp_limit += 1;
176            return false;
177        }
178
179        true
180    }
181}
182
183struct BaseCompactionTaskValidationRule {
184    config: Arc<CompactionConfig>,
185}
186
187impl CompactionTaskValidationRule for BaseCompactionTaskValidationRule {
188    fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool {
189        if input.total_file_count >= self.config.level0_max_compact_file_number
190            || input.input_levels.len()
191                >= self
192                    .config
193                    .max_l0_compact_level_count
194                    .unwrap_or(compaction_config::max_l0_compact_level_count())
195                    as usize
196        {
197            return true;
198        }
199
200        // The size of target level may be too large, we shall skip this compact task and wait
201        //  the data in base level compact to lower level.
202        if input.target_input_size > self.config.max_compaction_bytes {
203            stats.skip_by_count_limit += 1;
204            return false;
205        }
206
207        if input.select_input_size < input.target_input_size {
208            stats.skip_by_write_amp_limit += 1;
209            return false;
210        }
211
212        true
213    }
214}