risingwave_meta/hummock/compaction/
compaction_config.rs1use risingwave_common::config::CompactionConfig as CompactionConfigOpt;
16use risingwave_common::config::meta::default::compaction_config;
17use risingwave_pb::hummock::CompactionConfig;
18use risingwave_pb::hummock::compaction_config::CompactionMode;
19
20pub struct CompactionConfigBuilder {
21 config: CompactionConfig,
22}
23
24impl CompactionConfigBuilder {
25 pub fn new() -> Self {
26 Self {
27 config: CompactionConfig {
28 max_bytes_for_level_base: compaction_config::max_bytes_for_level_base(),
29 max_bytes_for_level_multiplier: compaction_config::max_bytes_for_level_multiplier(),
30 max_level: compaction_config::max_level() as u64,
31 max_compaction_bytes: compaction_config::max_compaction_bytes(),
32 sub_level_max_compaction_bytes: compaction_config::sub_level_max_compaction_bytes(),
33 level0_tier_compact_file_number: compaction_config::level0_tier_compact_file_number(
34 ),
35 target_file_size_base: compaction_config::target_file_size_base(),
36 compaction_mode: CompactionMode::Range as i32,
37 compression_algorithm: vec![
41 "None".to_owned(),
42 "None".to_owned(),
43 "None".to_owned(),
44 "Lz4".to_owned(),
45 "Lz4".to_owned(),
46 "Zstd".to_owned(),
47 "Zstd".to_owned(),
48 ],
49 compaction_filter_mask: compaction_config::compaction_filter_mask(),
50 max_sub_compaction: compaction_config::max_sub_compaction(),
51 max_space_reclaim_bytes: compaction_config::max_space_reclaim_bytes(),
52 split_by_state_table: false,
53 split_weight_by_vnode: 0,
54 level0_stop_write_threshold_sub_level_number:
55 compaction_config::level0_stop_write_threshold_sub_level_number(),
56 level0_max_compact_file_number: compaction_config::level0_max_compact_file_number(),
61 level0_sub_level_compact_level_count:
62 compaction_config::level0_sub_level_compact_level_count(),
63 level0_overlapping_sub_level_compact_level_count:
64 compaction_config::level0_overlapping_sub_level_compact_level_count(),
65 tombstone_reclaim_ratio: compaction_config::tombstone_reclaim_ratio(),
66 enable_emergency_picker: compaction_config::enable_emergency_picker(),
67 max_l0_compact_level_count: Some(compaction_config::max_l0_compact_level_count()),
68 sst_allowed_trivial_move_min_size: Some(
69 compaction_config::sst_allowed_trivial_move_min_size(),
70 ),
71 disable_auto_group_scheduling: Some(
72 compaction_config::disable_auto_group_scheduling(),
73 ),
74 max_overlapping_level_size: Some(compaction_config::max_overlapping_level_size()),
75 sst_allowed_trivial_move_max_count: Some(
76 compaction_config::sst_allowed_trivial_move_max_count(),
77 ),
78 emergency_level0_sst_file_count: Some(
79 compaction_config::emergency_level0_sst_file_count(),
80 ),
81 emergency_level0_sub_level_partition: Some(
82 compaction_config::emergency_level0_sub_level_partition(),
83 ),
84 level0_stop_write_threshold_max_sst_count: Some(
85 compaction_config::level0_stop_write_threshold_max_sst_count(),
86 ),
87 level0_stop_write_threshold_max_size: Some(
88 compaction_config::level0_stop_write_threshold_max_size(),
89 ),
90 enable_optimize_l0_interval_selection: Some(
91 compaction_config::enable_optimize_l0_interval_selection(),
92 ),
93 vnode_aligned_level_size_threshold:
94 compaction_config::vnode_aligned_level_size_threshold(),
95 },
96 }
97 }
98
99 pub fn with_config(config: CompactionConfig) -> Self {
100 Self { config }
101 }
102
103 pub fn with_opt(opt: &CompactionConfigOpt) -> Self {
104 Self::new()
105 .max_bytes_for_level_base(opt.max_bytes_for_level_base)
106 .max_bytes_for_level_multiplier(opt.max_bytes_for_level_multiplier)
107 .max_compaction_bytes(opt.max_compaction_bytes)
108 .sub_level_max_compaction_bytes(opt.sub_level_max_compaction_bytes)
109 .level0_tier_compact_file_number(opt.level0_tier_compact_file_number)
110 .target_file_size_base(opt.target_file_size_base)
111 .compaction_filter_mask(opt.compaction_filter_mask)
112 .max_sub_compaction(opt.max_sub_compaction)
113 .level0_stop_write_threshold_sub_level_number(
114 opt.level0_stop_write_threshold_sub_level_number,
115 )
116 .level0_sub_level_compact_level_count(opt.level0_sub_level_compact_level_count)
117 .level0_overlapping_sub_level_compact_level_count(
118 opt.level0_overlapping_sub_level_compact_level_count,
119 )
120 .max_space_reclaim_bytes(opt.max_space_reclaim_bytes)
121 .level0_max_compact_file_number(opt.level0_max_compact_file_number)
122 .tombstone_reclaim_ratio(opt.tombstone_reclaim_ratio)
123 .max_level(opt.max_level as u64)
124 .sst_allowed_trivial_move_min_size(Some(opt.sst_allowed_trivial_move_min_size))
125 .sst_allowed_trivial_move_max_count(Some(opt.sst_allowed_trivial_move_max_count))
126 .max_overlapping_level_size(Some(opt.max_overlapping_level_size))
127 .emergency_level0_sst_file_count(Some(opt.emergency_level0_sst_file_count))
128 .emergency_level0_sub_level_partition(Some(opt.emergency_level0_sub_level_partition))
129 .level0_stop_write_threshold_max_sst_count(Some(
130 opt.level0_stop_write_threshold_max_sst_count,
131 ))
132 .level0_stop_write_threshold_max_size(Some(opt.level0_stop_write_threshold_max_size))
133 .enable_optimize_l0_interval_selection(Some(opt.enable_optimize_l0_interval_selection))
134 .vnode_aligned_level_size_threshold(opt.vnode_aligned_level_size_threshold)
135 }
136
137 pub fn build(self) -> CompactionConfig {
138 if let Err(reason) = validate_compaction_config(&self.config) {
139 tracing::warn!("Bad compaction config: {}", reason);
140 }
141 self.config
142 }
143}
144
145pub fn validate_compaction_config(config: &CompactionConfig) -> Result<(), String> {
148 let sub_level_number_threshold_min = 1;
149 if config.level0_stop_write_threshold_sub_level_number < sub_level_number_threshold_min {
150 return Err(format!(
151 "{} is too small for level0_stop_write_threshold_sub_level_number, expect >= {}",
152 config.level0_stop_write_threshold_sub_level_number, sub_level_number_threshold_min
153 ));
154 }
155 Ok(())
156}
157
158impl Default for CompactionConfigBuilder {
159 fn default() -> Self {
160 Self::new()
161 }
162}
163
164macro_rules! builder_field {
165 ($( $name:ident: $type:ty ),* ,) => {
166 impl CompactionConfigBuilder {
167 $(
168 pub fn $name(mut self, v:$type) -> Self {
169 self.config.$name = v;
170 self
171 }
172 )*
173 }
174 }
175}
176
177builder_field! {
178 max_bytes_for_level_base: u64,
179 max_bytes_for_level_multiplier: u64,
180 max_level: u64,
181 max_compaction_bytes: u64,
182 sub_level_max_compaction_bytes: u64,
183 level0_tier_compact_file_number: u64,
184 compaction_mode: i32,
185 compression_algorithm: Vec<String>,
186 compaction_filter_mask: u32,
187 target_file_size_base: u64,
188 max_sub_compaction: u32,
189 max_space_reclaim_bytes: u64,
190 level0_stop_write_threshold_sub_level_number: u64,
191 level0_max_compact_file_number: u64,
192 level0_sub_level_compact_level_count: u32,
193 level0_overlapping_sub_level_compact_level_count: u32,
194 tombstone_reclaim_ratio: u32,
195 sst_allowed_trivial_move_min_size: Option<u64>,
196 sst_allowed_trivial_move_max_count: Option<u32>,
197 disable_auto_group_scheduling: Option<bool>,
198 max_overlapping_level_size: Option<u64>,
199 emergency_level0_sst_file_count: Option<u32>,
200 emergency_level0_sub_level_partition: Option<u32>,
201 level0_stop_write_threshold_max_sst_count: Option<u32>,
202 level0_stop_write_threshold_max_size: Option<u64>,
203 enable_optimize_l0_interval_selection: Option<bool>,
204 vnode_aligned_level_size_threshold: Option<u64>,
205}