risingwave_meta/hummock/compaction/selector/
mod.rs1mod emergency_selector;
21pub(crate) mod level_selector;
22mod manual_selector;
23mod space_reclaim_selector;
24mod tombstone_compaction_selector;
25mod ttl_selector;
26mod vnode_watermark_selector;
27
28use std::collections::{BTreeSet, HashMap};
29use std::sync::Arc;
30
31pub use emergency_selector::EmergencySelector;
32pub use level_selector::{DynamicLevelSelector, DynamicLevelSelectorCore};
33pub use manual_selector::{ManualCompactionOption, ManualCompactionSelector};
34use risingwave_common::catalog::{TableId, TableOption};
35use risingwave_hummock_sdk::level::Levels;
36use risingwave_hummock_sdk::table_watermark::TableWatermarks;
37use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
38use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
39use risingwave_pb::hummock::compact_task;
40pub use space_reclaim_selector::SpaceReclaimCompactionSelector;
41pub use tombstone_compaction_selector::TombstoneCompactionSelector;
42pub use ttl_selector::TtlCompactionSelector;
43pub use vnode_watermark_selector::VnodeWatermarkCompactionSelector;
44
45use super::picker::LocalPickerStatistic;
46use super::{
47 CompactionDeveloperConfig, LevelCompactionPicker, TierCompactionPicker, create_compaction_task,
48};
49use crate::hummock::compaction::CompactionTask;
50use crate::hummock::level_handler::LevelHandler;
51use crate::hummock::model::CompactionGroup;
52use crate::rpc::metrics::MetaMetrics;
53
/// Inputs handed to a [`CompactionSelector`] by [`CompactionSelector::pick_compaction`].
pub struct CompactionSelectorContext<'a> {
    /// The compaction group a task is being picked for.
    pub group: &'a CompactionGroup,
    /// Levels of the group to pick from.
    pub levels: &'a Levels,
    /// Ids of the tables that are members of the group.
    pub member_table_ids: &'a BTreeSet<TableId>,
    /// Per-level handlers. Mutable — presumably so a selector can mark the SSTs it picks
    /// as pending compaction; confirm in the selector implementations.
    pub level_handlers: &'a mut [LevelHandler],
    /// Collector for skip statistics recorded while picking.
    pub selector_stats: &'a mut LocalSelectorStatistic,
    /// Table options, keyed by table id.
    pub table_id_to_options: &'a HashMap<TableId, TableOption>,
    /// Developer compaction configuration, shared via `Arc`.
    pub developer_config: Arc<CompactionDeveloperConfig>,
    /// Table watermarks, keyed by table id.
    pub table_watermarks: &'a HashMap<TableId, Arc<TableWatermarks>>,
    /// State-table info of the Hummock version being picked from.
    pub state_table_info: &'a HummockVersionStateTableInfo,
}
65
/// A strategy that picks compaction tasks for a compaction group.
pub trait CompactionSelector: Sync + Send {
    /// Tries to build a compaction task with id `task_id` from `context`.
    ///
    /// Returns `None` when no task is picked (presumably because nothing is currently
    /// eligible; confirm per implementation).
    fn pick_compaction(
        &mut self,
        task_id: HummockCompactionTaskId,
        context: CompactionSelectorContext<'_>,
    ) -> Option<CompactionTask>;

    /// Reports selector-specific statistics to `_metrics`. The default does nothing.
    fn report_statistic_metrics(&self, _metrics: &MetaMetrics) {}

    /// Static name identifying this selector.
    fn name(&self) -> &'static str;

