risingwave_meta/hummock/compaction/selector/mod.rs

//  Copyright 2025 RisingWave Labs
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
//
// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

20mod emergency_selector;
21pub(crate) mod level_selector;
22mod manual_selector;
23mod space_reclaim_selector;
24mod tombstone_compaction_selector;
25mod ttl_selector;
26mod vnode_watermark_selector;
27
28use std::collections::{BTreeSet, HashMap};
29use std::sync::Arc;
30
31pub use emergency_selector::EmergencySelector;
32pub use level_selector::{DynamicLevelSelector, DynamicLevelSelectorCore};
33pub use manual_selector::{ManualCompactionOption, ManualCompactionSelector};
34use risingwave_common::catalog::{TableId, TableOption};
35use risingwave_hummock_sdk::HummockCompactionTaskId;
36use risingwave_hummock_sdk::level::Levels;
37use risingwave_hummock_sdk::table_watermark::TableWatermarks;
38use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
39use risingwave_pb::hummock::compact_task;
40pub use space_reclaim_selector::SpaceReclaimCompactionSelector;
41pub use tombstone_compaction_selector::TombstoneCompactionSelector;
42pub use ttl_selector::TtlCompactionSelector;
43pub use vnode_watermark_selector::VnodeWatermarkCompactionSelector;
44
45use super::picker::LocalPickerStatistic;
46use super::{
47    CompactionDeveloperConfig, LevelCompactionPicker, TierCompactionPicker, create_compaction_task,
48};
49use crate::hummock::compaction::CompactionTask;
50use crate::hummock::level_handler::LevelHandler;
51use crate::hummock::model::CompactionGroup;
52use crate::rpc::metrics::MetaMetrics;
53
54pub struct CompactionSelectorContext<'a> {
55    pub group: &'a CompactionGroup,
56    pub levels: &'a Levels,
57    pub member_table_ids: &'a BTreeSet<TableId>,
58    pub level_handlers: &'a mut [LevelHandler],
59    pub selector_stats: &'a mut LocalSelectorStatistic,
60    pub table_id_to_options: &'a HashMap<u32, TableOption>,
61    pub developer_config: Arc<CompactionDeveloperConfig>,
62    pub table_watermarks: &'a HashMap<TableId, Arc<TableWatermarks>>,
63    pub state_table_info: &'a HummockVersionStateTableInfo,
64}
65
66pub trait CompactionSelector: Sync + Send {
67    fn pick_compaction(
68        &mut self,
69        task_id: HummockCompactionTaskId,
70        context: CompactionSelectorContext<'_>,
71    ) -> Option<CompactionTask>;
72
73    fn report_statistic_metrics(&self, _metrics: &MetaMetrics) {}
74
75    fn name(&self) -> &'static str;
76
77    fn task_type(&self) -> compact_task::TaskType;
78}
79
80pub fn default_compaction_selector() -> Box<dyn CompactionSelector> {
81    Box::<DynamicLevelSelector>::default()
82}
83
84#[derive(Default)]
85pub struct LocalSelectorStatistic {
86    skip_picker: Vec<(usize, usize, LocalPickerStatistic)>,
87}
88
89impl LocalSelectorStatistic {
90    pub fn report_to_metrics(&self, group_id: u64, metrics: &MetaMetrics) {
91        for (start_level, target_level, stats) in &self.skip_picker {
92            let level_label = format!("cg{}-{}-to-{}", group_id, start_level, target_level);
93            if stats.skip_by_write_amp_limit > 0 {
94                metrics
95                    .compact_skip_frequency
96                    .with_label_values(&[level_label.as_str(), "write-amp"])
97                    .inc();
98            }
99            if stats.skip_by_count_limit > 0 {
100                metrics
101                    .compact_skip_frequency
102                    .with_label_values(&[level_label.as_str(), "count"])
103                    .inc();
104            }
105            if stats.skip_by_pending_files > 0 {
106                metrics
107                    .compact_skip_frequency
108                    .with_label_values(&[level_label.as_str(), "pending-files"])
109                    .inc();
110            }
111            if stats.skip_by_overlapping > 0 {
112                metrics
113                    .compact_skip_frequency
114                    .with_label_values(&[level_label.as_str(), "overlapping"])
115                    .inc();
116            }
117            metrics
118                .compact_skip_frequency
119                .with_label_values(&[level_label.as_str(), "picker"])
120                .inc();
121        }
122    }
123}
124
#[cfg(test)]
pub mod tests {
    //! Fixture builders for hummock levels and SSTs used by compaction tests.

    use std::ops::Range;

    use itertools::Itertools;
    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::level::{Level, OverlappingLevel};
    use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
    use risingwave_pb::hummock::LevelType;

    use super::*;
    use crate::hummock::test_utils::iterator_test_key_of_epoch;

