Skip to main content

risingwave_meta/hummock/compaction/picker/
min_overlap_compaction_picker.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_hummock_sdk::level::{InputLevel, Levels};
18use risingwave_hummock_sdk::sstable_info::SstableInfo;
19use risingwave_pb::hummock::LevelType;
20
21use super::{CompactionInput, CompactionPicker, LocalPickerStatistic};
22use crate::hummock::compaction::overlap_strategy::OverlapStrategy;
23use crate::hummock::level_handler::LevelHandler;
24
25pub struct MinOverlappingPicker {
26    level: usize,
27    target_level: usize,
28    max_select_bytes: u64,
29    vnode_partition_count: u32,
30    overlap_strategy: Arc<dyn OverlapStrategy>,
31}
32
33impl MinOverlappingPicker {
34    pub fn new(
35        level: usize,
36        target_level: usize,
37        max_select_bytes: u64,
38        vnode_partition_count: u32,
39        overlap_strategy: Arc<dyn OverlapStrategy>,
40    ) -> MinOverlappingPicker {
41        MinOverlappingPicker {
42            level,
43            target_level,
44            max_select_bytes,
45            vnode_partition_count,
46            overlap_strategy,
47        }
48    }
49
50    pub fn pick_tables(
51        &self,
52        select_tables: &[SstableInfo],
53        target_tables: &[SstableInfo],
54        level_handlers: &[LevelHandler],
55    ) -> (Vec<SstableInfo>, Vec<SstableInfo>) {
56        let mut select_file_ranges = vec![];
57        for (idx, sst) in select_tables.iter().enumerate() {
58            if level_handlers[self.level].is_pending_compact(&sst.sst_id) {
59                continue;
60            }
61            let mut overlap_info = self.overlap_strategy.create_overlap_info();
62            overlap_info.update(&sst.key_range);
63            let overlap_files_range = overlap_info.check_multiple_overlap(target_tables);
64
65            if overlap_files_range.is_empty() {
66                return (vec![sst.clone()], vec![]);
67            }
68            select_file_ranges.push((idx, overlap_files_range));
69        }
70        select_file_ranges.retain(|(_, range)| {
71            let mut pending_compact = false;
72            for other in &target_tables[range.clone()] {
73                if level_handlers[self.target_level].is_pending_compact(&other.sst_id) {
74                    pending_compact = true;
75                    break;
76                }
77            }
78            !pending_compact
79        });
80
81        let mut min_score = u64::MAX;
82        let mut min_score_select_range = 0..0;
83        let mut min_score_target_range = 0..0;
84        let mut min_score_select_file_size = 0;
85        for left in 0..select_file_ranges.len() {
86            let mut select_file_size = 0;
87            let mut target_level_overlap_range = select_file_ranges[left].1.clone();
88            let mut total_file_size = 0;
89            for other in &target_tables[target_level_overlap_range.clone()] {
90                total_file_size += other.sst_size;
91            }
92            let start_idx = select_file_ranges[left].0;
93            let mut end_idx = start_idx + 1;
94            for (idx, range) in select_file_ranges.iter().skip(left) {
95                if select_file_size > self.max_select_bytes
96                    || *idx > end_idx
97                    || range.start >= target_level_overlap_range.end
98                {
99                    break;
100                }
101                select_file_size += select_tables[*idx].sst_size;
102                if range.end > target_level_overlap_range.end {
103                    for other in &target_tables[target_level_overlap_range.end..range.end] {
104                        total_file_size += other.sst_size;
105                    }
106                    target_level_overlap_range.end = range.end;
107                }
108                let score = (total_file_size * 100)
109                    .checked_div(select_file_size)
110                    .unwrap_or(total_file_size);
111                end_idx = idx + 1;
112                if score < min_score
113                    || (score == min_score && select_file_size < min_score_select_file_size)
114                {
115                    min_score = score;
116                    min_score_select_range = start_idx..end_idx;
117                    min_score_target_range = target_level_overlap_range.clone();
118                    min_score_select_file_size = select_file_size;
119                }
120            }
121        }
122        if min_score == u64::MAX {
123            return (vec![], vec![]);
124        }
125        let select_input_ssts = select_tables[min_score_select_range].to_vec();
126        let target_input_ssts = target_tables[min_score_target_range].to_vec();
127        (select_input_ssts, target_input_ssts)
128    }
129}
130
131impl CompactionPicker for MinOverlappingPicker {
132    fn pick_compaction(
133        &mut self,
134        levels: &Levels,
135        level_handlers: &[LevelHandler],
136        stats: &mut LocalPickerStatistic,
137    ) -> Option<CompactionInput> {
138        assert!(self.level > 0);
139        let (select_input_ssts, target_input_ssts) = self.pick_tables(
140            &levels.get_level(self.level).table_infos,
141            &levels.get_level(self.target_level).table_infos,
142            level_handlers,
143        );
144        if select_input_ssts.is_empty() {
145            stats.skip_by_pending_files += 1;
146            return None;
147        }
148        Some(CompactionInput {
149            select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
150            target_input_size: target_input_ssts.iter().map(|sst| sst.sst_size).sum(),
151            total_file_count: (select_input_ssts.len() + target_input_ssts.len()) as u64,
152            input_levels: vec![
153                InputLevel {
154                    level_idx: self.level as u32,
155                    level_type: LevelType::Nonoverlapping,
156                    table_infos: select_input_ssts,
157                },
158                InputLevel {
159                    level_idx: self.target_level as u32,
160                    level_type: LevelType::Nonoverlapping,
161                    table_infos: target_input_ssts,
162                },
163            ],
164            target_level: self.target_level,
165            vnode_partition_count: self.vnode_partition_count,
166            ..Default::default()
167        })
168    }
169}
170
#[cfg(test)]
pub mod tests {
    use risingwave_hummock_sdk::level::Level;