    /// The `compact_task::TaskType` associated with this selector (presumably the type
    /// of the tasks it produces).
    fn task_type(&self) -> compact_task::TaskType;
}
79
80pub fn default_compaction_selector() -> Box<dyn CompactionSelector> {
81 Box::<DynamicLevelSelector>::default()
82}
83
84#[derive(Default)]
85pub struct LocalSelectorStatistic {
86 skip_picker: Vec<(usize, usize, LocalPickerStatistic)>,
87}
88
89impl LocalSelectorStatistic {
90 pub fn report_to_metrics(&self, group_id: CompactionGroupId, metrics: &MetaMetrics) {
91 for (start_level, target_level, stats) in &self.skip_picker {
92 let level_label = format!("cg{}-{}-to-{}", group_id, start_level, target_level);
93 if stats.skip_by_write_amp_limit > 0 {
94 metrics
95 .compact_skip_frequency
96 .with_label_values(&[level_label.as_str(), "write-amp"])
97 .inc();
98 }
99 if stats.skip_by_count_limit > 0 {
100 metrics
101 .compact_skip_frequency
102 .with_label_values(&[level_label.as_str(), "count"])
103 .inc();
104 }
105 if stats.skip_by_pending_files > 0 {
106 metrics
107 .compact_skip_frequency
108 .with_label_values(&[level_label.as_str(), "pending-files"])
109 .inc();
110 }
111 if stats.skip_by_overlapping > 0 {
112 metrics
113 .compact_skip_frequency
114 .with_label_values(&[level_label.as_str(), "overlapping"])
115 .inc();
116 }
117 metrics
118 .compact_skip_frequency
119 .with_label_values(&[level_label.as_str(), "picker"])
120 .inc();
121 }
122 }
123}
124
#[cfg(test)]
pub mod tests {
    use std::ops::Range;

    use itertools::Itertools;
    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::level::{Level, OverlappingLevel};
    use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
    use risingwave_pb::hummock::LevelType;

    use super::*;
    use crate::hummock::test_utils::iterator_test_key_of_epoch;

    /// Appends `sst` to L0 as a new single-SST `Overlapping` sub-level whose id is the
    /// SST id.
    ///
    /// Both L0 aggregates (`total_file_size` and `uncompressed_file_size`) are bumped so
    /// they stay consistent with the sub-levels, matching the `generate_l0_*` helpers.
    pub fn push_table_level0_overlapping(levels: &mut Levels, sst: SstableInfo) {
        levels.l0.total_file_size += sst.sst_size;
        levels.l0.uncompressed_file_size += sst.uncompressed_file_size;
        levels.l0.sub_levels.push(Level {
            level_idx: 0,
            level_type: LevelType::Overlapping,
            total_file_size: sst.sst_size,
            uncompressed_file_size: sst.uncompressed_file_size,
            sub_level_id: sst.sst_id.as_raw_id(),
            table_infos: vec![sst],
            ..Default::default()
        });
    }

    /// Same as [`push_table_level0_overlapping`], but the new sub-level is marked
    /// `Nonoverlapping`.
    pub fn push_table_level0_nonoverlapping(levels: &mut Levels, sst: SstableInfo) {
        push_table_level0_overlapping(levels, sst);
        levels
            .l0
            .sub_levels
            .last_mut()
            .expect("a sub-level was just pushed")
            .level_type = LevelType::Nonoverlapping;
    }

    /// Appends all of `table_infos` to L0 as one `Nonoverlapping` sub-level whose id is
    /// the id of the first SST, bumping both L0 size aggregates.
    ///
    /// # Panics
    /// Panics if `table_infos` is empty.
    pub fn push_tables_level0_nonoverlapping(levels: &mut Levels, table_infos: Vec<SstableInfo>) {
        let sub_level_id = table_infos
            .first()
            .expect("push_tables_level0_nonoverlapping requires at least one sst")
            .sst_id
            .as_raw_id();
        let total_file_size = table_infos.iter().map(|table| table.sst_size).sum::<u64>();
        let uncompressed_file_size = table_infos
            .iter()
            .map(|table| table.uncompressed_file_size)
            .sum::<u64>();
        levels.l0.total_file_size += total_file_size;
        levels.l0.uncompressed_file_size += uncompressed_file_size;
        levels.l0.sub_levels.push(Level {
            level_idx: 0,
            level_type: LevelType::Nonoverlapping,
            total_file_size,
            sub_level_id,
            table_infos,
            uncompressed_file_size,
            ..Default::default()
        });
    }

