Skip to main content

risingwave_meta/hummock/compaction/picker/
tier_compaction_picker.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_hummock_sdk::level::{InputLevel, Levels, OverlappingLevel};
18use risingwave_pb::hummock::{CompactionConfig, LevelType};
19
20use super::{
21    CompactionInput, CompactionPicker, CompactionTaskValidator, LocalPickerStatistic,
22    ValidationRuleType,
23};
24use crate::hummock::level_handler::LevelHandler;
25
26pub struct TierCompactionPicker {
27    config: Arc<CompactionConfig>,
28    compaction_task_validator: Arc<CompactionTaskValidator>,
29}
30
31impl TierCompactionPicker {
32    #[cfg(test)]
33    pub fn new(config: Arc<CompactionConfig>) -> TierCompactionPicker {
34        TierCompactionPicker {
35            compaction_task_validator: Arc::new(CompactionTaskValidator::new(config.clone())),
36            config,
37        }
38    }
39
40    pub fn new_with_validator(
41        config: Arc<CompactionConfig>,
42        compaction_task_validator: Arc<CompactionTaskValidator>,
43    ) -> TierCompactionPicker {
44        TierCompactionPicker {
45            config,
46            compaction_task_validator,
47        }
48    }
49
50    fn pick_overlapping_level(
51        &self,
52        l0: &OverlappingLevel,
53        level_handler: &LevelHandler,
54        mut vnode_partition_count: u32,
55        stats: &mut LocalPickerStatistic,
56    ) -> Option<CompactionInput> {
57        for (idx, level) in l0.sub_levels.iter().enumerate() {
58            if level.level_type != LevelType::Overlapping {
59                continue;
60            }
61
62            if level.table_infos.is_empty() {
63                continue;
64            }
65
66            if level_handler.is_level_pending_compact(level) {
67                continue;
68            }
69
70            let input_level = InputLevel {
71                level_idx: 0,
72                level_type: level.level_type,
73                table_infos: level.table_infos.clone(),
74            };
75
76            let mut select_level_inputs = vec![input_level];
77
78            // We assume that the maximum size of each sub_level is sub_level_max_compaction_bytes,
79            // so the design here wants to merge multiple overlapping-levels in one compaction
80            let max_compaction_bytes = std::cmp::min(
81                self.config.max_compaction_bytes,
82                self.config.sub_level_max_compaction_bytes
83                    * self.config.level0_overlapping_sub_level_compact_level_count as u64,
84            );
85
86            let mut compaction_bytes = level.total_file_size;
87            let mut compact_file_count = level.table_infos.len() as u64;
88            // Limit sstable file count to avoid using too much memory.
89            let overlapping_max_compact_file_numer = self.config.level0_max_compact_file_number;
90
91            for other in &l0.sub_levels[idx + 1..] {
92                if compaction_bytes > max_compaction_bytes {
93                    break;
94                }
95
96                if compact_file_count > overlapping_max_compact_file_numer {
97                    break;
98                }
99
100                if level_handler.is_level_pending_compact(other) {
101                    break;
102                }
103
104                compaction_bytes += other.total_file_size;
105                compact_file_count += other.table_infos.len() as u64;
106                select_level_inputs.push(InputLevel {
107                    level_idx: 0,
108                    level_type: other.level_type,
109                    table_infos: other.table_infos.clone(),
110                });
111            }
112
113            select_level_inputs.reverse();
114            if compaction_bytes < self.config.sub_level_max_compaction_bytes / 2 {
115                vnode_partition_count = 0;
116            }
117
118            let result = CompactionInput {
119                input_levels: select_level_inputs,
120                target_level: 0,
121                target_sub_level_id: level.sub_level_id,
122                select_input_size: compaction_bytes,
123                target_input_size: 0,
124                total_file_count: compact_file_count,
125                vnode_partition_count,
126                skip_target_range_conflict_check: true,
127            };
128
129            if !self.compaction_task_validator.valid_compact_task(
130                &result,
131                ValidationRuleType::Tier,
132                stats,
133            ) {
134                continue;
135            }
136
137            return Some(result);
138        }
139        None
140    }
141}
142
143impl CompactionPicker for TierCompactionPicker {
144    fn pick_compaction(
145        &mut self,
146        levels: &Levels,
147        level_handlers: &[LevelHandler],
148        stats: &mut LocalPickerStatistic,
149    ) -> Option<CompactionInput> {
150        let l0 = &levels.l0;
151        if l0.sub_levels.is_empty() {
152            return None;
153        }
154
155        self.pick_overlapping_level(
156            l0,
157            &level_handlers[0],
158            self.config.split_weight_by_vnode,
159            stats,
160        )
161    }
162}
163
#[cfg(test)]
pub mod tests {
    use std::sync::Arc;

