Skip to main content

risingwave_meta/hummock/compaction/selector/
manual_selector.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
16// This source code is licensed under both the GPLv2 (found in the
17// COPYING file in the root directory) and Apache 2.0 License
18// (found in the LICENSE.Apache file in the root directory).
19
20use std::collections::HashSet;
21
22use bytes::Bytes;
23use risingwave_hummock_sdk::compaction_group::StateTableId;
24use risingwave_hummock_sdk::key_range::KeyRange;
25use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockSstableId};
26use risingwave_pb::hummock::compact_task;
27
28use super::{CompactionSelector, DynamicLevelSelectorCore};
29use crate::hummock::compaction::picker::{
30    CompactionPicker, LocalPickerStatistic, ManualCompactionPicker,
31};
32use crate::hummock::compaction::selector::CompactionSelectorContext;
33use crate::hummock::compaction::{CompactionTask, create_compaction_task, create_overlap_strategy};
34
35#[derive(Clone, Debug, PartialEq)]
36pub struct ManualCompactionOption {
37    /// Filters out SSTs to pick. Has no effect if empty.
38    pub sst_ids: Vec<HummockSstableId>,
39    /// Filters out SSTs to pick.
40    pub key_range: KeyRange,
41    /// Filters out SSTs to pick. Has no effect if empty.
42    pub internal_table_id: HashSet<StateTableId>,
43    /// Input level.
44    pub level: usize,
45    /// Output level. Defaults to the implicit manual compaction target if unset.
46    pub target_level: Option<usize>,
47    /// When true, skip manual compaction if any task is pending in the compaction group.
48    pub exclusive: bool,
49}
50
51impl Default for ManualCompactionOption {
52    fn default() -> Self {
53        Self {
54            sst_ids: vec![],
55            key_range: KeyRange {
56                left: Bytes::default(),
57                right: Bytes::default(),
58                right_exclusive: false,
59            },
60            internal_table_id: HashSet::default(),
61            level: 1,
62            target_level: None,
63            exclusive: false,
64        }
65    }
66}
67
68pub struct ManualCompactionSelector {
69    option: ManualCompactionOption,
70    blocked_by_pending: bool,
71    validation_error: Option<String>,
72}
73
74impl ManualCompactionSelector {
75    pub fn new(option: ManualCompactionOption) -> Self {
76        Self {
77            option,
78            blocked_by_pending: false,
79            validation_error: None,
80        }
81    }
82
83    pub fn blocked_by_pending(&self) -> bool {
84        self.blocked_by_pending
85    }
86
87    pub fn validation_error(&self) -> Option<&str> {
88        self.validation_error.as_deref()
89    }
90}
91
92impl CompactionSelector for ManualCompactionSelector {
93    fn pick_compaction(
94        &mut self,
95        task_id: HummockCompactionTaskId,
96        context: CompactionSelectorContext<'_>,
97    ) -> Option<CompactionTask> {
98        let CompactionSelectorContext {
99            group,
100            levels,
101            level_handlers,
102            developer_config,
103            in_progress_compactions,
104            ..
105        } = context;
106        self.blocked_by_pending = false;
107        self.validation_error = None;
108
109        let dynamic_level_core =
110            DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config);
111        let overlap_strategy = create_overlap_strategy(group.compaction_config.compaction_mode());
112        let ctx = dynamic_level_core.calculate_level_base_size(levels);
113        let (mut picker, base_level) = {
114            let max_level = group.compaction_config.max_level as usize;
115            if self.option.level > max_level {
116                self.validation_error = Some(format!(
117                    "level {} exceeds max_level {}",
118                    self.option.level, max_level
119                ));
120                return None;
121            }
122            let target_level = self.option.target_level.unwrap_or_else(|| {
123                if self.option.level == 0 {
124                    if self.option.sst_ids.is_empty() {
125                        ctx.base_level
126                    } else {
127                        0
128                    }
129                } else if self.option.level == group.compaction_config.max_level as usize {
130                    self.option.level
131                } else {
132                    self.option.level + 1
133                }
134            });
135            if target_level > max_level {
136                self.validation_error = Some(format!(
137                    "target_level {} exceeds max_level {}",
138                    target_level, max_level
139                ));
140                return None;
141            }
142            if self.option.level == 0 {
143                let expected_target_level = if self.option.sst_ids.is_empty() {
144                    ctx.base_level
145                } else {
146                    0
147                };
148                if target_level != expected_target_level {
149                    self.validation_error = Some(format!(
150                        "target_level for L0 must be {}, got {}",
151                        expected_target_level, target_level
152                    ));
153                    return None;
154                }
155            }
156            if self.option.level > 0
157                && target_level != self.option.level
158                && target_level != self.option.level + 1
159            {
160                self.validation_error = Some(format!(
161                    "target_level for L{} must be {} or {}, got {}",
162                    self.option.level,
163                    self.option.level,
164                    self.option.level + 1,
165                    target_level
166                ));
167                return None;
168            }
169            if self.option.level > 0 && self.option.level < ctx.base_level {
170                return None;
171            }
172            (
173                ManualCompactionPicker::new(overlap_strategy, self.option.clone(), target_level),
174                ctx.base_level,
175            )
176        };
177
178        let compaction_input =
179            picker.pick_compaction(levels, level_handlers, &mut LocalPickerStatistic::default());
180
181        if compaction_input.is_none()
182            && self.option.exclusive
183            && level_handlers
184                .iter()
185                .any(|level_handler| level_handler.pending_file_count() > 0)
186        {
187            self.blocked_by_pending = true;
188        }
189
190        let compaction_input = compaction_input?;
191        if !compaction_input.skip_target_range_conflict_check
192            && in_progress_compactions.has_conflict_with_input(&compaction_input)
193        {
194            return None;
195        }
196        compaction_input.add_pending_task(task_id, level_handlers);
197
198        Some(create_compaction_task(
199            group.compaction_config.as_ref(),
200            compaction_input,
201            base_level,
202            self.task_type(),
203        ))
204    }
205
206    fn name(&self) -> &'static str {
207        "ManualCompactionSelector"
208    }
209
210    fn task_type(&self) -> compact_task::TaskType {
211        compact_task::TaskType::Manual
212    }
213}