// Source: risingwave_meta/hummock/compaction/compaction_config.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use risingwave_common::config::CompactionConfig as CompactionConfigOpt;
16use risingwave_common::config::meta::default::compaction_config;
17use risingwave_hummock_sdk::filter_utils::{
18    parse_sstable_filter_layout, parse_sstable_filter_type,
19};
20use risingwave_pb::hummock::CompactionConfig;
21use risingwave_pb::hummock::compaction_config::CompactionMode;
22
23pub struct CompactionConfigBuilder {
24    config: CompactionConfig,
25}
26
27impl CompactionConfigBuilder {
28    pub fn new() -> Self {
29        #[expect(deprecated)]
30        Self {
31            config: CompactionConfig {
32                max_bytes_for_level_base: compaction_config::max_bytes_for_level_base(),
33                max_bytes_for_level_multiplier: compaction_config::max_bytes_for_level_multiplier(),
34                max_level: compaction_config::max_level() as u64,
35                max_compaction_bytes: compaction_config::max_compaction_bytes(),
36                sub_level_max_compaction_bytes: compaction_config::sub_level_max_compaction_bytes(),
37                level0_tier_compact_file_number: compaction_config::level0_tier_compact_file_number(
38                ),
39                target_file_size_base: compaction_config::target_file_size_base(),
40                compaction_mode: CompactionMode::Range as i32,
41                compression_algorithm: compaction_config::compression_algorithm_vec(
42                    compaction_config::max_level(),
43                ),
44                sstable_filter_type: compaction_config::sstable_filter_type(),
45                sstable_filter_layout: compaction_config::sstable_filter_layout(),
46                compaction_filter_mask: compaction_config::compaction_filter_mask(),
47                max_sub_compaction: compaction_config::max_sub_compaction(),
48                max_space_reclaim_bytes: compaction_config::max_space_reclaim_bytes(),
49                split_by_state_table: false,
50                split_weight_by_vnode: 0,
51                level0_stop_write_threshold_sub_level_number:
52                    compaction_config::level0_stop_write_threshold_sub_level_number(),
53                // This configure variable shall be larger than level0_tier_compact_file_number, and
54                // it shall meet the following condition:
55                //    level0_max_compact_file_number * target_file_size_base >
56                // max_bytes_for_level_base
57                level0_max_compact_file_number: compaction_config::level0_max_compact_file_number(),
58                level0_sub_level_compact_level_count:
59                    compaction_config::level0_sub_level_compact_level_count(),
60                level0_overlapping_sub_level_compact_level_count:
61                    compaction_config::level0_overlapping_sub_level_compact_level_count(),
62                tombstone_reclaim_ratio: compaction_config::tombstone_reclaim_ratio(),
63                enable_emergency_picker: compaction_config::enable_emergency_picker(),
64                max_l0_compact_level_count: Some(compaction_config::max_l0_compact_level_count()),
65                sst_allowed_trivial_move_min_size: Some(
66                    compaction_config::sst_allowed_trivial_move_min_size(),
67                ),
68                disable_auto_group_scheduling: Some(
69                    compaction_config::disable_auto_group_scheduling(),
70                ),
71                max_overlapping_level_size: Some(compaction_config::max_overlapping_level_size()),
72                sst_allowed_trivial_move_max_count: Some(
73                    compaction_config::sst_allowed_trivial_move_max_count(),
74                ),
75                emergency_level0_sst_file_count: Some(
76                    compaction_config::emergency_level0_sst_file_count(),
77                ),
78                emergency_level0_sub_level_partition: Some(
79                    compaction_config::emergency_level0_sub_level_partition(),
80                ),
81                level0_stop_write_threshold_max_sst_count: Some(
82                    compaction_config::level0_stop_write_threshold_max_sst_count(),
83                ),
84                level0_stop_write_threshold_max_size: Some(
85                    compaction_config::level0_stop_write_threshold_max_size(),
86                ),
87                enable_optimize_l0_interval_selection: Some(
88                    compaction_config::enable_optimize_l0_interval_selection(),
89                ),
90                vnode_aligned_level_size_threshold: None,
91                max_kv_count_for_xor16: compaction_config::blocked_xor_filter_kv_count_threshold(),
92                max_vnode_key_range_bytes: compaction_config::max_vnode_key_range_bytes(),
93            },
94        }
95    }
96
97    pub fn with_config(config: CompactionConfig) -> Self {
98        Self { config }
99    }
100
101    pub fn with_opt(opt: &CompactionConfigOpt) -> Self {
102        Self::new()
103            .max_bytes_for_level_base(opt.max_bytes_for_level_base)
104            .max_bytes_for_level_multiplier(opt.max_bytes_for_level_multiplier)
105            .max_compaction_bytes(opt.max_compaction_bytes)
106            .sub_level_max_compaction_bytes(opt.sub_level_max_compaction_bytes)
107            .level0_tier_compact_file_number(opt.level0_tier_compact_file_number)
108            .target_file_size_base(opt.target_file_size_base)
109            .compaction_filter_mask(opt.compaction_filter_mask)
110            .max_sub_compaction(opt.max_sub_compaction)
111            .level0_stop_write_threshold_sub_level_number(
112                opt.level0_stop_write_threshold_sub_level_number,
113            )
114            .level0_sub_level_compact_level_count(opt.level0_sub_level_compact_level_count)
115            .level0_overlapping_sub_level_compact_level_count(
116                opt.level0_overlapping_sub_level_compact_level_count,
117            )
118            .max_space_reclaim_bytes(opt.max_space_reclaim_bytes)
119            .level0_max_compact_file_number(opt.level0_max_compact_file_number)
120            .tombstone_reclaim_ratio(opt.tombstone_reclaim_ratio)
121            .max_level(opt.max_level as u64)
122            .sst_allowed_trivial_move_min_size(Some(opt.sst_allowed_trivial_move_min_size))
123            .sst_allowed_trivial_move_max_count(Some(opt.sst_allowed_trivial_move_max_count))
124            .max_overlapping_level_size(Some(opt.max_overlapping_level_size))
125            .emergency_level0_sst_file_count(Some(opt.emergency_level0_sst_file_count))
126            .emergency_level0_sub_level_partition(Some(opt.emergency_level0_sub_level_partition))
127            .level0_stop_write_threshold_max_sst_count(Some(
128                opt.level0_stop_write_threshold_max_sst_count,
129            ))
130            .level0_stop_write_threshold_max_size(Some(opt.level0_stop_write_threshold_max_size))
131            .enable_optimize_l0_interval_selection(Some(opt.enable_optimize_l0_interval_selection))
132            .max_kv_count_for_xor16(opt.blocked_xor_filter_kv_count_threshold)
133            .max_vnode_key_range_bytes(opt.max_vnode_key_range_bytes)
134            .sstable_filter_type(opt.sstable_filter_type.clone())
135            .sstable_filter_layout(opt.sstable_filter_layout.clone())
136    }
137
138    pub fn build(self) -> CompactionConfig {
139        let mut config = self.config;
140        if let Err(reason) = validate_compaction_config(&config) {
141            // Avoid crashing later in compaction task planning due to invalid per-level filter
142            // configs. Fallback to legacy behavior (xor16 + auto) and keep other fields unchanged.
143            tracing::warn!(
144                "Bad compaction config: {}. Falling back to default sstable filter type/layout.",
145                reason
146            );
147            config.sstable_filter_type.clear();
148            config.sstable_filter_layout.clear();
149        }
150        config
151    }
152}
153
154/// Returns Ok if `config` is valid,
155/// or the reason why it's invalid.
156pub fn validate_compaction_config(config: &CompactionConfig) -> Result<(), String> {
157    let sub_level_number_threshold_min = 1;
158    if config.level0_stop_write_threshold_sub_level_number < sub_level_number_threshold_min {
159        return Err(format!(
160            "{} is too small for level0_stop_write_threshold_sub_level_number, expect >= {}",
161            config.level0_stop_write_threshold_sub_level_number, sub_level_number_threshold_min
162        ));
163    }
164    if !config.sstable_filter_type.is_empty() {
165        if config.sstable_filter_type.len() < config.max_level as usize + 1 {
166            return Err(format!(
167                "sstable_filter_type must provide at least {} entries for max_level {}",
168                config.max_level + 1,
169                config.max_level
170            ));
171        }
172        for filter_type in &config.sstable_filter_type {
173            parse_sstable_filter_type(filter_type)?;
174        }
175    }
176
177    if !config.sstable_filter_layout.is_empty() {
178        if config.sstable_filter_layout.len() < config.max_level as usize + 1 {
179            return Err(format!(
180                "sstable_filter_layout must provide at least {} entries for max_level {}",
181                config.max_level + 1,
182                config.max_level
183            ));
184        }
185        for layout in &config.sstable_filter_layout {
186            parse_sstable_filter_layout(layout)?;
187        }
188    }
189    Ok(())
190}
191
192impl Default for CompactionConfigBuilder {
193    fn default() -> Self {
194        Self::new()
195    }
196}
197
/// Generates one consuming setter on `CompactionConfigBuilder` per `field: Type` entry.
///
/// Every generated method is named after its field, stores the given value into
/// `self.config.<field>` and returns the builder so that calls can be chained. The entry
/// list must end with a trailing comma.
macro_rules! builder_field {
    ($( $field:ident: $value_ty:ty ),* ,) => {
        impl CompactionConfigBuilder {
            $(
                pub fn $field(mut self, value: $value_ty) -> Self {
                    self.config.$field = value;
                    self
                }
            )*
        }
    }
}
210
211builder_field! {
212    max_bytes_for_level_base: u64,
213    max_bytes_for_level_multiplier: u64,
214    max_level: u64,
215    max_compaction_bytes: u64,
216    sub_level_max_compaction_bytes: u64,
217    level0_tier_compact_file_number: u64,
218    compaction_mode: i32,
219    compression_algorithm: Vec<String>,
220    sstable_filter_type: Vec<String>,
221    sstable_filter_layout: Vec<String>,
222    compaction_filter_mask: u32,
223    target_file_size_base: u64,
224    max_sub_compaction: u32,
225    max_space_reclaim_bytes: u64,
226    level0_stop_write_threshold_sub_level_number: u64,
227    level0_max_compact_file_number: u64,
228    level0_sub_level_compact_level_count: u32,
229    level0_overlapping_sub_level_compact_level_count: u32,
230    tombstone_reclaim_ratio: u32,
231    sst_allowed_trivial_move_min_size: Option<u64>,
232    sst_allowed_trivial_move_max_count: Option<u32>,
233    disable_auto_group_scheduling: Option<bool>,
234    max_overlapping_level_size: Option<u64>,
235    emergency_level0_sst_file_count: Option<u32>,
236    emergency_level0_sub_level_partition: Option<u32>,
237    level0_stop_write_threshold_max_sst_count: Option<u32>,
238    level0_stop_write_threshold_max_size: Option<u64>,
239    enable_optimize_l0_interval_selection: Option<bool>,
240    max_kv_count_for_xor16: Option<u64>,
241    max_vnode_key_range_bytes: Option<u64>,
242}
243
#[cfg(test)]
mod tests {
    use super::{CompactionConfig, CompactionConfigBuilder, validate_compaction_config};

