risingwave_meta/hummock/compaction/selector/
mod.rs1mod emergency_selector;
21pub(crate) mod level_selector;
22mod manual_selector;
23mod space_reclaim_selector;
24mod tombstone_compaction_selector;
25mod ttl_selector;
26mod vnode_watermark_selector;
27
28use std::collections::{BTreeSet, HashMap};
29use std::sync::Arc;
30
31pub use emergency_selector::EmergencySelector;
32pub use level_selector::{DynamicLevelSelector, DynamicLevelSelectorCore};
33pub use manual_selector::{ManualCompactionOption, ManualCompactionSelector};
34use risingwave_common::catalog::{TableId, TableOption};
35use risingwave_hummock_sdk::HummockCompactionTaskId;
36use risingwave_hummock_sdk::level::Levels;
37use risingwave_hummock_sdk::table_watermark::TableWatermarks;
38use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
39use risingwave_pb::hummock::compact_task;
40pub use space_reclaim_selector::SpaceReclaimCompactionSelector;
41pub use tombstone_compaction_selector::TombstoneCompactionSelector;
42pub use ttl_selector::TtlCompactionSelector;
43pub use vnode_watermark_selector::VnodeWatermarkCompactionSelector;
44
45use super::picker::LocalPickerStatistic;
46use super::{
47 CompactionDeveloperConfig, LevelCompactionPicker, TierCompactionPicker, create_compaction_task,
48};
49use crate::hummock::compaction::CompactionTask;
50use crate::hummock::level_handler::LevelHandler;
51use crate::hummock::model::CompactionGroup;
52use crate::rpc::metrics::MetaMetrics;
53
54pub struct CompactionSelectorContext<'a> {
55 pub group: &'a CompactionGroup,
56 pub levels: &'a Levels,
57 pub member_table_ids: &'a BTreeSet<TableId>,
58 pub level_handlers: &'a mut [LevelHandler],
59 pub selector_stats: &'a mut LocalSelectorStatistic,
60 pub table_id_to_options: &'a HashMap<u32, TableOption>,
61 pub developer_config: Arc<CompactionDeveloperConfig>,
62 pub table_watermarks: &'a HashMap<TableId, Arc<TableWatermarks>>,
63 pub state_table_info: &'a HummockVersionStateTableInfo,
64}
65
66pub trait CompactionSelector: Sync + Send {
67 fn pick_compaction(
68 &mut self,
69 task_id: HummockCompactionTaskId,
70 context: CompactionSelectorContext<'_>,
71 ) -> Option<CompactionTask>;
72
73 fn report_statistic_metrics(&self, _metrics: &MetaMetrics) {}
74
75 fn name(&self) -> &'static str;
76
77 fn task_type(&self) -> compact_task::TaskType;
78}
79
80pub fn default_compaction_selector() -> Box<dyn CompactionSelector> {
81 Box::<DynamicLevelSelector>::default()
82}
83
84#[derive(Default)]
85pub struct LocalSelectorStatistic {
86 skip_picker: Vec<(usize, usize, LocalPickerStatistic)>,
87}
88
89impl LocalSelectorStatistic {
90 pub fn report_to_metrics(&self, group_id: u64, metrics: &MetaMetrics) {
91 for (start_level, target_level, stats) in &self.skip_picker {
92 let level_label = format!("cg{}-{}-to-{}", group_id, start_level, target_level);
93 if stats.skip_by_write_amp_limit > 0 {
94 metrics
95 .compact_skip_frequency
96 .with_label_values(&[level_label.as_str(), "write-amp"])
97 .inc();
98 }
99 if stats.skip_by_count_limit > 0 {
100 metrics
101 .compact_skip_frequency
102 .with_label_values(&[level_label.as_str(), "count"])
103 .inc();
104 }
105 if stats.skip_by_pending_files > 0 {
106 metrics
107 .compact_skip_frequency
108 .with_label_values(&[level_label.as_str(), "pending-files"])
109 .inc();
110 }
111 if stats.skip_by_overlapping > 0 {
112 metrics
113 .compact_skip_frequency
114 .with_label_values(&[level_label.as_str(), "overlapping"])
115 .inc();
116 }
117 metrics
118 .compact_skip_frequency
119 .with_label_values(&[level_label.as_str(), "picker"])
120 .inc();
121 }
122 }
123}
124
/// Fixture builders shared by the compaction selector and picker unit tests.
#[cfg(test)]
pub mod tests {
    use std::ops::Range;

    use itertools::Itertools;
    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::level::{Level, OverlappingLevel};
    use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
    use risingwave_pb::hummock::LevelType;

    use super::*;
    use crate::hummock::test_utils::iterator_test_key_of_epoch;

    /// Returns `(sum of sst_size, sum of uncompressed_file_size)` over `ssts`.
    fn sum_sizes(ssts: &[SstableInfo]) -> (u64, u64) {
        ssts.iter().fold((0, 0), |(size, uncompressed), sst| {
            (size + sst.sst_size, uncompressed + sst.uncompressed_file_size)
        })
    }

