// risingwave_meta/hummock/compaction/picker/min_overlap_compaction_picker.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::sync::Arc;
16
17use risingwave_hummock_sdk::level::{InputLevel, Levels};
18use risingwave_hummock_sdk::sstable_info::SstableInfo;
19use risingwave_pb::hummock::LevelType;
20
21use super::{CompactionInput, CompactionPicker, LocalPickerStatistic};
22use crate::hummock::compaction::overlap_strategy::OverlapStrategy;
23use crate::hummock::level_handler::LevelHandler;
24
25pub struct MinOverlappingPicker {
26    level: usize,
27    target_level: usize,
28    max_select_bytes: u64,
29    vnode_partition_count: u32,
30    overlap_strategy: Arc<dyn OverlapStrategy>,
31}
32
33impl MinOverlappingPicker {
34    pub fn new(
35        level: usize,
36        target_level: usize,
37        max_select_bytes: u64,
38        vnode_partition_count: u32,
39        overlap_strategy: Arc<dyn OverlapStrategy>,
40    ) -> MinOverlappingPicker {
41        MinOverlappingPicker {
42            level,
43            target_level,
44            max_select_bytes,
45            vnode_partition_count,
46            overlap_strategy,
47        }
48    }
49
50    pub fn pick_tables(
51        &self,
52        select_tables: &[SstableInfo],
53        target_tables: &[SstableInfo],
54        level_handlers: &[LevelHandler],
55    ) -> (Vec<SstableInfo>, Vec<SstableInfo>) {
56        let mut select_file_ranges = vec![];
57        for (idx, sst) in select_tables.iter().enumerate() {
58            if level_handlers[self.level].is_pending_compact(&sst.sst_id) {
59                continue;
60            }
61            let mut overlap_info = self.overlap_strategy.create_overlap_info();
62            overlap_info.update(&sst.key_range);
63            let overlap_files_range = overlap_info.check_multiple_overlap(target_tables);
64
65            if overlap_files_range.is_empty() {
66                return (vec![sst.clone()], vec![]);
67            }
68            select_file_ranges.push((idx, overlap_files_range));
69        }
70        select_file_ranges.retain(|(_, range)| {
71            let mut pending_compact = false;
72            for other in &target_tables[range.clone()] {
73                if level_handlers[self.target_level].is_pending_compact(&other.sst_id) {
74                    pending_compact = true;
75                    break;
76                }
77            }
78            !pending_compact
79        });
80
81        let mut min_score = u64::MAX;
82        let mut min_score_select_range = 0..0;
83        let mut min_score_target_range = 0..0;
84        let mut min_score_select_file_size = 0;
85        for left in 0..select_file_ranges.len() {
86            let mut select_file_size = 0;
87            let mut target_level_overlap_range = select_file_ranges[left].1.clone();
88            let mut total_file_size = 0;
89            for other in &target_tables[target_level_overlap_range.clone()] {
90                total_file_size += other.sst_size;
91            }
92            let start_idx = select_file_ranges[left].0;
93            let mut end_idx = start_idx + 1;
94            for (idx, range) in select_file_ranges.iter().skip(left) {
95                if select_file_size > self.max_select_bytes
96                    || *idx > end_idx
97                    || range.start >= target_level_overlap_range.end
98                {
99                    break;
100                }
101                select_file_size += select_tables[*idx].sst_size;
102                if range.end > target_level_overlap_range.end {
103                    for other in &target_tables[target_level_overlap_range.end..range.end] {
104                        total_file_size += other.sst_size;
105                    }
106                    target_level_overlap_range.end = range.end;
107                }
108                let score = if select_file_size == 0 {
109                    total_file_size
110                } else {
111                    total_file_size * 100 / select_file_size
112                };
113                end_idx = idx + 1;
114                if score < min_score
115                    || (score == min_score && select_file_size < min_score_select_file_size)
116                {
117                    min_score = score;
118                    min_score_select_range = start_idx..end_idx;
119                    min_score_target_range = target_level_overlap_range.clone();
120                    min_score_select_file_size = select_file_size;
121                }
122            }
123        }
124        if min_score == u64::MAX {
125            return (vec![], vec![]);
126        }
127        let select_input_ssts = select_tables[min_score_select_range].to_vec();
128        let target_input_ssts = target_tables[min_score_target_range].to_vec();
129        (select_input_ssts, target_input_ssts)
130    }
131}
132
133impl CompactionPicker for MinOverlappingPicker {
134    fn pick_compaction(
135        &mut self,
136        levels: &Levels,
137        level_handlers: &[LevelHandler],
138        stats: &mut LocalPickerStatistic,
139    ) -> Option<CompactionInput> {
140        assert!(self.level > 0);
141        let (select_input_ssts, target_input_ssts) = self.pick_tables(
142            &levels.get_level(self.level).table_infos,
143            &levels.get_level(self.target_level).table_infos,
144            level_handlers,
145        );
146        if select_input_ssts.is_empty() {
147            stats.skip_by_pending_files += 1;
148            return None;
149        }
150        Some(CompactionInput {
151            select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
152            target_input_size: target_input_ssts.iter().map(|sst| sst.sst_size).sum(),
153            total_file_count: (select_input_ssts.len() + target_input_ssts.len()) as u64,
154            input_levels: vec![
155                InputLevel {
156                    level_idx: self.level as u32,
157                    level_type: LevelType::Nonoverlapping,
158                    table_infos: select_input_ssts,
159                },
160                InputLevel {
161                    level_idx: self.target_level as u32,
162                    level_type: LevelType::Nonoverlapping,
163                    table_infos: target_input_ssts,
164                },
165            ],
166            target_level: self.target_level,
167            vnode_partition_count: self.vnode_partition_count,
168            ..Default::default()
169        })
170    }
171}
172
#[cfg(test)]
pub mod tests {
    use risingwave_hummock_sdk::level::Level;

