Skip to main content

risingwave_meta/hummock/compaction/picker/
space_reclaim_compaction_picker.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use risingwave_common::catalog::TableId;
18use risingwave_hummock_sdk::level::{InputLevel, Levels};
19use risingwave_hummock_sdk::sstable_info::SstableInfo;
20
21use super::CompactionInput;
22use crate::hummock::level_handler::LevelHandler;
23
24// The execution model of SpaceReclaimCompactionPicker scans through the last level of files by
25// key_range and selects the appropriate files to generate compaction
26pub struct SpaceReclaimCompactionPicker {
27    // config
28    pub _max_space_reclaim_bytes: u64,
29
30    // for filter
31    pub all_table_ids: HashSet<TableId>,
32}
33
/// Scan cursor for [`SpaceReclaimCompactionPicker`]: it remembers which level the next call
/// to `pick_compaction` should start from.
#[derive(Default)]
pub struct SpaceReclaimPickerState {
    /// `0` means the next pick starts at the L0 sub-levels; a value `n >= 1` means it starts
    /// at `levels.levels[n - 1]`. The picker resets it to `0` after a pass over every level
    /// that selected nothing.
    pub last_level: usize,
}
40
41impl SpaceReclaimCompactionPicker {
42    pub fn new(max_space_reclaim_bytes: u64, all_table_ids: HashSet<TableId>) -> Self {
43        Self {
44            _max_space_reclaim_bytes: max_space_reclaim_bytes,
45            all_table_ids,
46        }
47    }
48
49    fn exist_table_count(&self, sst: &SstableInfo) -> usize {
50        // it means all the table exist , so we not need to pick this sst
51        sst.table_ids
52            .iter()
53            .filter(|id| self.all_table_ids.contains(*id))
54            .count()
55    }
56}
57
58impl SpaceReclaimCompactionPicker {
59    pub fn pick_compaction(
60        &mut self,
61        levels: &Levels,
62        level_handlers: &[LevelHandler],
63        state: &mut SpaceReclaimPickerState,
64    ) -> Option<CompactionInput> {
65        assert!(!levels.levels.is_empty());
66        let mut select_input_ssts = vec![];
67
68        if state.last_level == 0 {
69            let l0 = &levels.l0;
70            // only pick trivial reclaim sstables because this kind of task could be optimized and do not need send to compactor.
71            for level in &l0.sub_levels {
72                for sst in &level.table_infos {
73                    let exist_count = self.exist_table_count(sst);
74                    if exist_count == sst.table_ids.len()
75                        || level_handlers[0].is_pending_compact(&sst.sst_id)
76                    {
77                        if !select_input_ssts.is_empty() {
78                            break;
79                        }
80                    } else if exist_count == 0 {
81                        select_input_ssts.push(sst.clone());
82                    } else if !select_input_ssts.is_empty() {
83                        break;
84                    }
85                }
86                if !select_input_ssts.is_empty() {
87                    return Some(CompactionInput {
88                        select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
89                        total_file_count: select_input_ssts.len() as u64,
90                        input_levels: vec![
91                            InputLevel {
92                                level_idx: level.level_idx,
93                                level_type: level.level_type,
94                                table_infos: select_input_ssts,
95                            },
96                            InputLevel {
97                                level_idx: 0,
98                                level_type: level.level_type,
99                                table_infos: vec![],
100                            },
101                        ],
102                        target_level: level.level_idx as usize,
103                        target_sub_level_id: level.sub_level_id,
104                        ..Default::default()
105                    });
106                }
107            }
108            state.last_level = 1;
109        }
110        while state.last_level <= levels.levels.len() {
111            let mut is_trivial_task = true;
112            for sst in &levels.levels[state.last_level - 1].table_infos {
113                let exist_count = self.exist_table_count(sst);
114                let need_reclaim = exist_count < sst.table_ids.len();
115                let is_trivial_sst = exist_count == 0;
116                if !need_reclaim || level_handlers[state.last_level].is_pending_compact(&sst.sst_id)
117                {
118                    if !select_input_ssts.is_empty() {
119                        // Our goal is to pick as many complete layers of data as possible and keep
120                        // the picked files contiguous to avoid overlapping
121                        // key_ranges, so the strategy is to pick as many
122                        // contiguous files as possible (at least one)
123                        break;
124                    }
125                    continue;
126                }
127
128                if !is_trivial_sst {
129                    if !select_input_ssts.is_empty() {
130                        break;
131                    }
132                    is_trivial_task = false;
133                }
134
135                select_input_ssts.push(sst.clone());
136                if !is_trivial_task {
137                    break;
138                }
139            }
140
141            // turn to next_round
142            if !select_input_ssts.is_empty() {
143                return Some(CompactionInput {
144                    select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
145                    total_file_count: select_input_ssts.len() as u64,
146                    input_levels: vec![
147                        InputLevel {
148                            level_idx: state.last_level as u32,
149                            level_type: levels.levels[state.last_level - 1].level_type,
150                            table_infos: select_input_ssts,
151                        },
152                        InputLevel {
153                            level_idx: state.last_level as u32,
154                            level_type: levels.levels[state.last_level - 1].level_type,
155                            table_infos: vec![],
156                        },
157                    ],
158                    target_level: state.last_level,
159                    ..Default::default()
160                });
161            }
162            state.last_level += 1;
163        }
164        state.last_level = 0;
165        None
166    }
167}
168
#[cfg(test)]
mod test {