    /// Builds the inclusive key range `[left, right]` of `table_prefix` at `epoch`.
    fn test_key_range(table_prefix: u64, left: usize, right: usize, epoch: u64) -> KeyRange {
        KeyRange {
            left: iterator_test_key_of_epoch(table_prefix, left, epoch).into(),
            right: iterator_test_key_of_epoch(table_prefix, right, epoch).into(),
            right_exclusive: false,
        }
    }

    /// Appends `sub_level` to L0 and adds its sizes to both L0 aggregates
    /// (`total_file_size` and `uncompressed_file_size`). The aggregates therefore stay equal to
    /// the sum over `sub_levels`, matching how `generate_l0_sublevels` builds L0.
    fn push_l0_sub_level(levels: &mut Levels, sub_level: Level) {
        levels.l0.total_file_size += sub_level.total_file_size;
        levels.l0.uncompressed_file_size += sub_level.uncompressed_file_size;
        levels.l0.sub_levels.push(sub_level);
    }

    /// Wraps a single SST into an L0 sub-level of `level_type` whose `sub_level_id` is the
    /// SST's id.
    fn single_sst_sub_level(sst: SstableInfo, level_type: LevelType) -> Level {
        Level {
            level_idx: 0,
            level_type,
            total_file_size: sst.sst_size,
            uncompressed_file_size: sst.uncompressed_file_size,
            sub_level_id: sst.sst_id,
            // Moved last: the size/id fields above are copied out of `sst` first.
            table_infos: vec![sst],
            ..Default::default()
        }
    }

    /// Appends `sst` to L0 as a new single-SST overlapping sub-level (`sub_level_id` = SST id).
    pub fn push_table_level0_overlapping(levels: &mut Levels, sst: SstableInfo) {
        push_l0_sub_level(levels, single_sst_sub_level(sst, LevelType::Overlapping));
    }

    /// Appends `sst` to L0 as a new single-SST non-overlapping sub-level
    /// (`sub_level_id` = SST id).
    pub fn push_table_level0_nonoverlapping(levels: &mut Levels, sst: SstableInfo) {
        push_l0_sub_level(levels, single_sst_sub_level(sst, LevelType::Nonoverlapping));
    }

    /// Appends all of `table_infos` to L0 as one non-overlapping sub-level, whose
    /// `sub_level_id` is the id of the first SST.
    ///
    /// # Panics
    /// Panics if `table_infos` is empty.
    pub fn push_tables_level0_nonoverlapping(levels: &mut Levels, table_infos: Vec<SstableInfo>) {
        let sub_level_id = table_infos
            .first()
            .expect("a non-overlapping sub-level needs at least one SST")
            .sst_id;
        let (total_file_size, uncompressed_file_size) = sum_sizes(&table_infos);
        push_l0_sub_level(
            levels,
            Level {
                level_idx: 0,
                level_type: LevelType::Nonoverlapping,
                total_file_size,
                sub_level_id,
                table_infos,
                uncompressed_file_size,
                ..Default::default()
            },
        );
    }

    /// Same as [`generate_table_impl`], converted into an [`SstableInfo`].
    pub fn generate_table(
        id: u64,
        table_prefix: u64,
        left: usize,
        right: usize,
        epoch: u64,
    ) -> SstableInfo {
        generate_table_impl(id, table_prefix, left, right, epoch).into()
    }

    /// Builds an SST (object id = SST id = `id`) for table `table_prefix` covering keys
    /// `left..=right` at `epoch`.
    ///
    /// File size, SST size, uncompressed size and key count are all set to the number of keys
    /// in the range.
    pub fn generate_table_impl(
        id: u64,
        table_prefix: u64,
        left: usize,
        right: usize,
        epoch: u64,
    ) -> SstableInfoInner {
        let key_count = (right - left + 1) as u64;
        SstableInfoInner {
            object_id: id,
            sst_id: id,
            key_range: test_key_range(table_prefix, left, right, epoch),
            file_size: key_count,
            table_ids: vec![table_prefix as u32],
            uncompressed_file_size: key_count,
            total_key_count: key_count,
            sst_size: key_count,
            ..Default::default()
        }
    }

    /// Like [`generate_table`], but with explicit `table_ids` and `min_epoch`/`max_epoch`.
    ///
    /// Unlike [`generate_table_impl`], `total_key_count` is left at its default.
    // Allowed because the fixture takes each field as a plain argument.
    #[allow(clippy::too_many_arguments)]
    pub fn generate_table_with_ids_and_epochs(
        id: u64,
        table_prefix: u64,
        left: usize,
        right: usize,
        epoch: u64,
        table_ids: Vec<u32>,
        min_epoch: u64,
        max_epoch: u64,
    ) -> SstableInfo {
        let object_size = (right - left + 1) as u64;
        SstableInfoInner {
            object_id: id,
            sst_id: id,
            key_range: test_key_range(table_prefix, left, right, epoch),
            file_size: object_size,
            table_ids,
            uncompressed_file_size: object_size,
            min_epoch,
            max_epoch,
            sst_size: object_size,
            ..Default::default()
        }
        .into()
    }