    /// Like [`generate_table_impl`], but returns the result as an [`SstableInfo`].
    pub fn generate_table(
        id: u64,
        table_prefix: u64,
        left: usize,
        right: usize,
        epoch: u64,
    ) -> SstableInfo {
        generate_table_impl(id, table_prefix, left, right, epoch).into()
    }

    /// Builds an SST with object id and SST id `id`, holding keys `left..=right` of table
    /// `table_prefix` at `epoch` (right-inclusive key range).
    ///
    /// The file size, SST size, uncompressed size and key count are all the number of
    /// keys, `right - left + 1`. `table_prefix` also becomes the single table id
    /// (truncated to `u32`). Requires `left <= right`.
    pub fn generate_table_impl(
        id: u64,
        table_prefix: u64,
        left: usize,
        right: usize,
        epoch: u64,
    ) -> SstableInfoInner {
        let object_size = (right - left + 1) as u64;
        SstableInfoInner {
            object_id: id.into(),
            sst_id: id.into(),
            key_range: KeyRange {
                left: iterator_test_key_of_epoch(table_prefix, left, epoch).into(),
                right: iterator_test_key_of_epoch(table_prefix, right, epoch).into(),
                right_exclusive: false,
            },
            file_size: object_size,
            table_ids: vec![(table_prefix as u32).into()],
            uncompressed_file_size: object_size,
            total_key_count: object_size,
            sst_size: object_size,
            ..Default::default()
        }
    }

    /// Like [`generate_table`], but with explicit `table_ids` and `min_epoch`/`max_epoch`.
    /// `total_key_count` is left at its default.
    pub fn generate_table_with_ids_and_epochs(
        id: u64,
        table_prefix: u64,
        left: usize,
        right: usize,
        epoch: u64,
        table_ids: Vec<u32>,
        min_epoch: u64,
        max_epoch: u64,
    ) -> SstableInfo {
        let object_size = (right - left + 1) as u64;
        SstableInfoInner {
            object_id: id.into(),
            sst_id: id.into(),
            key_range: KeyRange {
                left: iterator_test_key_of_epoch(table_prefix, left, epoch).into(),
                right: iterator_test_key_of_epoch(table_prefix, right, epoch).into(),
                right_exclusive: false,
            },
            file_size: object_size,
            table_ids: table_ids.into_iter().map_into().collect(),
            uncompressed_file_size: object_size,
            min_epoch,
            max_epoch,
            sst_size: object_size,
            ..Default::default()
        }
        .into()
    }

    /// Builds one SST (table prefix 1) per id in `ids`. The key range `keys` is split into
    /// consecutive chunks of `(keys.end - keys.start) / ids.len()` keys each.
    ///
    /// Every SST gets `file_size` as both its file size and SST size. Returns an empty
    /// vector when `ids` is empty.
    pub fn generate_tables(
        ids: Range<u64>,
        keys: Range<usize>,
        epoch: u64,
        file_size: u64,
    ) -> Vec<SstableInfo> {
        if ids.is_empty() {
            // Nothing to generate; also avoids dividing by zero when computing `step`.
            return vec![];
        }
        let step = (keys.end - keys.start) / (ids.end - ids.start) as usize;
        ids.enumerate()
            .map(|(idx, id)| -> SstableInfo {
                let start = keys.start + idx * step;
                let mut table = generate_table_impl(id, 1, start, start + step - 1, epoch);
                table.file_size = file_size;
                table.sst_size = file_size;
                table.into()
            })
            .collect()
    }

    /// Builds a `Nonoverlapping` level `level_idx` with `sub_level_id` 0. Its size
    /// aggregates are summed from `table_infos`.
    pub fn generate_level(level_idx: u32, table_infos: Vec<SstableInfo>) -> Level {
        let total_file_size = table_infos.iter().map(|sst| sst.sst_size).sum::<u64>();
        let uncompressed_file_size = table_infos
            .iter()
            .map(|sst| sst.uncompressed_file_size)
            .sum::<u64>();
        Level {
            level_idx,
            level_type: LevelType::Nonoverlapping,
            table_infos,
            total_file_size,
            sub_level_id: 0,
            uncompressed_file_size,
            ..Default::default()
        }
    }