    use std::collections::{BTreeSet, HashMap};
    use std::sync::Arc;

    use itertools::Itertools;
    use risingwave_common::catalog::TableId;
    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::level::Level;
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
    pub use risingwave_pb::hummock::LevelType;
    use risingwave_pb::hummock::compact_task;

    use super::*;
    use crate::hummock::compaction::CompactionDeveloperConfig;
    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::compaction::selector::tests::{
        assert_compaction_task, generate_l0_nonoverlapping_sublevels, generate_level,
        generate_table_with_ids_and_epochs,
    };
    use crate::hummock::compaction::selector::{
        CompactionSelector, LocalSelectorStatistic, SpaceReclaimCompactionSelector,
    };
    use crate::hummock::model::CompactionGroup;
    use crate::hummock::test_utils::compaction_selector_context;

    /// Removes every pending task from each handler so that the next scenario starts with no
    /// SST marked as pending.
    fn clear_pending_tasks(level_handlers: &mut [LevelHandler]) {
        for level_handler in level_handlers {
            for pending_task_id in &level_handler.pending_tasks_ids() {
                level_handler.remove_task(*pending_task_id);
            }
        }
    }

    /// Drives `SpaceReclaimCompactionSelector` over a fixed four-level layout while the set of
    /// existing tables and the pending tasks change between picks.
    #[test]
    fn test_space_reclaim_compaction_selector() {
        let max_space_reclaim_bytes = 400;
        let config = CompactionConfigBuilder::new()
            .max_level(4)
            .max_space_reclaim_bytes(max_space_reclaim_bytes)
            .build();
        let group_config = CompactionGroup::new(1, config);

        let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
        assert_eq!(l0.sub_levels.len(), 0);
        let mut levels = vec![
            generate_level(1, vec![]),
            generate_level(2, vec![]),
            generate_level(
                3,
                vec![
                    generate_table_with_ids_and_epochs(0, 1, 150, 151, 1, vec![0], 0, 0),
                    generate_table_with_ids_and_epochs(1, 1, 250, 251, 1, vec![1], 0, 0),
                ],
            ),
            Level {
                level_idx: 4,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table_with_ids_and_epochs(2, 1, 0, 100, 1, vec![2], 0, 0),
                    generate_table_with_ids_and_epochs(3, 1, 101, 200, 1, vec![3], 0, 0),
                    generate_table_with_ids_and_epochs(4, 1, 222, 300, 1, vec![4], 0, 0),
                    generate_table_with_ids_and_epochs(5, 1, 333, 400, 1, vec![5], 0, 0),
                    generate_table_with_ids_and_epochs(6, 1, 444, 500, 1, vec![6], 0, 0),
                    generate_table_with_ids_and_epochs(7, 1, 555, 600, 1, vec![7], 0, 0),
                    generate_table_with_ids_and_epochs(8, 1, 666, 700, 1, vec![8], 0, 0),
                    generate_table_with_ids_and_epochs(9, 1, 777, 800, 1, vec![9], 0, 0),
                    generate_table_with_ids_and_epochs(10, 1, 888, 1600, 1, vec![10], 0, 0),
                    generate_table_with_ids_and_epochs(11, 1, 1600, 1800, 1, vec![10], 0, 0),
                ],
                ..Default::default()
            },
        ];