    use risingwave_hummock_sdk::compaction_group::hummock_version_ext::new_sub_level;
    use risingwave_hummock_sdk::level::{Levels, OverlappingLevel};
    use risingwave_pb::hummock::LevelType;

    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::compaction::picker::{
        CompactionPicker, LocalPickerStatistic, TierCompactionPicker,
    };
    use crate::hummock::compaction::selector::tests::{
        generate_l0_overlapping_sublevels, generate_table, push_table_level0_overlapping,
    };
    use crate::hummock::level_handler::LevelHandler;

    /// Wraps `l0` into a `Levels` value that has no other levels.
    fn levels_with_l0(l0: OverlappingLevel) -> Levels {
        Levels {
            l0,
            levels: vec![],
            ..Default::default()
        }
    }

    /// Four sub-levels holding seven tables are picked as a single task, and an L0 without
    /// sub-levels yields no task.
    #[test]
    fn test_pick_whole_level_basic() {
        let sub_level_tables = vec![
            vec![
                generate_table(1, 1, 100, 200, 1),
                generate_table(2, 1, 150, 250, 1),
            ],
            vec![generate_table(3, 1, 10, 90, 1)],
            vec![
                generate_table(4, 1, 100, 200, 1),
                generate_table(5, 1, 50, 150, 1),
            ],
            vec![
                generate_table(6, 1, 100, 200, 1),
                generate_table(7, 1, 50, 150, 1),
            ],
        ];
        let levels = levels_with_l0(generate_l0_overlapping_sublevels(sub_level_tables));
        let handlers = vec![LevelHandler::new(0)];
        let builder = CompactionConfigBuilder::new()
            .level0_tier_compact_file_number(2)
            .level0_sub_level_compact_level_count(2)
            .level0_overlapping_sub_level_compact_level_count(4);
        let mut picker = TierCompactionPicker::new(Arc::new(builder.build()));
        let mut picker_stats = LocalPickerStatistic::default();

        let task = picker
            .pick_compaction(&levels, &handlers, &mut picker_stats)
            .unwrap();
        assert_eq!(task.input_levels.len(), 4);
        let picked_table_count: usize = task
            .input_levels
            .iter()
            .map(|input| input.table_infos.len())
            .sum();
        assert_eq!(picked_table_count, 7);

        // An L0 generated from an empty list of sub-levels must not produce a task.
        let no_sub_levels = levels_with_l0(generate_l0_overlapping_sublevels(vec![]));
        let nothing_picked = picker
            .pick_compaction(&no_sub_levels, &handlers, &mut picker_stats)
            .is_none();
        assert!(nothing_picked);
    }