    /// Builds L0 with one single-SST `Nonoverlapping` sub-level per SST in `table_infos`.
    /// Sub-level ids are the SSTs' positions (0, 1, 2, ...).
    pub fn generate_l0_nonoverlapping_sublevels(table_infos: Vec<SstableInfo>) -> OverlappingLevel {
        let total_file_size = table_infos.iter().map(|table| table.sst_size).sum::<u64>();
        let uncompressed_file_size = table_infos
            .iter()
            .map(|table| table.uncompressed_file_size)
            .sum::<u64>();
        OverlappingLevel {
            sub_levels: table_infos
                .into_iter()
                .enumerate()
                .map(|(idx, table)| Level {
                    level_idx: 0,
                    level_type: LevelType::Nonoverlapping,
                    total_file_size: table.sst_size,
                    uncompressed_file_size: table.uncompressed_file_size,
                    sub_level_id: idx as u64,
                    table_infos: vec![table],
                    ..Default::default()
                })
                .collect_vec(),
            total_file_size,
            uncompressed_file_size,
        }
    }

    /// Builds L0 with one `Nonoverlapping` sub-level per inner vector of `table_infos`.
    /// Sub-level ids are the vectors' positions.
    pub fn generate_l0_nonoverlapping_multi_sublevels(
        table_infos: Vec<Vec<SstableInfo>>,
    ) -> OverlappingLevel {
        generate_l0_sublevels_of_type(table_infos, LevelType::Nonoverlapping)
    }

    /// Builds L0 with one `Overlapping` sub-level per inner vector of `table_infos`.
    /// Sub-level ids are the vectors' positions.
    pub fn generate_l0_overlapping_sublevels(
        table_infos: Vec<Vec<SstableInfo>>,
    ) -> OverlappingLevel {
        generate_l0_sublevels_of_type(table_infos, LevelType::Overlapping)
    }

    /// Shared body of the multi-sub-level L0 builders: one sub-level of `level_type` per
    /// inner vector, with per-sub-level and L0-wide size aggregates filled in.
    fn generate_l0_sublevels_of_type(
        table_infos: Vec<Vec<SstableInfo>>,
        level_type: LevelType,
    ) -> OverlappingLevel {
        let sub_levels = table_infos
            .into_iter()
            .enumerate()
            .map(|(idx, tables)| {
                // Sizes are computed before `tables` is moved into the level, so no clone
                // is needed.
                let total_file_size = tables.iter().map(|sst| sst.sst_size).sum::<u64>();
                let uncompressed_file_size = tables
                    .iter()
                    .map(|sst| sst.uncompressed_file_size)
                    .sum::<u64>();
                Level {
                    level_idx: 0,
                    level_type,
                    total_file_size,
                    uncompressed_file_size,
                    sub_level_id: idx as u64,
                    table_infos: tables,
                    ..Default::default()
                }
            })
            .collect_vec();
        let total_file_size = sub_levels.iter().map(|l| l.total_file_size).sum::<u64>();
        let uncompressed_file_size = sub_levels
            .iter()
            .map(|l| l.uncompressed_file_size)
            .sum::<u64>();
        OverlappingLevel {
            sub_levels,
            total_file_size,
            uncompressed_file_size,
        }
    }

    /// Asserts that every input SST of `compact_task` is marked pending-compact in the
    /// handler of its input level.
    pub fn assert_compaction_task(compact_task: &CompactionTask, level_handlers: &[LevelHandler]) {
        for input_level in &compact_task.input.input_levels {
            let handler = &level_handlers[input_level.level_idx as usize];
            for sst in &input_level.table_infos {
                assert!(handler.is_pending_compact(&sst.sst_id));
            }
        }
    }
}