risingwave_meta/hummock/compaction/picker/
tier_compaction_picker.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_hummock_sdk::level::{InputLevel, Levels, OverlappingLevel};
18use risingwave_pb::hummock::{CompactionConfig, LevelType};
19
20use super::{
21    CompactionInput, CompactionPicker, CompactionTaskValidator, LocalPickerStatistic,
22    ValidationRuleType,
23};
24use crate::hummock::level_handler::LevelHandler;
25
26pub struct TierCompactionPicker {
27    config: Arc<CompactionConfig>,
28    compaction_task_validator: Arc<CompactionTaskValidator>,
29}
30
31impl TierCompactionPicker {
32    #[cfg(test)]
33    pub fn new(config: Arc<CompactionConfig>) -> TierCompactionPicker {
34        TierCompactionPicker {
35            compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())),
36            config,
37        }
38    }
39
40    pub fn new_with_validator(
41        config: Arc<CompactionConfig>,
42        compaction_task_validator: Arc<CompactionTaskValidator>,
43    ) -> TierCompactionPicker {
44        TierCompactionPicker {
45            config,
46            compaction_task_validator,
47        }
48    }
49
50    fn pick_overlapping_level(
51        &self,
52        l0: &OverlappingLevel,
53        level_handler: &LevelHandler,
54        mut vnode_partition_count: u32,
55        stats: &mut LocalPickerStatistic,
56    ) -> Option<CompactionInput> {
57        for (idx, level) in l0.sub_levels.iter().enumerate() {
58            if level.level_type != LevelType::Overlapping {
59                continue;
60            }
61
62            if level.table_infos.is_empty() {
63                continue;
64            }
65
66            if level_handler.is_level_pending_compact(level) {
67                continue;
68            }
69
70            let input_level = InputLevel {
71                level_idx: 0,
72                level_type: level.level_type,
73                table_infos: level.table_infos.clone(),
74            };
75
76            let mut select_level_inputs = vec![input_level];
77
78            // We assume that the maximum size of each sub_level is sub_level_max_compaction_bytes,
79            // so the design here wants to merge multiple overlapping-levels in one compaction
80            let max_compaction_bytes = std::cmp::min(
81                self.config.max_compaction_bytes,
82                self.config.sub_level_max_compaction_bytes
83                    * self.config.level0_overlapping_sub_level_compact_level_count as u64,
84            );
85
86            let mut compaction_bytes = level.total_file_size;
87            let mut compact_file_count = level.table_infos.len() as u64;
88            // Limit sstable file count to avoid using too much memory.
89            let overlapping_max_compact_file_numer = self.config.level0_max_compact_file_number;
90
91            for other in &l0.sub_levels[idx + 1..] {
92                if compaction_bytes > max_compaction_bytes {
93                    break;
94                }
95
96                if compact_file_count > overlapping_max_compact_file_numer {
97                    break;
98                }
99
100                if level_handler.is_level_pending_compact(other) {
101                    break;
102                }
103
104                compaction_bytes += other.total_file_size;
105                compact_file_count += other.table_infos.len() as u64;
106                select_level_inputs.push(InputLevel {
107                    level_idx: 0,
108                    level_type: other.level_type,
109                    table_infos: other.table_infos.clone(),
110                });
111            }
112
113            select_level_inputs.reverse();
114            if compaction_bytes < self.config.sub_level_max_compaction_bytes / 2 {
115                vnode_partition_count = 0;
116            }
117
118            let result = CompactionInput {
119                input_levels: select_level_inputs,
120                target_level: 0,
121                target_sub_level_id: level.sub_level_id,
122                select_input_size: compaction_bytes,
123                target_input_size: 0,
124                total_file_count: compact_file_count,
125                vnode_partition_count,
126            };
127
128            if !self.compaction_task_validator.valid_compact_task(
129                &result,
130                ValidationRuleType::Tier,
131                stats,
132            ) {
133                continue;
134            }
135
136            return Some(result);
137        }
138        None
139    }
140}
141
142impl CompactionPicker for TierCompactionPicker {
143    fn pick_compaction(
144        &mut self,
145        levels: &Levels,
146        level_handlers: &[LevelHandler],
147        stats: &mut LocalPickerStatistic,
148    ) -> Option<CompactionInput> {
149        let l0 = &levels.l0;
150        if l0.sub_levels.is_empty() {
151            return None;
152        }
153
154        self.pick_overlapping_level(
155            l0,
156            &level_handlers[0],
157            self.config.split_weight_by_vnode,
158            stats,
159        )
160    }
161}
162
#[cfg(test)]
pub mod tests {
    use std::sync::Arc;

    use risingwave_hummock_sdk::compaction_group::hummock_version_ext::new_sub_level;
    use risingwave_hummock_sdk::level::{Levels, OverlappingLevel};
    use risingwave_pb::hummock::LevelType;

    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::compaction::picker::{
        CompactionPicker, LocalPickerStatistic, TierCompactionPicker,
    };
    use crate::hummock::compaction::selector::tests::{
        generate_l0_overlapping_sublevels, generate_table, push_table_level0_overlapping,
    };
    use crate::hummock::level_handler::LevelHandler;

    /// Wraps an L0 into a `Levels` value that has no other levels.
    fn levels_with_l0(l0: OverlappingLevel) -> Levels {
        Levels {
            l0,
            levels: vec![],
            ..Default::default()
        }
    }