    /// Splits `keys` evenly into one SST per id in `ids` (table prefix 1). Each SST's
    /// `file_size` and `sst_size` are overridden with `file_size`.
    ///
    /// The step is `keys.len() / ids.len()` (floor division). Any remainder keys at the end of
    /// `keys` are not covered.
    ///
    /// # Panics
    /// Panics if `ids` is empty or if `keys` has fewer keys than `ids` has ids.
    pub fn generate_tables(
        ids: Range<u64>,
        keys: Range<usize>,
        epoch: u64,
        file_size: u64,
    ) -> Vec<SstableInfo> {
        let table_count = (ids.end - ids.start) as usize;
        assert!(table_count > 0, "generate_tables needs at least one id");
        let step = (keys.end - keys.start) / table_count;
        assert!(step > 0, "generate_tables needs at least one key per table");
        ids.enumerate()
            .map(|(i, id)| -> SstableInfo {
                let start = keys.start + i * step;
                let mut table = generate_table_impl(id, 1, start, start + step - 1, epoch);
                table.file_size = file_size;
                table.sst_size = file_size;
                table.into()
            })
            .collect()
    }

    /// Builds a non-overlapping level `level_idx` holding `table_infos` (`sub_level_id` 0),
    /// with sizes summed from its SSTs.
    pub fn generate_level(level_idx: u32, table_infos: Vec<SstableInfo>) -> Level {
        let (total_file_size, uncompressed_file_size) = sum_sizes(&table_infos);
        Level {
            level_idx,
            level_type: LevelType::Nonoverlapping,
            table_infos,
            total_file_size,
            sub_level_id: 0,
            uncompressed_file_size,
            ..Default::default()
        }
    }

    /// Builds an L0 in which each inner `Vec` becomes one sub-level of `level_type`.
    ///
    /// Sub-level ids are assigned 0, 1, 2, ... in input order. The L0 aggregates are the sums
    /// over the sub-levels.
    fn generate_l0_sublevels(
        table_infos: Vec<Vec<SstableInfo>>,
        level_type: LevelType,
    ) -> OverlappingLevel {
        let sub_levels = table_infos
            .into_iter()
            .enumerate()
            .map(|(idx, tables)| {
                // Sizes are computed before `tables` is moved, so no clone is needed.
                let (total_file_size, uncompressed_file_size) = sum_sizes(&tables);
                Level {
                    level_idx: 0,
                    level_type,
                    total_file_size,
                    uncompressed_file_size,
                    sub_level_id: idx as u64,
                    table_infos: tables,
                    ..Default::default()
                }
            })
            .collect_vec();
        let total_file_size = sub_levels.iter().map(|l| l.total_file_size).sum();
        let uncompressed_file_size = sub_levels.iter().map(|l| l.uncompressed_file_size).sum();
        OverlappingLevel {
            sub_levels,
            total_file_size,
            uncompressed_file_size,
        }
    }

    /// Builds an L0 with one non-overlapping sub-level per SST (sub-level ids 0, 1, 2, ...).
    pub fn generate_l0_nonoverlapping_sublevels(table_infos: Vec<SstableInfo>) -> OverlappingLevel {
        generate_l0_sublevels(
            table_infos.into_iter().map(|sst| vec![sst]).collect_vec(),
            LevelType::Nonoverlapping,
        )
    }

    /// Builds an L0 with one non-overlapping sub-level per inner `Vec` (ids 0, 1, 2, ...).
    pub fn generate_l0_nonoverlapping_multi_sublevels(
        table_infos: Vec<Vec<SstableInfo>>,
    ) -> OverlappingLevel {
        generate_l0_sublevels(table_infos, LevelType::Nonoverlapping)
    }

    /// Builds an L0 with one overlapping sub-level per inner `Vec` (ids 0, 1, 2, ...).
    pub fn generate_l0_overlapping_sublevels(
        table_infos: Vec<Vec<SstableInfo>>,
    ) -> OverlappingLevel {
        generate_l0_sublevels(table_infos, LevelType::Overlapping)
    }

    /// Asserts that every input SST of `compact_task` is marked pending-compaction in the
    /// handler of its input level.
    ///
    /// # Panics
    /// Panics, naming the SST and level, on the first SST that is not pending. Also panics if a
    /// level index is out of bounds for `level_handlers`.
    pub fn assert_compaction_task(compact_task: &CompactionTask, level_handlers: &[LevelHandler]) {
        for input_level in &compact_task.input.input_levels {
            let handler = &level_handlers[input_level.level_idx as usize];
            for sst in &input_level.table_infos {
                assert!(
                    handler.is_pending_compact(&sst.sst_id),
                    "sst {} in level {} is not marked pending compaction",
                    sst.sst_id,
                    input_level.level_idx
                );
            }
        }
    }
}