    /// Builds the default configuration that each case below mutates.
    fn default_config() -> CompactionConfig {
        CompactionConfigBuilder::new().build()
    }

    /// Number of per-level entries a filter list needs for `config`.
    fn required_entries(config: &CompactionConfig) -> usize {
        config.max_level as usize + 1
    }

    /// Leaving both filter lists empty is a valid configuration.
    #[test]
    fn test_validate_compaction_config_accepts_missing_filter_config() {
        let mut config = default_config();
        config.sstable_filter_type.clear();
        config.sstable_filter_layout.clear();

        let outcome = validate_compaction_config(&config);
        assert!(outcome.is_ok());
    }

    /// Too-short lists and unknown layout names are both rejected.
    #[test]
    fn test_validate_compaction_config_rejects_invalid_filter_config() {
        // A filter type list with a single entry.
        {
            let mut config = default_config();
            config.sstable_filter_type = vec!["xor16".to_owned()];
            assert!(validate_compaction_config(&config).is_err());
        }

        // A filter layout list with a single entry.
        {
            let mut config = default_config();
            config.sstable_filter_layout = vec!["auto".to_owned()];
            assert!(validate_compaction_config(&config).is_err());
        }

        // A layout list of sufficient length whose first entry is not a known layout.
        {
            let mut config = default_config();
            let mut layouts = vec!["auto".to_owned(); required_entries(&config)];
            layouts[0] = "unknown".to_owned();
            config.sstable_filter_layout = layouts;
            assert!(validate_compaction_config(&config).is_err());
        }
    }

    /// `"none"` is an accepted filter type for every level.
    #[test]
    fn test_validate_compaction_config_accepts_none_filter_type() {
        let mut config = default_config();
        config.sstable_filter_type = vec!["none".to_owned(); required_entries(&config)];

        let outcome = validate_compaction_config(&config);
        assert!(outcome.is_ok());
    }
}