    /// Four sub-levels holding seven tables are all merged into one task, and
    /// an empty L0 yields no task.
    #[test]
    fn test_pick_whole_level_basic() {
        let levels = levels_with_l0(generate_l0_overlapping_sublevels(vec![
            vec![
                generate_table(1, 1, 100, 200, 1),
                generate_table(2, 1, 150, 250, 1),
            ],
            vec![generate_table(3, 1, 10, 90, 1)],
            vec![
                generate_table(4, 1, 100, 200, 1),
                generate_table(5, 1, 50, 150, 1),
            ],
            vec![
                generate_table(6, 1, 100, 200, 1),
                generate_table(7, 1, 50, 150, 1),
            ],
        ]));
        let handlers = vec![LevelHandler::new(0)];
        let config = CompactionConfigBuilder::new()
            .level0_tier_compact_file_number(2)
            .level0_sub_level_compact_level_count(2)
            .level0_overlapping_sub_level_compact_level_count(4)
            .build();
        let mut picker = TierCompactionPicker::new(Arc::new(config));
        let mut picker_stats = LocalPickerStatistic::default();

        let picked = picker
            .pick_compaction(&levels, &handlers, &mut picker_stats)
            .unwrap();
        assert_eq!(picked.input_levels.len(), 4);
        let picked_table_count: usize = picked
            .input_levels
            .iter()
            .map(|input| input.table_infos.len())
            .sum();
        assert_eq!(picked_table_count, 7);

        let no_sub_levels = levels_with_l0(generate_l0_overlapping_sublevels(vec![]));
        let picked = picker.pick_compaction(&no_sub_levels, &handlers, &mut picker_stats);
        assert!(picked.is_none());
    }

    /// With a small `sub_level_max_compaction_bytes` no task is produced.
    #[test]
    fn test_pick_whole_level_skip_sublevel() {
        let levels = levels_with_l0(generate_l0_overlapping_sublevels(vec![
            vec![
                generate_table(4, 1, 10, 90, 1),
                generate_table(5, 1, 200, 220, 1),
            ],
            vec![generate_table(6, 1, 1, 100, 1)],
            vec![generate_table(7, 1, 1, 100, 1)],
        ]));
        let handlers = vec![LevelHandler::new(0), LevelHandler::new(1)];
        let config = CompactionConfigBuilder::new()
            .level0_tier_compact_file_number(2)
            .sub_level_max_compaction_bytes(500)
            .max_compaction_bytes(500000)
            .level0_sub_level_compact_level_count(2)
            .level0_overlapping_sub_level_compact_level_count(4)
            .build();
        let mut picker_stats = LocalPickerStatistic::default();

        // NOTE(review): presumably every candidate is rejected by the task validator
        // under this configuration; confirm against `CompactionTaskValidator`.
        let mut picker = TierCompactionPicker::new(Arc::new(config));
        let picked = picker.pick_compaction(&levels, &handlers, &mut picker_stats);
        assert!(picked.is_none());
    }

    /// An L0 made only of non-overlapping sub-levels yields no tier task.
    #[test]
    fn test_write_amp_bug_skip() {
        let first = new_sub_level(
            1,
            LevelType::Nonoverlapping,
            vec![
                generate_table(3, 1, 1, 50, 1),
                generate_table(4, 1, 51, 100, 1),
            ],
        );
        let second = new_sub_level(
            2,
            LevelType::Nonoverlapping,
            vec![
                generate_table(3, 1, 1, 50, 1),
                generate_table(4, 1, 51, 200, 1),
            ],
        );
        let combined_size = first.total_file_size + second.total_file_size;
        let levels = Levels {
            l0: OverlappingLevel {
                total_file_size: combined_size,
                uncompressed_file_size: combined_size,
                sub_levels: vec![first, second],
            },
            levels: vec![],
            ..Default::default()
        };
        let config = CompactionConfigBuilder::new()
            .level0_tier_compact_file_number(4)
            .sub_level_max_compaction_bytes(100)
            .max_compaction_bytes(500000)
            .build();
        let handlers = vec![LevelHandler::new(0), LevelHandler::new(1)];
        let mut picker_stats = LocalPickerStatistic::default();
        let mut picker = TierCompactionPicker::new(Arc::new(config));

        let picked = picker.pick_compaction(&levels, &handlers, &mut picker_stats);
        assert!(picked.is_none());
    }

    /// A first sub-level that already holds more files than
    /// `level0_max_compact_file_number` is picked alone, even after another
    /// table is pushed into L0.
    #[test]
    fn test_pick_overlapping_sublevel_more_than_max_compact_file_number() {
        let mut levels = levels_with_l0(generate_l0_overlapping_sublevels(vec![vec![
            generate_table(4, 1, 10, 90, 1),
            generate_table(5, 1, 200, 220, 1),
            generate_table(6, 1, 1, 100, 1),
            generate_table(7, 1, 1, 100, 1),
            generate_table(8, 1, 1, 100, 1),
            generate_table(9, 1, 1, 100, 1),
            generate_table(10, 1, 1, 100, 1),
        ]]));
        let handlers = vec![LevelHandler::new(0), LevelHandler::new(1)];
        let config = CompactionConfigBuilder::new()
            .level0_tier_compact_file_number(2)
            .sub_level_max_compaction_bytes(100)
            .max_compaction_bytes(500000)
            .level0_sub_level_compact_level_count(2)
            .level0_max_compact_file_number(3)
            .build();
        let mut picker_stats = LocalPickerStatistic::default();
        let mut picker = TierCompactionPicker::new(Arc::new(config));

        let picked = picker
            .pick_compaction(&levels, &handlers, &mut picker_stats)
            .unwrap();
        assert_eq!(1, picked.input_levels.len());

        push_table_level0_overlapping(&mut levels, generate_table(11, 1, 1, 100, 1));
        let picked = picker
            .pick_compaction(&levels, &handlers, &mut picker_stats)
            .unwrap();
        assert_eq!(1, picked.input_levels.len());
    }
}