    /// Three sub-levels under this configuration are expected to produce no task.
    #[test]
    fn test_pick_whole_level_skip_sublevel() {
        let sub_level_tables = vec![
            vec![
                generate_table(4, 1, 10, 90, 1),
                generate_table(5, 1, 200, 220, 1),
            ],
            vec![generate_table(6, 1, 1, 100, 1)],
            vec![generate_table(7, 1, 1, 100, 1)],
        ];
        let levels = levels_with_l0(generate_l0_overlapping_sublevels(sub_level_tables));
        let handlers = vec![LevelHandler::new(0), LevelHandler::new(1)];
        let builder = CompactionConfigBuilder::new()
            .level0_tier_compact_file_number(2)
            .sub_level_max_compaction_bytes(500)
            .max_compaction_bytes(500000)
            .level0_sub_level_compact_level_count(2)
            .level0_overlapping_sub_level_compact_level_count(4);
        let config = Arc::new(builder.build());

        let mut picker_stats = LocalPickerStatistic::default();
        // None of the candidate tasks is expected to be returned for this layout and
        // these limits.
        let mut picker = TierCompactionPicker::new(config);
        let picked = picker.pick_compaction(&levels, &handlers, &mut picker_stats);
        assert!(picked.is_none());
    }

    /// Two `Nonoverlapping` sub-levels in L0 are expected to produce no task.
    #[test]
    fn test_write_amp_bug_skip() {
        let first = new_sub_level(
            1,
            LevelType::Nonoverlapping,
            vec![
                generate_table(3, 1, 1, 50, 1),
                generate_table(4, 1, 51, 100, 1),
            ],
        );
        let second = new_sub_level(
            2,
            LevelType::Nonoverlapping,
            vec![
                generate_table(3, 1, 1, 50, 1),
                generate_table(4, 1, 51, 200, 1),
            ],
        );
        // Both size fields of L0 are set to the sum of the two sub-level file sizes.
        let l0_size = first.total_file_size + second.total_file_size;
        let levels = levels_with_l0(OverlappingLevel {
            total_file_size: l0_size,
            uncompressed_file_size: l0_size,
            sub_levels: vec![first, second],
        });
        let builder = CompactionConfigBuilder::new()
            .level0_tier_compact_file_number(4)
            .sub_level_max_compaction_bytes(100)
            .max_compaction_bytes(500000);
        let config = Arc::new(builder.build());
        let handlers = vec![LevelHandler::new(0), LevelHandler::new(1)];
        let mut picker_stats = LocalPickerStatistic::default();
        let mut picker = TierCompactionPicker::new(config);
        let picked = picker.pick_compaction(&levels, &handlers, &mut picker_stats);
        assert!(picked.is_none());
    }

    /// A sub-level holding more tables than `level0_max_compact_file_number` is picked as a
    /// task with one input level, both before and after one more table is pushed into L0.
    #[test]
    fn test_pick_overlapping_sublevel_more_than_max_compact_file_number() {
        let crowded_sub_level = vec![
            generate_table(4, 1, 10, 90, 1),
            generate_table(5, 1, 200, 220, 1),
            generate_table(6, 1, 1, 100, 1),
            generate_table(7, 1, 1, 100, 1),
            generate_table(8, 1, 1, 100, 1),
            generate_table(9, 1, 1, 100, 1),
            generate_table(10, 1, 1, 100, 1),
        ];
        let mut levels =
            levels_with_l0(generate_l0_overlapping_sublevels(vec![crowded_sub_level]));
        let handlers = vec![LevelHandler::new(0), LevelHandler::new(1)];
        let builder = CompactionConfigBuilder::new()
            .level0_tier_compact_file_number(2)
            .sub_level_max_compaction_bytes(100)
            .max_compaction_bytes(500000)
            .level0_sub_level_compact_level_count(2)
            .level0_max_compact_file_number(3);
        let config = Arc::new(builder.build());

        let mut picker_stats = LocalPickerStatistic::default();
        let mut picker = TierCompactionPicker::new(config);
        let first_task = picker
            .pick_compaction(&levels, &handlers, &mut picker_stats)
            .unwrap();
        assert_eq!(1, first_task.input_levels.len());

        push_table_level0_overlapping(&mut levels, generate_table(11, 1, 1, 100, 1));
        let second_task = picker
            .pick_compaction(&levels, &handlers, &mut picker_stats)
            .unwrap();
        assert_eq!(1, second_task.input_levels.len());
    }
}