risingwave_meta/hummock/compaction/selector/
mod.rs1mod emergency_selector;
21pub(crate) mod level_selector;
22mod manual_selector;
23mod space_reclaim_selector;
24mod tombstone_compaction_selector;
25mod ttl_selector;
26mod vnode_watermark_selector;
27
28use std::collections::{BTreeSet, HashMap};
29use std::sync::Arc;
30
31pub use emergency_selector::EmergencySelector;
32pub use level_selector::{DynamicLevelSelector, DynamicLevelSelectorCore};
33pub use manual_selector::{ManualCompactionOption, ManualCompactionSelector};
34use risingwave_common::catalog::{TableId, TableOption};
35use risingwave_hummock_sdk::level::Levels;
36use risingwave_hummock_sdk::table_watermark::TableWatermarks;
37use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
38use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
39use risingwave_pb::hummock::compact_task;
40pub use space_reclaim_selector::SpaceReclaimCompactionSelector;
41pub use tombstone_compaction_selector::TombstoneCompactionSelector;
42pub use ttl_selector::TtlCompactionSelector;
43pub use vnode_watermark_selector::VnodeWatermarkCompactionSelector;
44
45use super::in_progress_compaction::InProgressCompactionView;
46use super::picker::LocalPickerStatistic;
47use super::{
48 CompactionDeveloperConfig, LevelCompactionPicker, TierCompactionPicker, create_compaction_task,
49};
50use crate::hummock::compaction::CompactionTask;
51use crate::hummock::level_handler::LevelHandler;
52use crate::hummock::model::CompactionGroup;
53use crate::rpc::metrics::MetaMetrics;
54
55pub struct CompactionSelectorContext<'a> {
56 pub group: &'a CompactionGroup,
57 pub levels: &'a Levels,
58 pub member_table_ids: &'a BTreeSet<TableId>,
59 pub level_handlers: &'a mut [LevelHandler],
60 pub selector_stats: &'a mut LocalSelectorStatistic,
61 pub table_id_to_options: &'a HashMap<TableId, TableOption>,
62 pub developer_config: Arc<CompactionDeveloperConfig>,
63 pub table_watermarks: &'a HashMap<TableId, Arc<TableWatermarks>>,
64 pub state_table_info: &'a HummockVersionStateTableInfo,
65 pub in_progress_compactions: &'a InProgressCompactionView,
66}
67
68pub trait CompactionSelector: Sync + Send {
69 fn pick_compaction(
70 &mut self,
71 task_id: HummockCompactionTaskId,
72 context: CompactionSelectorContext<'_>,
73 ) -> Option<CompactionTask>;
74
75 fn report_statistic_metrics(&self, _metrics: &MetaMetrics) {}
76
77 fn name(&self) -> &'static str;
78
79 fn task_type(&self) -> compact_task::TaskType;
80}
81
82pub fn default_compaction_selector() -> Box<dyn CompactionSelector> {
83 Box::<DynamicLevelSelector>::default()
84}
85
86#[derive(Default)]
87pub struct LocalSelectorStatistic {
88 skip_picker: Vec<(usize, usize, LocalPickerStatistic)>,
89}
90
91impl LocalSelectorStatistic {
92 pub fn report_to_metrics(&self, group_id: CompactionGroupId, metrics: &MetaMetrics) {
93 for (start_level, target_level, stats) in &self.skip_picker {
94 let level_label = format!("cg{}-{}-to-{}", group_id, start_level, target_level);
95 if stats.skip_by_write_amp_limit > 0 {
96 metrics
97 .compact_skip_frequency
98 .with_label_values(&[level_label.as_str(), "write-amp"])
99 .inc();
100 }
101 if stats.skip_by_count_limit > 0 {
102 metrics
103 .compact_skip_frequency
104 .with_label_values(&[level_label.as_str(), "count"])
105 .inc();
106 }
107 if stats.skip_by_pending_files > 0 {
108 metrics
109 .compact_skip_frequency
110 .with_label_values(&[level_label.as_str(), "pending-files"])
111 .inc();
112 }
113 if stats.skip_by_overlapping > 0 {
114 metrics
115 .compact_skip_frequency
116 .with_label_values(&[level_label.as_str(), "overlapping"])
117 .inc();
118 }
119 metrics
120 .compact_skip_frequency
121 .with_label_values(&[level_label.as_str(), "picker"])
122 .inc();
123 }
124 }
125}
126
127#[cfg(test)]
128pub mod tests {
129 use std::ops::Range;
130
131 use itertools::Itertools;
132 use risingwave_hummock_sdk::key_range::KeyRange;
133 use risingwave_hummock_sdk::level::{Level, OverlappingLevel};
134 use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
135 use risingwave_pb::hummock::LevelType;
136
137 use super::*;
138 use crate::hummock::test_utils::iterator_test_key_of_epoch;
139
140 pub fn push_table_level0_overlapping(levels: &mut Levels, sst: SstableInfo) {
141 levels.l0.total_file_size += sst.sst_size;
142 levels.l0.sub_levels.push(Level {
143 level_idx: 0,
144 level_type: LevelType::Overlapping,
145 total_file_size: sst.sst_size,
146 uncompressed_file_size: sst.uncompressed_file_size,
147 sub_level_id: sst.sst_id.as_raw_id(),
148 table_infos: vec![sst],
149 ..Default::default()
150 });
151 }
152
153 pub fn push_table_level0_nonoverlapping(levels: &mut Levels, sst: SstableInfo) {
154 push_table_level0_overlapping(levels, sst);
155 levels.l0.sub_levels.last_mut().unwrap().level_type = LevelType::Nonoverlapping;
156 }
157
158 pub fn push_tables_level0_nonoverlapping(levels: &mut Levels, table_infos: Vec<SstableInfo>) {
159 let total_file_size = table_infos.iter().map(|table| table.sst_size).sum::<u64>();
160 let uncompressed_file_size = table_infos
161 .iter()
162 .map(|table| table.uncompressed_file_size)
163 .sum();
164 let sub_level_id = table_infos[0].sst_id.as_raw_id();
165 levels.l0.total_file_size += total_file_size;
166 levels.l0.sub_levels.push(Level {
167 level_idx: 0,
168 level_type: LevelType::Nonoverlapping,
169 total_file_size,
170 sub_level_id,
171 table_infos,
172 uncompressed_file_size,
173 ..Default::default()
174 });
175 }
176
177 pub fn generate_table(
178 id: u64,
179 table_prefix: u64,
180 left: usize,
181 right: usize,
182 epoch: u64,
183 ) -> SstableInfo {
184 generate_table_impl(id, table_prefix, left, right, epoch).into()
185 }
186
187 pub fn generate_table_impl(
188 id: u64,
189 table_prefix: u64,
190 left: usize,
191 right: usize,
192 epoch: u64,
193 ) -> SstableInfoInner {
194 let object_size = (right - left + 1) as u64;
195 SstableInfoInner {
196 object_id: id.into(),
197 sst_id: id.into(),
198 key_range: KeyRange {
199 left: iterator_test_key_of_epoch(table_prefix, left, epoch).into(),
200 right: iterator_test_key_of_epoch(table_prefix, right, epoch).into(),
201 right_exclusive: false,
202 },
203 file_size: object_size,
204 table_ids: vec![(table_prefix as u32).into()],
205 uncompressed_file_size: (right - left + 1) as u64,
206 total_key_count: (right - left + 1) as u64,
207 sst_size: object_size,
208 ..Default::default()
209 }
210 }
211
212 pub fn generate_table_with_ids_and_epochs(
213 id: u64,
214 table_prefix: u64,
215 left: usize,
216 right: usize,
217 epoch: u64,
218 table_ids: Vec<u32>,
219 min_epoch: u64,
220 max_epoch: u64,
221 ) -> SstableInfo {
222 let object_size = (right - left + 1) as u64;
223 SstableInfoInner {
224 object_id: id.into(),
225 sst_id: id.into(),
226 key_range: KeyRange {
227 left: iterator_test_key_of_epoch(table_prefix, left, epoch).into(),
228 right: iterator_test_key_of_epoch(table_prefix, right, epoch).into(),
229 right_exclusive: false,
230 },
231 file_size: object_size,
232 table_ids: table_ids.into_iter().map_into().collect(),
233 uncompressed_file_size: object_size,
234 min_epoch,
235 max_epoch,
236 sst_size: object_size,
237 ..Default::default()
238 }
239 .into()
240 }
241
242 pub fn generate_tables(
243 ids: Range<u64>,
244 keys: Range<usize>,
245 epoch: u64,
246 file_size: u64,
247 ) -> Vec<SstableInfo> {
248 let step = (keys.end - keys.start) / (ids.end - ids.start) as usize;
249 let mut start = keys.start;
250 let mut tables = vec![];
251 for id in ids {
252 let mut table = generate_table_impl(id, 1, start, start + step - 1, epoch);
253 table.file_size = file_size;
254 table.sst_size = file_size;
255 tables.push(table.into());
256 start += step;
257 }
258 tables
259 }
260
261 pub fn generate_level(level_idx: u32, table_infos: Vec<SstableInfo>) -> Level {
262 let total_file_size = table_infos.iter().map(|sst| sst.sst_size).sum();
263 let uncompressed_file_size = table_infos
264 .iter()
265 .map(|sst| sst.uncompressed_file_size)
266 .sum();
267 Level {
268 level_idx,
269 level_type: LevelType::Nonoverlapping,
270 table_infos,
271 total_file_size,
272 sub_level_id: 0,
273 uncompressed_file_size,
274 ..Default::default()
275 }
276 }
277
278 pub fn generate_l0_nonoverlapping_sublevels(table_infos: Vec<SstableInfo>) -> OverlappingLevel {
281 let total_file_size = table_infos.iter().map(|table| table.sst_size).sum::<u64>();
282 let uncompressed_file_size = table_infos
283 .iter()
284 .map(|table| table.uncompressed_file_size)
285 .sum::<u64>();
286 OverlappingLevel {
287 sub_levels: table_infos
288 .into_iter()
289 .enumerate()
290 .map(|(idx, table)| Level {
291 level_idx: 0,
292 level_type: LevelType::Nonoverlapping,
293 total_file_size: table.sst_size,
294 uncompressed_file_size: table.uncompressed_file_size,
295 sub_level_id: idx as u64,
296 table_infos: vec![table],
297 ..Default::default()
298 })
299 .collect_vec(),
300 total_file_size,
301 uncompressed_file_size,
302 }
303 }
304
305 pub fn generate_l0_nonoverlapping_multi_sublevels(
306 table_infos: Vec<Vec<SstableInfo>>,
307 ) -> OverlappingLevel {
308 let mut l0 = OverlappingLevel {
309 sub_levels: table_infos
310 .into_iter()
311 .enumerate()
312 .map(|(idx, table)| Level {
313 level_idx: 0,
314 level_type: LevelType::Nonoverlapping,
315 total_file_size: table.iter().map(|table| table.sst_size).sum::<u64>(),
316 uncompressed_file_size: table
317 .iter()
318 .map(|sst| sst.uncompressed_file_size)
319 .sum::<u64>(),
320 sub_level_id: idx as u64,
321 table_infos: table,
322 ..Default::default()
323 })
324 .collect_vec(),
325 total_file_size: 0,
326 uncompressed_file_size: 0,
327 };
328
329 l0.total_file_size = l0.sub_levels.iter().map(|l| l.total_file_size).sum::<u64>();
330 l0.uncompressed_file_size = l0
331 .sub_levels
332 .iter()
333 .map(|l| l.uncompressed_file_size)
334 .sum::<u64>();
335 l0
336 }
337
338 pub fn generate_l0_overlapping_sublevels(
341 table_infos: Vec<Vec<SstableInfo>>,
342 ) -> OverlappingLevel {
343 let mut l0 = OverlappingLevel {
344 sub_levels: table_infos
345 .into_iter()
346 .enumerate()
347 .map(|(idx, table)| Level {
348 level_idx: 0,
349 level_type: LevelType::Overlapping,
350 total_file_size: table.iter().map(|table| table.sst_size).sum::<u64>(),
351 sub_level_id: idx as u64,
352 table_infos: table.clone(),
353 uncompressed_file_size: table
354 .iter()
355 .map(|sst| sst.uncompressed_file_size)
356 .sum::<u64>(),
357 ..Default::default()
358 })
359 .collect_vec(),
360 total_file_size: 0,
361 uncompressed_file_size: 0,
362 };
363 l0.total_file_size = l0.sub_levels.iter().map(|l| l.total_file_size).sum::<u64>();
364 l0.uncompressed_file_size = l0
365 .sub_levels
366 .iter()
367 .map(|l| l.uncompressed_file_size)
368 .sum::<u64>();
369 l0
370 }
371
372 pub fn assert_compaction_task(compact_task: &CompactionTask, level_handlers: &[LevelHandler]) {
373 for i in &compact_task.input.input_levels {
374 for t in &i.table_infos {
375 assert!(level_handlers[i.level_idx as usize].is_pending_compact(&t.sst_id));
376 }
377 }
378 }
379}