risingwave_meta/hummock/compaction/picker/
tombstone_reclaim_compaction_picker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_hummock_sdk::level::{InputLevel, Levels};
18
19use crate::hummock::compaction::overlap_strategy::OverlapStrategy;
20use crate::hummock::compaction::picker::CompactionInput;
21use crate::hummock::level_handler::LevelHandler;
22
23pub struct TombstoneReclaimCompactionPicker {
24    overlap_strategy: Arc<dyn OverlapStrategy>,
25    delete_ratio: u64,
26    range_delete_ratio: u64,
27}
28
/// Cursor that `TombstoneReclaimCompactionPicker::pick_compaction` keeps
/// between calls so that scanning resumes where the previous call stopped.
#[derive(Default)]
pub struct TombstoneReclaimPickerState {
    /// Level index the next scan starts from.
    ///
    /// `0` (the default) means "start over": the picker treats it as level 1,
    /// and writes `0` back once every level has been scanned without a pick.
    pub last_level: usize,
}
33
34impl TombstoneReclaimCompactionPicker {
35    pub fn new(
36        overlap_strategy: Arc<dyn OverlapStrategy>,
37        delete_ratio: u64,
38        range_delete_ratio: u64,
39    ) -> Self {
40        Self {
41            overlap_strategy,
42            delete_ratio,
43            range_delete_ratio,
44        }
45    }
46
47    pub fn pick_compaction(
48        &self,
49        levels: &Levels,
50        level_handlers: &[LevelHandler],
51        state: &mut TombstoneReclaimPickerState,
52    ) -> Option<CompactionInput> {
53        assert!(!levels.levels.is_empty());
54        if state.last_level == 0 {
55            state.last_level = 1;
56        }
57
58        while state.last_level <= levels.levels.len() {
59            let mut select_input_ssts = vec![];
60            for sst in &levels.levels[state.last_level - 1].table_infos {
61                let need_reclaim = (sst.range_tombstone_count * 100
62                    >= sst.total_key_count * self.range_delete_ratio)
63                    || (sst.stale_key_count * 100 >= sst.total_key_count * self.delete_ratio);
64                if !need_reclaim || level_handlers[state.last_level].is_pending_compact(&sst.sst_id)
65                {
66                    continue;
67                }
68
69                select_input_ssts.push(sst.clone());
70                break;
71            }
72
73            // turn to next_round
74            if !select_input_ssts.is_empty() {
75                let target_level = if state.last_level
76                    == levels.levels.last().unwrap().level_idx as usize
77                {
78                    InputLevel {
79                        level_idx: state.last_level as u32,
80                        level_type: levels.levels[state.last_level - 1].level_type,
81                        table_infos: vec![],
82                    }
83                } else {
84                    let target_table_infos = self.overlap_strategy.check_base_level_overlap(
85                        &select_input_ssts,
86                        &levels.levels[state.last_level].table_infos,
87                    );
88                    let mut pending_compact = false;
89                    for sst in &target_table_infos {
90                        if level_handlers[state.last_level + 1].is_pending_compact(&sst.sst_id) {
91                            pending_compact = true;
92                            break;
93                        }
94                    }
95                    if pending_compact {
96                        state.last_level += 1;
97                        continue;
98                    }
99                    InputLevel {
100                        level_idx: (state.last_level + 1) as u32,
101                        level_type: levels.levels[state.last_level].level_type,
102                        table_infos: target_table_infos,
103                    }
104                };
105                return Some(CompactionInput {
106                    select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
107                    target_input_size: target_level
108                        .table_infos
109                        .iter()
110                        .map(|sst| sst.sst_size)
111                        .sum(),
112                    total_file_count: (select_input_ssts.len() + target_level.table_infos.len())
113                        as u64,
114                    target_level: target_level.level_idx as usize,
115                    input_levels: vec![
116                        InputLevel {
117                            level_idx: state.last_level as u32,
118                            level_type: levels.levels[state.last_level - 1].level_type,
119                            table_infos: select_input_ssts,
120                        },
121                        target_level,
122                    ],
123                    ..Default::default()
124                });
125            }
126            state.last_level += 1;
127        }
128        state.last_level = 0;
129        None
130    }
131}
132
#[cfg(test)]
pub mod tests {
    use super::*;
    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::compaction::create_overlap_strategy;
    use crate::hummock::compaction::selector::tests::{
        generate_level, generate_table, generate_table_impl,
    };

    /// Walks the picker through three situations: nothing to pick, a pick from
    /// the last level, and a pick from the first level with an overlapping
    /// target SST.
    #[test]
    fn test_basic() {
        let second_level_tables = vec![
            generate_table(1, 1, 1, 100, 1),
            generate_table(2, 1, 101, 200, 1),
        ];
        let mut levels = Levels {
            levels: vec![
                generate_level(1, vec![]),
                generate_level(2, second_level_tables),
            ],
            ..Default::default()
        };
        let levels_handler = vec![
            LevelHandler::new(0),
            LevelHandler::new(1),
            LevelHandler::new(2),
        ];
        let config = Arc::new(CompactionConfigBuilder::new().build());
        let strategy = create_overlap_strategy(config.compaction_mode());

        // Phase 1: with the freshly generated tables nothing is picked.
        let mut state = TombstoneReclaimPickerState::default();
        let picker = TombstoneReclaimCompactionPicker::new(strategy.clone(), 40, 20);
        assert!(picker
            .pick_compaction(&levels, &levels_handler, &mut state)
            .is_none());

        // Phase 2: an SST in the second level with 40 stale keys out of 100
        // meets the delete ratio of 40 and is selected.
        let mut stale_sst = generate_table_impl(3, 1, 201, 300, 1);
        stale_sst.stale_key_count = 40;
        stale_sst.total_key_count = 100;
        levels.levels[1].table_infos.push(stale_sst.into());

        let picked = picker
            .pick_compaction(&levels, &levels_handler, &mut state)
            .unwrap();
        assert_eq!(2, picked.input_levels.len());
        assert_eq!(3, picked.input_levels[0].table_infos[0].sst_id);

        // Phase 3: an SST in the first level with 30 range tombstones out of
        // 100 meets the range-delete ratio of 10 (its 30 stale keys stay below
        // the delete ratio of 50). A fresh state restarts the scan, and the
        // target level carries the SST with id 1.
        let mut range_sst = generate_table_impl(4, 1, 1, 100, 1);
        range_sst.stale_key_count = 30;
        range_sst.range_tombstone_count = 30;
        range_sst.total_key_count = 100;
        levels.levels[0].table_infos.push(range_sst.into());

        let picker = TombstoneReclaimCompactionPicker::new(strategy, 50, 10);
        let mut state = TombstoneReclaimPickerState::default();
        let picked = picker
            .pick_compaction(&levels, &levels_handler, &mut state)
            .unwrap();
        assert_eq!(2, picked.input_levels.len());
        assert_eq!(4, picked.input_levels[0].table_infos[0].sst_id);
        assert_eq!(1, picked.input_levels[1].table_infos[0].sst_id);
    }
}