Skip to main content

risingwave_meta/hummock/compaction/selector/
space_reclaim_selector.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
16// This source code is licensed under both the GPLv2 (found in the
17// COPYING file in the root directory) and Apache 2.0 License
18// (found in the LICENSE.Apache file in the root directory).
19
20use std::collections::HashMap;
21
22use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
23use risingwave_pb::hummock::compact_task;
24
25use super::{CompactionSelector, DynamicLevelSelectorCore};
26use crate::hummock::compaction::picker::{SpaceReclaimCompactionPicker, SpaceReclaimPickerState};
27use crate::hummock::compaction::selector::CompactionSelectorContext;
28use crate::hummock::compaction::{CompactionTask, create_compaction_task};
29
30#[derive(Default)]
31pub struct SpaceReclaimCompactionSelector {
32    state: HashMap<CompactionGroupId, SpaceReclaimPickerState>,
33}
34
35impl CompactionSelector for SpaceReclaimCompactionSelector {
36    fn pick_compaction(
37        &mut self,
38        task_id: HummockCompactionTaskId,
39        context: CompactionSelectorContext<'_>,
40    ) -> Option<CompactionTask> {
41        let CompactionSelectorContext {
42            group,
43            levels,
44            member_table_ids,
45            level_handlers,
46            developer_config,
47            in_progress_compactions,
48            ..
49        } = context;
50        let dynamic_level_core =
51            DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config);
52        let mut picker = SpaceReclaimCompactionPicker::new(
53            group.compaction_config.max_space_reclaim_bytes,
54            member_table_ids.iter().copied().collect(),
55        );
56        let ctx = dynamic_level_core.calculate_level_base_size(levels);
57        let state = self.state.entry(group.group_id).or_default();
58
59        let compaction_input = picker.pick_compaction(levels, level_handlers, state)?;
60        if !compaction_input.skip_target_range_conflict_check
61            && in_progress_compactions.has_conflict_with_input(&compaction_input)
62        {
63            return None;
64        }
65        compaction_input.add_pending_task(task_id, level_handlers);
66
67        Some(create_compaction_task(
68            dynamic_level_core.get_config(),
69            compaction_input,
70            ctx.base_level,
71            self.task_type(),
72        ))
73    }
74
75    fn name(&self) -> &'static str {
76        "SpaceReclaimCompaction"
77    }
78
79    fn task_type(&self) -> compact_task::TaskType {
80        compact_task::TaskType::SpaceReclaim
81    }
82}