risingwave_meta/hummock/compaction/selector/
manual_selector.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
16// This source code is licensed under both the GPLv2 (found in the
17// COPYING file in the root directory) and Apache 2.0 License
18// (found in the LICENSE.Apache file in the root directory).
19
20use std::collections::HashSet;
21
22use bytes::Bytes;
23use risingwave_hummock_sdk::compaction_group::StateTableId;
24use risingwave_hummock_sdk::key_range::KeyRange;
25use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockSstableId};
26use risingwave_pb::hummock::compact_task;
27
28use super::{CompactionSelector, DynamicLevelSelectorCore};
29use crate::hummock::compaction::picker::{
30    CompactionPicker, LocalPickerStatistic, ManualCompactionPicker,
31};
32use crate::hummock::compaction::selector::CompactionSelectorContext;
33use crate::hummock::compaction::{CompactionTask, create_compaction_task, create_overlap_strategy};
34
35#[derive(Clone, Debug, PartialEq)]
36pub struct ManualCompactionOption {
37    /// Filters out SSTs to pick. Has no effect if empty.
38    pub sst_ids: Vec<HummockSstableId>,
39    /// Filters out SSTs to pick.
40    pub key_range: KeyRange,
41    /// Filters out SSTs to pick. Has no effect if empty.
42    pub internal_table_id: HashSet<StateTableId>,
43    /// Input level.
44    pub level: usize,
45    /// When true, skip manual compaction if any task is pending in the compaction group.
46    pub exclusive: bool,
47}
48
49impl Default for ManualCompactionOption {
50    fn default() -> Self {
51        Self {
52            sst_ids: vec![],
53            key_range: KeyRange {
54                left: Bytes::default(),
55                right: Bytes::default(),
56                right_exclusive: false,
57            },
58            internal_table_id: HashSet::default(),
59            level: 1,
60            exclusive: false,
61        }
62    }
63}
64
65pub struct ManualCompactionSelector {
66    option: ManualCompactionOption,
67    blocked_by_pending: bool,
68}
69
70impl ManualCompactionSelector {
71    pub fn new(option: ManualCompactionOption) -> Self {
72        Self {
73            option,
74            blocked_by_pending: false,
75        }
76    }
77
78    pub fn blocked_by_pending(&self) -> bool {
79        self.blocked_by_pending
80    }
81}
82
83impl CompactionSelector for ManualCompactionSelector {
84    fn pick_compaction(
85        &mut self,
86        task_id: HummockCompactionTaskId,
87        context: CompactionSelectorContext<'_>,
88    ) -> Option<CompactionTask> {
89        let CompactionSelectorContext {
90            group,
91            levels,
92            level_handlers,
93            developer_config,
94            ..
95        } = context;
96        self.blocked_by_pending = false;
97
98        let dynamic_level_core =
99            DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config);
100        let overlap_strategy = create_overlap_strategy(group.compaction_config.compaction_mode());
101        let ctx = dynamic_level_core.calculate_level_base_size(levels);
102        let (mut picker, base_level) = {
103            let target_level = if self.option.level == 0 {
104                ctx.base_level
105            } else if self.option.level == group.compaction_config.max_level as usize {
106                self.option.level
107            } else {
108                self.option.level + 1
109            };
110            if self.option.level > 0 && self.option.level < ctx.base_level {
111                return None;
112            }
113            (
114                ManualCompactionPicker::new(overlap_strategy, self.option.clone(), target_level),
115                ctx.base_level,
116            )
117        };
118
119        let compaction_input =
120            picker.pick_compaction(levels, level_handlers, &mut LocalPickerStatistic::default());
121
122        if compaction_input.is_none()
123            && self.option.exclusive
124            && level_handlers
125                .iter()
126                .any(|level_handler| level_handler.pending_file_count() > 0)
127        {
128            self.blocked_by_pending = true;
129        }
130
131        let compaction_input = compaction_input?;
132        compaction_input.add_pending_task(task_id, level_handlers);
133
134        Some(create_compaction_task(
135            group.compaction_config.as_ref(),
136            compaction_input,
137            base_level,
138            self.task_type(),
139        ))
140    }
141
142    fn name(&self) -> &'static str {
143        "ManualCompactionSelector"
144    }
145
146    fn task_type(&self) -> compact_task::TaskType {
147        compact_task::TaskType::Manual
148    }
149}