    /// Appends `sst` to L0 as a new single-SST `Overlapping` sub-level whose `sub_level_id` is
    /// the SST id, and adds its `sst_size` to `levels.l0.total_file_size`.
    ///
    /// NOTE(review): `levels.l0.uncompressed_file_size` is not updated here (unlike the
    /// `generate_l0_*` builders) — confirm no test depends on that before changing it.
    pub fn push_table_level0_overlapping(levels: &mut Levels, sst: SstableInfo) {
        levels.l0.total_file_size += sst.sst_size;
        levels.l0.sub_levels.push(Level {
            level_idx: 0,
            level_type: LevelType::Overlapping,
            total_file_size: sst.sst_size,
            uncompressed_file_size: sst.uncompressed_file_size,
            sub_level_id: sst.sst_id,
            table_infos: vec![sst],
            ..Default::default()
        });
    }

    /// Same as [`push_table_level0_overlapping`], but the new sub-level is `Nonoverlapping`.
    pub fn push_table_level0_nonoverlapping(levels: &mut Levels, sst: SstableInfo) {
        push_table_level0_overlapping(levels, sst);
        levels.l0.sub_levels.last_mut().unwrap().level_type = LevelType::Nonoverlapping;
    }

    /// Appends all of `table_infos` to L0 as a single `Nonoverlapping` sub-level whose
    /// `sub_level_id` is the id of the first SST, and adds their total `sst_size` to
    /// `levels.l0.total_file_size` (`levels.l0.uncompressed_file_size` is not updated).
    ///
    /// # Panics
    /// Panics if `table_infos` is empty.
    pub fn push_tables_level0_nonoverlapping(levels: &mut Levels, table_infos: Vec<SstableInfo>) {
        let sub_level_id = table_infos
            .first()
            .expect("push_tables_level0_nonoverlapping requires at least one SST")
            .sst_id;
        let total_file_size = table_infos.iter().map(|sst| sst.sst_size).sum::<u64>();
        let uncompressed_file_size = table_infos
            .iter()
            .map(|sst| sst.uncompressed_file_size)
            .sum::<u64>();
        levels.l0.total_file_size += total_file_size;
        levels.l0.sub_levels.push(Level {
            level_idx: 0,
            level_type: LevelType::Nonoverlapping,
            total_file_size,
            sub_level_id,
            table_infos,
            uncompressed_file_size,
            ..Default::default()
        });
    }

    /// [`generate_table_impl`] converted into an [`SstableInfo`].
    pub fn generate_table(
        id: u64,
        table_prefix: u64,
        left: usize,
        right: usize,
        epoch: u64,
    ) -> SstableInfo {
        generate_table_impl(id, table_prefix, left, right, epoch).into()
    }

    /// Builds an SST with object/SST id `id` for table `table_prefix`, covering the inclusive
    /// key range `left..=right` at `epoch`.
    ///
    /// File size, SST size, uncompressed size and key count all equal the number of keys,
    /// `right - left + 1`; `right` must be `>= left`.
    pub fn generate_table_impl(
        id: u64,
        table_prefix: u64,
        left: usize,
        right: usize,
        epoch: u64,
    ) -> SstableInfoInner {
        let object_size = (right - left + 1) as u64;
        SstableInfoInner {
            object_id: id,
            sst_id: id,
            key_range: KeyRange {
                left: iterator_test_key_of_epoch(table_prefix, left, epoch).into(),
                right: iterator_test_key_of_epoch(table_prefix, right, epoch).into(),
                right_exclusive: false,
            },
            file_size: object_size,
            table_ids: vec![table_prefix as u32],
            uncompressed_file_size: object_size,
            total_key_count: object_size,
            sst_size: object_size,
            ..Default::default()
        }
    }

    /// Like [`generate_table`], but with explicit `table_ids` and `min_epoch`/`max_epoch`.
    ///
    /// Unlike [`generate_table_impl`], `total_key_count` is left at its `Default` value.
    #[allow(clippy::too_many_arguments)] // flat fixture builder; every argument is a field
    pub fn generate_table_with_ids_and_epochs(
        id: u64,
        table_prefix: u64,
        left: usize,
        right: usize,
        epoch: u64,
        table_ids: Vec<u32>,
        min_epoch: u64,
        max_epoch: u64,
    ) -> SstableInfo {
        let object_size = (right - left + 1) as u64;
        SstableInfoInner {
            object_id: id,
            sst_id: id,
            key_range: KeyRange {
                left: iterator_test_key_of_epoch(table_prefix, left, epoch).into(),
                right: iterator_test_key_of_epoch(table_prefix, right, epoch).into(),
                right_exclusive: false,
            },
            file_size: object_size,
            table_ids,
            uncompressed_file_size: object_size,
            min_epoch,
            max_epoch,
            sst_size: object_size,
            ..Default::default()
        }
        .into()
    }

