risingwave_meta/hummock/compaction/selector/
manual_selector.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
16// This source code is licensed under both the GPLv2 (found in the
17// COPYING file in the root directory) and Apache 2.0 License
18// (found in the LICENSE.Apache file in the root directory).
19
20use std::collections::HashSet;
21
22use bytes::Bytes;
23use risingwave_hummock_sdk::compaction_group::StateTableId;
24use risingwave_hummock_sdk::key_range::KeyRange;
25use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockSstableId};
26use risingwave_pb::hummock::compact_task;
27
28use super::{CompactionSelector, DynamicLevelSelectorCore};
29use crate::hummock::compaction::picker::{
30    CompactionPicker, LocalPickerStatistic, ManualCompactionPicker,
31};
32use crate::hummock::compaction::selector::CompactionSelectorContext;
33use crate::hummock::compaction::{CompactionTask, create_compaction_task, create_overlap_strategy};
34
35#[derive(Clone, Debug, PartialEq)]
36pub struct ManualCompactionOption {
37    /// Filters out SSTs to pick. Has no effect if empty.
38    pub sst_ids: Vec<HummockSstableId>,
39    /// Filters out SSTs to pick.
40    pub key_range: KeyRange,
41    /// Filters out SSTs to pick. Has no effect if empty.
42    pub internal_table_id: HashSet<StateTableId>,
43    /// Input level.
44    pub level: usize,
45    /// Output level. Defaults to the implicit manual compaction target if unset.
46    pub target_level: Option<usize>,
47    /// When true, skip manual compaction if any task is pending in the compaction group.
48    pub exclusive: bool,
49}
50
51impl Default for ManualCompactionOption {
52    fn default() -> Self {
53        Self {
54            sst_ids: vec![],
55            key_range: KeyRange {
56                left: Bytes::default(),
57                right: Bytes::default(),
58                right_exclusive: false,
59            },
60            internal_table_id: HashSet::default(),
61            level: 1,
62            target_level: None,
63            exclusive: false,
64        }
65    }
66}
67
68pub struct ManualCompactionSelector {
69    option: ManualCompactionOption,
70    blocked_by_pending: bool,
71    validation_error: Option<String>,
72}
73
74impl ManualCompactionSelector {
75    pub fn new(option: ManualCompactionOption) -> Self {
76        Self {
77            option,
78            blocked_by_pending: false,
79            validation_error: None,
80        }
81    }
82
83    pub fn blocked_by_pending(&self) -> bool {
84        self.blocked_by_pending
85    }
86
87    pub fn validation_error(&self) -> Option<&str> {
88        self.validation_error.as_deref()
89    }
90}
91
92impl CompactionSelector for ManualCompactionSelector {
93    fn pick_compaction(
94        &mut self,
95        task_id: HummockCompactionTaskId,
96        context: CompactionSelectorContext<'_>,
97    ) -> Option<CompactionTask> {
98        let CompactionSelectorContext {
99            group,
100            levels,
101            level_handlers,
102            developer_config,
103            ..
104        } = context;
105        self.blocked_by_pending = false;
106        self.validation_error = None;
107
108        let dynamic_level_core =
109            DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config);
110        let overlap_strategy = create_overlap_strategy(group.compaction_config.compaction_mode());
111        let ctx = dynamic_level_core.calculate_level_base_size(levels);
112        let (mut picker, base_level) = {
113            let max_level = group.compaction_config.max_level as usize;
114            if self.option.level > max_level {
115                self.validation_error = Some(format!(
116                    "level {} exceeds max_level {}",
117                    self.option.level, max_level
118                ));
119                return None;
120            }
121            let target_level = self.option.target_level.unwrap_or_else(|| {
122                if self.option.level == 0 {
123                    if self.option.sst_ids.is_empty() {
124                        ctx.base_level
125                    } else {
126                        0
127                    }
128                } else if self.option.level == group.compaction_config.max_level as usize {
129                    self.option.level
130                } else {
131                    self.option.level + 1
132                }
133            });
134            if target_level > max_level {
135                self.validation_error = Some(format!(
136                    "target_level {} exceeds max_level {}",
137                    target_level, max_level
138                ));
139                return None;
140            }
141            if self.option.level == 0 {
142                let expected_target_level = if self.option.sst_ids.is_empty() {
143                    ctx.base_level
144                } else {
145                    0
146                };
147                if target_level != expected_target_level {
148                    self.validation_error = Some(format!(
149                        "target_level for L0 must be {}, got {}",
150                        expected_target_level, target_level
151                    ));
152                    return None;
153                }
154            }
155            if self.option.level > 0
156                && target_level != self.option.level
157                && target_level != self.option.level + 1
158            {
159                self.validation_error = Some(format!(
160                    "target_level for L{} must be {} or {}, got {}",
161                    self.option.level,
162                    self.option.level,
163                    self.option.level + 1,
164                    target_level
165                ));
166                return None;
167            }
168            if self.option.level > 0 && self.option.level < ctx.base_level {
169                return None;
170            }
171            (
172                ManualCompactionPicker::new(overlap_strategy, self.option.clone(), target_level),
173                ctx.base_level,
174            )
175        };
176
177        let compaction_input =
178            picker.pick_compaction(levels, level_handlers, &mut LocalPickerStatistic::default());
179
180        if compaction_input.is_none()
181            && self.option.exclusive
182            && level_handlers
183                .iter()
184                .any(|level_handler| level_handler.pending_file_count() > 0)
185        {
186            self.blocked_by_pending = true;
187        }
188
189        let compaction_input = compaction_input?;
190        compaction_input.add_pending_task(task_id, level_handlers);
191
192        Some(create_compaction_task(
193            group.compaction_config.as_ref(),
194            compaction_input,
195            base_level,
196            self.task_type(),
197        ))
198    }
199
200    fn name(&self) -> &'static str {
201        "ManualCompactionSelector"
202    }
203
204    fn task_type(&self) -> compact_task::TaskType {
205        compact_task::TaskType::Manual
206    }
207}