        {
            // SST 10 ends where SST 11 starts, so its right bound is made exclusive.
            let sst_10 = levels[3].table_infos.get_mut(8).unwrap();
            assert_eq!(10, sst_10.sst_id);
            *sst_10 = SstableInfoInner {
                key_range: KeyRange {
                    right_exclusive: true,
                    ..sst_10.get_inner().key_range
                },
                ..sst_10.get_inner()
            }
            .into();
        }

        assert_eq!(levels.len(), 4);
        let levels = Levels {
            levels,
            l0,
            ..Default::default()
        };
        let mut member_table_ids = BTreeSet::new();
        let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
        let mut local_stats = LocalSelectorStatistic::default();

        let mut selector = SpaceReclaimCompactionSelector::default();

        // Runs one pick with the current `selector`, `member_table_ids` and `levels_handler`.
        // A macro (rather than a closure) is used so that none of these locals stays borrowed
        // between picks and they can be freely reassigned in between.
        macro_rules! pick_task {
            () => {
                selector.pick_compaction(
                    1,
                    compaction_selector_context(
                        &group_config,
                        &levels,
                        &member_table_ids,
                        &mut levels_handler,
                        &mut local_stats,
                        &HashMap::default(),
                        Arc::new(CompactionDeveloperConfig::default()),
                        &Default::default(),
                        &HummockVersionStateTableInfo::empty(),
                    ),
                )
            };
        }

        {
            // No table exists: the first pick takes both SSTs of level 3.
            let task = pick_task!().unwrap();
            assert_compaction_task(&task, &levels_handler);
            assert_eq!(task.input.input_levels.len(), 2);
            assert_eq!(task.input.input_levels[0].level_idx, 3);
            assert_eq!(task.input.input_levels[0].table_infos.len(), 2);

            // Mark SST 7 of level 4 as pending; the next run must stop right before it.
            levels_handler[4].add_pending_task(0, 4, &levels.levels[3].table_infos[5..6]);
            let task = pick_task!().unwrap();
            assert_eq!(task.input.input_levels.len(), 2);
            assert_eq!(task.input.input_levels[0].level_idx, 4);
            assert_eq!(task.input.input_levels[0].table_infos.len(), 5);

            for (i, sst) in task.input.input_levels[0].table_infos.iter().enumerate() {
                assert_eq!(2 + i as u64, sst.sst_id);
            }

            assert_eq!(task.input.input_levels[1].level_idx, 4);
            assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
            assert_eq!(task.input.target_level, 4);
            assert!(matches!(
                task.compaction_task_type,
                compact_task::TaskType::SpaceReclaim
            ));

            // The picked run is not truncated at `max_space_reclaim_bytes`: its total size
            // exceeds the configured limit.
            let select_file_size: u64 = task.input.input_levels[0]
                .table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum();
            assert!(select_file_size > max_space_reclaim_bytes);
        }

        {
            // The next pick continues with the run after the pending SST 7.
            let task = pick_task!().unwrap();
            assert_compaction_task(&task, &levels_handler);
            assert_eq!(task.input.input_levels.len(), 2);
            assert_eq!(task.input.input_levels[0].level_idx, 4);
            assert_eq!(task.input.input_levels[0].table_infos.len(), 4);
            assert_eq!(task.input.target_level, 4);
            assert!(matches!(
                task.compaction_task_type,
                compact_task::TaskType::SpaceReclaim
            ));
            for (i, sst) in task.input.input_levels[0].table_infos.iter().enumerate() {
                assert_eq!(8 + i as u64, sst.sst_id);
            }

            // Nothing else can be picked in this round.
            assert!(pick_task!().is_none());
        }

