risingwave_meta/hummock/compaction/
compaction_config.rs1use risingwave_common::config::CompactionConfig as CompactionConfigOpt;
16use risingwave_common::config::meta::default::compaction_config;
17use risingwave_hummock_sdk::filter_utils::{
18 parse_sstable_filter_layout, parse_sstable_filter_type,
19};
20use risingwave_pb::hummock::CompactionConfig;
21use risingwave_pb::hummock::compaction_config::CompactionMode;
22
23pub struct CompactionConfigBuilder {
24 config: CompactionConfig,
25}
26
27impl CompactionConfigBuilder {
28 pub fn new() -> Self {
29 #[expect(deprecated)]
30 Self {
31 config: CompactionConfig {
32 max_bytes_for_level_base: compaction_config::max_bytes_for_level_base(),
33 max_bytes_for_level_multiplier: compaction_config::max_bytes_for_level_multiplier(),
34 max_level: compaction_config::max_level() as u64,
35 max_compaction_bytes: compaction_config::max_compaction_bytes(),
36 sub_level_max_compaction_bytes: compaction_config::sub_level_max_compaction_bytes(),
37 level0_tier_compact_file_number: compaction_config::level0_tier_compact_file_number(
38 ),
39 target_file_size_base: compaction_config::target_file_size_base(),
40 compaction_mode: CompactionMode::Range as i32,
41 compression_algorithm: compaction_config::compression_algorithm_vec(
42 compaction_config::max_level(),
43 ),
44 sstable_filter_type: compaction_config::sstable_filter_type(),
45 sstable_filter_layout: compaction_config::sstable_filter_layout(),
46 compaction_filter_mask: compaction_config::compaction_filter_mask(),
47 max_sub_compaction: compaction_config::max_sub_compaction(),
48 max_space_reclaim_bytes: compaction_config::max_space_reclaim_bytes(),
49 split_by_state_table: false,
50 split_weight_by_vnode: 0,
51 level0_stop_write_threshold_sub_level_number:
52 compaction_config::level0_stop_write_threshold_sub_level_number(),
53 level0_max_compact_file_number: compaction_config::level0_max_compact_file_number(),
58 level0_sub_level_compact_level_count:
59 compaction_config::level0_sub_level_compact_level_count(),
60 level0_overlapping_sub_level_compact_level_count:
61 compaction_config::level0_overlapping_sub_level_compact_level_count(),
62 tombstone_reclaim_ratio: compaction_config::tombstone_reclaim_ratio(),
63 enable_emergency_picker: compaction_config::enable_emergency_picker(),
64 max_l0_compact_level_count: Some(compaction_config::max_l0_compact_level_count()),
65 sst_allowed_trivial_move_min_size: Some(
66 compaction_config::sst_allowed_trivial_move_min_size(),
67 ),
68 disable_auto_group_scheduling: Some(
69 compaction_config::disable_auto_group_scheduling(),
70 ),
71 max_overlapping_level_size: Some(compaction_config::max_overlapping_level_size()),
72 sst_allowed_trivial_move_max_count: Some(
73 compaction_config::sst_allowed_trivial_move_max_count(),
74 ),
75 emergency_level0_sst_file_count: Some(
76 compaction_config::emergency_level0_sst_file_count(),
77 ),
78 emergency_level0_sub_level_partition: Some(
79 compaction_config::emergency_level0_sub_level_partition(),
80 ),
81 level0_stop_write_threshold_max_sst_count: Some(
82 compaction_config::level0_stop_write_threshold_max_sst_count(),
83 ),
84 level0_stop_write_threshold_max_size: Some(
85 compaction_config::level0_stop_write_threshold_max_size(),
86 ),
87 enable_optimize_l0_interval_selection: Some(
88 compaction_config::enable_optimize_l0_interval_selection(),
89 ),
90 vnode_aligned_level_size_threshold: None,
91 max_kv_count_for_xor16: compaction_config::blocked_xor_filter_kv_count_threshold(),
92 max_vnode_key_range_bytes: compaction_config::max_vnode_key_range_bytes(),
93 },
94 }
95 }
96
97 pub fn with_config(config: CompactionConfig) -> Self {
98 Self { config }
99 }
100
101 pub fn with_opt(opt: &CompactionConfigOpt) -> Self {
102 Self::new()
103 .max_bytes_for_level_base(opt.max_bytes_for_level_base)
104 .max_bytes_for_level_multiplier(opt.max_bytes_for_level_multiplier)
105 .max_compaction_bytes(opt.max_compaction_bytes)
106 .sub_level_max_compaction_bytes(opt.sub_level_max_compaction_bytes)
107 .level0_tier_compact_file_number(opt.level0_tier_compact_file_number)
108 .target_file_size_base(opt.target_file_size_base)
109 .compaction_filter_mask(opt.compaction_filter_mask)
110 .max_sub_compaction(opt.max_sub_compaction)
111 .level0_stop_write_threshold_sub_level_number(
112 opt.level0_stop_write_threshold_sub_level_number,
113 )
114 .level0_sub_level_compact_level_count(opt.level0_sub_level_compact_level_count)
115 .level0_overlapping_sub_level_compact_level_count(
116 opt.level0_overlapping_sub_level_compact_level_count,
117 )
118 .max_space_reclaim_bytes(opt.max_space_reclaim_bytes)
119 .level0_max_compact_file_number(opt.level0_max_compact_file_number)
120 .tombstone_reclaim_ratio(opt.tombstone_reclaim_ratio)
121 .max_level(opt.max_level as u64)
122 .sst_allowed_trivial_move_min_size(Some(opt.sst_allowed_trivial_move_min_size))
123 .sst_allowed_trivial_move_max_count(Some(opt.sst_allowed_trivial_move_max_count))
124 .max_overlapping_level_size(Some(opt.max_overlapping_level_size))
125 .emergency_level0_sst_file_count(Some(opt.emergency_level0_sst_file_count))
126 .emergency_level0_sub_level_partition(Some(opt.emergency_level0_sub_level_partition))
127 .level0_stop_write_threshold_max_sst_count(Some(
128 opt.level0_stop_write_threshold_max_sst_count,
129 ))
130 .level0_stop_write_threshold_max_size(Some(opt.level0_stop_write_threshold_max_size))
131 .enable_optimize_l0_interval_selection(Some(opt.enable_optimize_l0_interval_selection))
132 .max_kv_count_for_xor16(opt.blocked_xor_filter_kv_count_threshold)
133 .max_vnode_key_range_bytes(opt.max_vnode_key_range_bytes)
134 .sstable_filter_type(opt.sstable_filter_type.clone())
135 .sstable_filter_layout(opt.sstable_filter_layout.clone())
136 }
137
138 pub fn build(self) -> CompactionConfig {
139 let mut config = self.config;
140 if let Err(reason) = validate_compaction_config(&config) {
141 tracing::warn!(
144 "Bad compaction config: {}. Falling back to default sstable filter type/layout.",
145 reason
146 );
147 config.sstable_filter_type.clear();
148 config.sstable_filter_layout.clear();
149 }
150 config
151 }
152}
153
154pub fn validate_compaction_config(config: &CompactionConfig) -> Result<(), String> {
157 let sub_level_number_threshold_min = 1;
158 if config.level0_stop_write_threshold_sub_level_number < sub_level_number_threshold_min {
159 return Err(format!(
160 "{} is too small for level0_stop_write_threshold_sub_level_number, expect >= {}",
161 config.level0_stop_write_threshold_sub_level_number, sub_level_number_threshold_min
162 ));
163 }
164 if !config.sstable_filter_type.is_empty() {
165 if config.sstable_filter_type.len() < config.max_level as usize + 1 {
166 return Err(format!(
167 "sstable_filter_type must provide at least {} entries for max_level {}",
168 config.max_level + 1,
169 config.max_level
170 ));
171 }
172 for filter_type in &config.sstable_filter_type {
173 parse_sstable_filter_type(filter_type)?;
174 }
175 }
176
177 if !config.sstable_filter_layout.is_empty() {
178 if config.sstable_filter_layout.len() < config.max_level as usize + 1 {
179 return Err(format!(
180 "sstable_filter_layout must provide at least {} entries for max_level {}",
181 config.max_level + 1,
182 config.max_level
183 ));
184 }
185 for layout in &config.sstable_filter_layout {
186 parse_sstable_filter_layout(layout)?;
187 }
188 }
189 Ok(())
190}
191
192impl Default for CompactionConfigBuilder {
193 fn default() -> Self {
194 Self::new()
195 }
196}
197
198macro_rules! builder_field {
199 ($( $name:ident: $type:ty ),* ,) => {
200 impl CompactionConfigBuilder {
201 $(
202 pub fn $name(mut self, v:$type) -> Self {
203 self.config.$name = v;
204 self
205 }
206 )*
207 }
208 }
209}
210
211builder_field! {
212 max_bytes_for_level_base: u64,
213 max_bytes_for_level_multiplier: u64,
214 max_level: u64,
215 max_compaction_bytes: u64,
216 sub_level_max_compaction_bytes: u64,
217 level0_tier_compact_file_number: u64,
218 compaction_mode: i32,
219 compression_algorithm: Vec<String>,
220 sstable_filter_type: Vec<String>,
221 sstable_filter_layout: Vec<String>,
222 compaction_filter_mask: u32,
223 target_file_size_base: u64,
224 max_sub_compaction: u32,
225 max_space_reclaim_bytes: u64,
226 level0_stop_write_threshold_sub_level_number: u64,
227 level0_max_compact_file_number: u64,
228 level0_sub_level_compact_level_count: u32,
229 level0_overlapping_sub_level_compact_level_count: u32,
230 tombstone_reclaim_ratio: u32,
231 sst_allowed_trivial_move_min_size: Option<u64>,
232 sst_allowed_trivial_move_max_count: Option<u32>,
233 disable_auto_group_scheduling: Option<bool>,
234 max_overlapping_level_size: Option<u64>,
235 emergency_level0_sst_file_count: Option<u32>,
236 emergency_level0_sub_level_partition: Option<u32>,
237 level0_stop_write_threshold_max_sst_count: Option<u32>,
238 level0_stop_write_threshold_max_size: Option<u64>,
239 enable_optimize_l0_interval_selection: Option<bool>,
240 max_kv_count_for_xor16: Option<u64>,
241 max_vnode_key_range_bytes: Option<u64>,
242}
243
244#[cfg(test)]
245mod tests {
246 use super::{CompactionConfigBuilder, validate_compaction_config};
247
248 #[test]
249 fn test_validate_compaction_config_accepts_missing_filter_config() {
250 let mut config = CompactionConfigBuilder::new().build();
251 config.sstable_filter_type.clear();
252 config.sstable_filter_layout.clear();
253 assert!(validate_compaction_config(&config).is_ok());
254 }
255
256 #[test]
257 fn test_validate_compaction_config_rejects_invalid_filter_config() {
258 let mut config = CompactionConfigBuilder::new().build();
259 config.sstable_filter_type = vec!["xor16".to_owned()];
260 assert!(validate_compaction_config(&config).is_err());
261
262 let mut config = CompactionConfigBuilder::new().build();
263 config.sstable_filter_layout = vec!["auto".to_owned()];
264 assert!(validate_compaction_config(&config).is_err());
265
266 let mut config = CompactionConfigBuilder::new().build();
267 config.sstable_filter_layout = vec!["auto".to_owned(); config.max_level as usize + 1];
268 config.sstable_filter_layout[0] = "unknown".to_owned();
269 assert!(validate_compaction_config(&config).is_err());
270 }
271
272 #[test]
273 fn test_validate_compaction_config_accepts_none_filter_type() {
274 let mut config = CompactionConfigBuilder::new().build();
275 config.sstable_filter_type = vec!["none".to_owned(); config.max_level as usize + 1];
276 assert!(validate_compaction_config(&config).is_ok());
277 }
278}