    use super::*;
    use crate::hummock::compaction::overlap_strategy::RangeOverlapStrategy;
    use crate::hummock::compaction::selector::tests::{
        generate_l0_nonoverlapping_sublevels, generate_table,
    };

    #[test]
    fn test_compact_l1() {
        let mut picker =
            MinOverlappingPicker::new(1, 2, 10000, 0, Arc::new(RangeOverlapStrategy::default()));
        let levels = vec![
            Level {
                level_idx: 1,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table(0, 1, 0, 100, 1),
                    generate_table(1, 1, 101, 200, 1),
                    generate_table(2, 1, 222, 300, 1),
                ],
                ..Default::default()
            },
            Level {
                level_idx: 2,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table(4, 1, 0, 100, 1),
                    generate_table(5, 1, 101, 150, 1),
                    generate_table(6, 1, 151, 201, 1),
                    generate_table(7, 1, 501, 800, 1),
                    generate_table(8, 2, 301, 400, 1),
                ],
                ..Default::default()
            },
        ];
        let levels = Levels {
            levels,
            l0: generate_l0_nonoverlapping_sublevels(vec![]),
            ..Default::default()
        };
        let mut level_handlers = vec![
            LevelHandler::new(0),
            LevelHandler::new(1),
            LevelHandler::new(2),
        ];

        // sst 2 overlaps no level-2 file, so it is picked on its own with no target files
        // (it can be moved to the next level without merging).
        let mut local_stats = LocalPickerStatistic::default();
        let ret = picker
            .pick_compaction(&levels, &level_handlers, &mut local_stats)
            .unwrap();
        assert_eq!(ret.input_levels[0].level_idx, 1);
        assert_eq!(ret.target_level, 2);
        assert_eq!(ret.input_levels[0].table_infos.len(), 1);
        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 2);
        assert_eq!(ret.input_levels[1].table_infos.len(), 0);
        ret.add_pending_task(0, &mut level_handlers);

        // With sst 2 pending, sst 0 is picked together with sst 4, the only level-2 file it
        // overlaps.
        let ret = picker
            .pick_compaction(&levels, &level_handlers, &mut local_stats)
            .unwrap();
        assert_eq!(ret.input_levels[0].level_idx, 1);
        assert_eq!(ret.target_level, 2);
        assert_eq!(ret.input_levels[0].table_infos.len(), 1);
        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 0);
        assert_eq!(ret.input_levels[1].table_infos.len(), 1);
        assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 4);
        ret.add_pending_task(1, &mut level_handlers);

        // With ssts 0 and 2 pending, only sst 1 is left; it overlaps ssts 5 and 6.
        let ret = picker
            .pick_compaction(&levels, &level_handlers, &mut local_stats)
            .unwrap();
        assert_eq!(ret.input_levels[0].table_infos.len(), 1);
        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 1);
        assert_eq!(ret.input_levels[1].table_infos.len(), 2);
        assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 5);
        assert_eq!(ret.input_levels[1].table_infos[1].sst_id, 6);
    }

    #[test]
    fn test_expand_l1_files() {
        let mut picker =
            MinOverlappingPicker::new(1, 2, 10000, 0, Arc::new(RangeOverlapStrategy::default()));
        let levels = vec![
            Level {
                level_idx: 1,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table(0, 1, 50, 99, 2),
                    generate_table(1, 1, 100, 149, 2),
                    generate_table(2, 1, 150, 249, 2),
                ],
                ..Default::default()
            },
            Level {
                level_idx: 2,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table(4, 1, 50, 199, 1),
                    generate_table(5, 1, 200, 399, 1),
                ],
                ..Default::default()
            },
        ];
        let levels = Levels {
            levels,
            l0: generate_l0_nonoverlapping_sublevels(vec![]),
            ..Default::default()
        };
        let levels_handler = vec![
            LevelHandler::new(0),
            LevelHandler::new(1),
            LevelHandler::new(2),
        ];

        // Every level-1 file overlaps level 2, so there is no trivial move. The adjacent
        // level-1 files 0 and 1 both overlap only sst 4, and are expected to be picked
        // together with it.
        let ret = picker
            .pick_compaction(
                &levels,
                &levels_handler,
                &mut LocalPickerStatistic::default(),
            )
            .unwrap();
        assert_eq!(ret.input_levels[0].level_idx, 1);
        assert_eq!(ret.input_levels[1].level_idx, 2);

        assert_eq!(ret.input_levels[0].table_infos.len(), 2);
        assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 0);
        assert_eq!(ret.input_levels[0].table_infos[1].sst_id, 1);

        assert_eq!(ret.input_levels[1].table_infos.len(), 1);
        assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 4);
    }

    #[test]
    fn test_trivial_move_bug() {
        let levels = [
            Level {
                level_idx: 1,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![generate_table(0, 1, 400, 500, 2)],
                total_file_size: 100,
                ..Default::default()
            },
            Level {
                level_idx: 2,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table(1, 1, 100, 200, 1),
                    generate_table(2, 1, 600, 700, 1),
                ],
                total_file_size: 200,
                ..Default::default()
            },
            Level {
                level_idx: 3,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table(3, 1, 100, 300, 2),
                    generate_table(4, 1, 600, 800, 1),
                ],
                total_file_size: 400,
                ..Default::default()
            },
        ];

        let levels_handlers = vec![
            LevelHandler::new(0),
            LevelHandler::new(1),
            LevelHandler::new(2),
            LevelHandler::new(3),
        ];
        // Pick from level 2 into level 3. max_select_bytes = 1000 is meant to be large
        // enough not to constrain the pick.
        let picker =
            MinOverlappingPicker::new(2, 3, 1000, 0, Arc::new(RangeOverlapStrategy::default()));
        let (select_files, target_files) = picker.pick_tables(
            &levels[1].table_infos,
            &levels[2].table_infos,
            &levels_handlers,
        );

        // The picked level-2 files must not overlap level 1's only file (keys 400-500).
        let overlap_strategy = Arc::new(RangeOverlapStrategy::default());
        let mut overlap_info = overlap_strategy.create_overlap_info();
        for sst in &select_files {
            overlap_info.update(&sst.key_range);
        }
        let range = overlap_info.check_multiple_overlap(&levels[0].table_infos);
        assert!(range.is_empty());
        assert_eq!(select_files.len(), 1);
        assert_eq!(target_files.len(), 1);
    }
}