        {
            // Every table referenced by the SSTs exists again: nothing to reclaim.
            clear_pending_tasks(&mut levels_handler);

            member_table_ids = BTreeSet::from_iter(
                [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
                    .into_iter()
                    .map(TableId::new),
            );
            let task = pick_task!();
            assert!(task.is_none());
        }

        {
            // Tables 0 and 1 are gone: both SSTs of level 3 are picked first.
            clear_pending_tasks(&mut levels_handler);

            member_table_ids =
                BTreeSet::from_iter([2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(TableId::new));
            let task = pick_task!().unwrap();
            assert_compaction_task(&task, &levels_handler);
            assert_eq!(task.input.input_levels.len(), 2);
            assert_eq!(task.input.input_levels[0].level_idx, 3);
            assert_eq!(task.input.input_levels[0].table_infos.len(), 2);
            assert_eq!(task.input.target_level, 3);
            assert!(matches!(
                task.compaction_task_type,
                compact_task::TaskType::SpaceReclaim
            ));
        }

        {
            // Contiguous file selection.
            clear_pending_tasks(&mut levels_handler);

            // Start from a fresh selector state.
            selector = SpaceReclaimCompactionSelector::default();
            // Existing tables split level 4 into the runs [3, 4], [6] and [8, 9, 10, 11].
            member_table_ids = BTreeSet::from_iter([0, 1, 2, 5, 7].into_iter().map(TableId::new));
            let expect_task_file_count = [2, 1, 4];
            let expect_task_sst_id_range = [vec![3, 4], vec![6], vec![8, 9, 10, 11]];
            for (index, x) in expect_task_file_count.iter().enumerate() {
                let task = pick_task!().unwrap();

                assert_compaction_task(&task, &levels_handler);
                assert_eq!(task.input.input_levels.len(), 2);
                assert_eq!(task.input.input_levels[0].level_idx, 4);

                assert_eq!(task.input.input_levels[0].table_infos.len(), *x);
                let select_sst = &task.input.input_levels[0]
                    .table_infos
                    .iter()
                    .map(|sst| sst.sst_id)
                    .collect_vec();
                assert!(select_sst.is_sorted());
                assert_eq!(expect_task_sst_id_range[index], *select_sst);

                assert_eq!(task.input.input_levels[1].level_idx, 4);
                assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
                assert_eq!(task.input.target_level, 4);
                assert!(matches!(
                    task.compaction_task_type,
                    compact_task::TaskType::SpaceReclaim
                ));
            }
        }

        {
            // Contiguous file selection while the set of existing tables changes.
            clear_pending_tasks(&mut levels_handler);

            // Start from a fresh selector state.
            selector = SpaceReclaimCompactionSelector::default();

            // Initially the runs are [3, 4], [6] and [8, 9, 10, 11]; before the last pick
            // table 7 is dropped as well, which extends the last run to [7, 8, 9, 10, 11].
            member_table_ids = BTreeSet::from_iter([0, 1, 2, 5, 7].into_iter().map(TableId::new));
            let expect_task_file_count = [2, 1, 5];
            let expect_task_sst_id_range = [vec![3, 4], vec![6], vec![7, 8, 9, 10, 11]];
            for (index, x) in expect_task_file_count.iter().enumerate() {
                if index == expect_task_file_count.len() - 1 {
                    member_table_ids = BTreeSet::from_iter([2, 5].into_iter().map(TableId::new));
                }

                let task = pick_task!().unwrap();

                assert_compaction_task(&task, &levels_handler);
                assert_eq!(task.input.input_levels.len(), 2);
                assert_eq!(task.input.input_levels[0].level_idx, 4);

                assert_eq!(task.input.input_levels[0].table_infos.len(), *x);
                let select_sst = &task.input.input_levels[0]
                    .table_infos
                    .iter()
                    .map(|sst| sst.sst_id)
                    .collect_vec();
                assert!(select_sst.is_sorted());
                assert_eq!(expect_task_sst_id_range[index], *select_sst);

                assert_eq!(task.input.input_levels[1].level_idx, 4);
                assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
                assert_eq!(task.input.target_level, 4);
                assert!(matches!(
                    task.compaction_task_type,
                    compact_task::TaskType::SpaceReclaim
                ));
            }
        }
    }
}