    /// Builds one SST per id in `ids` for table `1`, splitting `keys` into consecutive chunks
    /// of equal size (any remainder of an uneven split is left uncovered), with `file_size` and
    /// `sst_size` overridden by `file_size`.
    ///
    /// Returns an empty vector when `ids` is empty.
    ///
    /// # Panics
    /// Panics if `keys` contains fewer keys than `ids` contains ids, since every SST needs at
    /// least one key.
    pub fn generate_tables(
        ids: Range<u64>,
        keys: Range<usize>,
        epoch: u64,
        file_size: u64,
    ) -> Vec<SstableInfo> {
        // Avoid dividing by zero below.
        if ids.is_empty() {
            return vec![];
        }
        let step = (keys.end - keys.start) / (ids.end - ids.start) as usize;
        assert!(
            step > 0,
            "generate_tables: key range {keys:?} is too small for ids {ids:?}"
        );
        let mut start = keys.start;
        let mut tables = vec![];
        for id in ids {
            let mut table = generate_table_impl(id, 1, start, start + step - 1, epoch);
            table.file_size = file_size;
            table.sst_size = file_size;
            tables.push(table.into());
            start += step;
        }
        tables
    }

    /// Builds a `Nonoverlapping` level `level_idx` holding `table_infos`, with sizes summed
    /// from the SSTs and `sub_level_id` 0.
    pub fn generate_level(level_idx: u32, table_infos: Vec<SstableInfo>) -> Level {
        let total_file_size = table_infos.iter().map(|sst| sst.sst_size).sum();
        let uncompressed_file_size = table_infos
            .iter()
            .map(|sst| sst.uncompressed_file_size)
            .sum();
        Level {
            level_idx,
            level_type: LevelType::Nonoverlapping,
            table_infos,
            total_file_size,
            sub_level_id: 0,
            uncompressed_file_size,
            ..Default::default()
        }
    }

    /// Builds an L0 with one sub-level of type `level_type` per group in `table_infos`.
    ///
    /// Sub-level ids are the group indices; the L0 totals are the sums of the sub-level totals.
    fn generate_l0_sublevels(
        table_infos: Vec<Vec<SstableInfo>>,
        level_type: LevelType,
    ) -> OverlappingLevel {
        let sub_levels = table_infos
            .into_iter()
            .enumerate()
            .map(|(idx, tables)| {
                // Aggregate before `tables` is moved into the level, so no clone is needed.
                let total_file_size = tables.iter().map(|sst| sst.sst_size).sum::<u64>();
                let uncompressed_file_size = tables
                    .iter()
                    .map(|sst| sst.uncompressed_file_size)
                    .sum::<u64>();
                Level {
                    level_idx: 0,
                    level_type,
                    total_file_size,
                    uncompressed_file_size,
                    sub_level_id: idx as u64,
                    table_infos: tables,
                    ..Default::default()
                }
            })
            .collect_vec();
        let total_file_size = sub_levels.iter().map(|l| l.total_file_size).sum::<u64>();
        let uncompressed_file_size = sub_levels
            .iter()
            .map(|l| l.uncompressed_file_size)
            .sum::<u64>();
        OverlappingLevel {
            sub_levels,
            total_file_size,
            uncompressed_file_size,
        }
    }

    /// Returns a `OverlappingLevel`, with each `table_infos`'s element placed in its own
    /// nonoverlapping sub-level.
    pub fn generate_l0_nonoverlapping_sublevels(table_infos: Vec<SstableInfo>) -> OverlappingLevel {
        let groups = table_infos.into_iter().map(|sst| vec![sst]).collect_vec();
        generate_l0_sublevels(groups, LevelType::Nonoverlapping)
    }

    /// Returns a `OverlappingLevel`, with each group of `table_infos` placed in its own
    /// nonoverlapping sub-level.
    pub fn generate_l0_nonoverlapping_multi_sublevels(
        table_infos: Vec<Vec<SstableInfo>>,
    ) -> OverlappingLevel {
        generate_l0_sublevels(table_infos, LevelType::Nonoverlapping)
    }

    /// Returns a `OverlappingLevel`, with each group of `table_infos` placed in its own
    /// overlapping sub-level.
    pub fn generate_l0_overlapping_sublevels(
        table_infos: Vec<Vec<SstableInfo>>,
    ) -> OverlappingLevel {
        generate_l0_sublevels(table_infos, LevelType::Overlapping)
    }

    /// Asserts that every input SST of `compact_task` is marked pending-compact in the level
    /// handler of its input level (`level_handlers` is indexed by level index).
    pub fn assert_compaction_task(compact_task: &CompactionTask, level_handlers: &[LevelHandler]) {
        for input_level in &compact_task.input.input_levels {
            let handler = &level_handlers[input_level.level_idx as usize];
            for sst in &input_level.table_infos {
                assert!(handler.is_pending_compact(&sst.sst_id));
            }
        }
    }
}