    use super::*;
    use crate::hummock::compaction::overlap_strategy::RangeOverlapStrategy;
    use crate::hummock::compaction::selector::tests::{
        generate_l0_nonoverlapping_sublevels, generate_table,
    };

    /// Three successive picks from L1 into L2, registering the first two as pending tasks.
    #[test]
    fn test_compact_l1() {
        let mut picker =
            MinOverlappingPicker::new(1, 2, 10000, 0, Arc::new(RangeOverlapStrategy::default()));

        let l1 = Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                generate_table(0, 1, 0, 100, 1),
                generate_table(1, 1, 101, 200, 1),
                generate_table(2, 1, 222, 300, 1),
            ],
            ..Default::default()
        };
        let l2 = Level {
            level_idx: 2,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                generate_table(4, 1, 0, 100, 1),
                generate_table(5, 1, 101, 150, 1),
                generate_table(6, 1, 151, 201, 1),
                generate_table(7, 1, 501, 800, 1),
                generate_table(8, 2, 301, 400, 1),
            ],
            ..Default::default()
        };
        let levels = Levels {
            levels: vec![l1, l2],
            l0: generate_l0_nonoverlapping_sublevels(vec![]),
            ..Default::default()
        };
        let mut level_handlers = vec![
            LevelHandler::new(0),
            LevelHandler::new(1),
            LevelHandler::new(2),
        ];
        let mut local_stats = LocalPickerStatistic::default();

        // First pick: SST 2 is returned alone with no target files.
        let first = picker
            .pick_compaction(&levels, &level_handlers, &mut local_stats)
            .unwrap();
        assert_eq!(first.input_levels[0].level_idx, 1);
        assert_eq!(first.target_level, 2);
        assert_eq!(first.input_levels[0].table_infos.len(), 1);
        assert_eq!(first.input_levels[0].table_infos[0].sst_id, 2);
        assert_eq!(first.input_levels[1].table_infos.len(), 0);
        first.add_pending_task(0, &mut level_handlers);

        // Second pick: SST 0 paired with target SST 4.
        let second = picker
            .pick_compaction(&levels, &level_handlers, &mut local_stats)
            .unwrap();
        assert_eq!(second.input_levels[0].level_idx, 1);
        assert_eq!(second.target_level, 2);
        assert_eq!(second.input_levels[0].table_infos.len(), 1);
        assert_eq!(second.input_levels[0].table_infos[0].sst_id, 0);
        assert_eq!(second.input_levels[1].table_infos.len(), 1);
        assert_eq!(second.input_levels[1].table_infos[0].sst_id, 4);
        second.add_pending_task(1, &mut level_handlers);

        // Third pick: SST 1 together with two target SSTs, the first being SST 5.
        let third = picker
            .pick_compaction(&levels, &level_handlers, &mut local_stats)
            .unwrap();
        assert_eq!(third.input_levels[0].table_infos.len(), 1);
        assert_eq!(third.input_levels[0].table_infos[0].sst_id, 1);
        assert_eq!(third.input_levels[1].table_infos.len(), 2);
        assert_eq!(third.input_levels[1].table_infos[0].sst_id, 5);
    }

    /// The picker should extend the selection to two L1 files sharing a single L2 file.
    #[test]
    fn test_expand_l1_files() {
        let mut picker =
            MinOverlappingPicker::new(1, 2, 10000, 0, Arc::new(RangeOverlapStrategy::default()));

        let l1 = Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                generate_table(0, 1, 50, 99, 2),
                generate_table(1, 1, 100, 149, 2),
                generate_table(2, 1, 150, 249, 2),
            ],
            ..Default::default()
        };
        let l2 = Level {
            level_idx: 2,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                generate_table(4, 1, 50, 199, 1),
                generate_table(5, 1, 200, 399, 1),
            ],
            ..Default::default()
        };
        let levels = Levels {
            levels: vec![l1, l2],
            l0: generate_l0_nonoverlapping_sublevels(vec![]),
            ..Default::default()
        };
        let levels_handler = vec![
            LevelHandler::new(0),
            LevelHandler::new(1),
            LevelHandler::new(2),
        ];

        let mut local_stats = LocalPickerStatistic::default();
        let ret = picker
            .pick_compaction(&levels, &levels_handler, &mut local_stats)
            .unwrap();
        assert_eq!(ret.input_levels[0].level_idx, 1);
        assert_eq!(ret.input_levels[1].level_idx, 2);

        // SSTs 0 and 1 are selected together.
        assert_eq!(ret.input_levels[0].table_infos.len(), 2);
        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 0);
        assert_eq!(ret.input_levels[0].table_infos[1].sst_id, 1);

        // Only target SST 4 is pulled in.
        assert_eq!(ret.input_levels[1].table_infos.len(), 1);
        assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 4);
    }

    /// Files picked from L2 into L3 must not overlap anything in L1.
    #[test]
    fn test_trivial_move_bug() {
        let l1 = Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![generate_table(0, 1, 400, 500, 2)],
            total_file_size: 100,
            ..Default::default()
        };
        let l2 = Level {
            level_idx: 2,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                generate_table(1, 1, 100, 200, 1),
                generate_table(2, 1, 600, 700, 1),
            ],
            total_file_size: 200,
            ..Default::default()
        };
        let l3 = Level {
            level_idx: 3,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                generate_table(3, 1, 100, 300, 2),
                generate_table(4, 1, 600, 800, 1),
            ],
            total_file_size: 400,
            ..Default::default()
        };

        let levels_handlers = vec![
            LevelHandler::new(0),
            LevelHandler::new(1),
            LevelHandler::new(2),
            LevelHandler::new(3),
        ];

        // no limit
        let picker =
            MinOverlappingPicker::new(2, 3, 1000, 0, Arc::new(RangeOverlapStrategy::default()));
        let (select_files, target_files) =
            picker.pick_tables(&l2.table_infos, &l3.table_infos, &levels_handlers);

        // Accumulate the key ranges of the selected files and probe L1 with them.
        let overlap_strategy = Arc::new(RangeOverlapStrategy::default());
        let mut overlap_info = overlap_strategy.create_overlap_info();
        for sst in &select_files {
            overlap_info.update(&sst.key_range);
        }
        let range = overlap_info.check_multiple_overlap(&l1.table_infos);
        assert!(range.is_empty());
        assert_eq!(select_files.len(), 1);
        assert_eq!(target_files.len(), 1);
    }
}