Skip to main content

risingwave_meta/hummock/compaction/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![expect(clippy::arc_with_non_send_sync, reason = "FIXME: later")]
16
17pub mod compaction_config;
18pub(crate) mod in_progress_compaction;
19mod overlap_strategy;
20use risingwave_common::catalog::{TableId, TableOption};
21use risingwave_hummock_sdk::compact_task::CompactTask;
22use risingwave_hummock_sdk::level::Levels;
23use risingwave_pb::hummock::compact_task::{self};
24
25mod picker;
26pub mod selector;
27use std::collections::{BTreeSet, HashMap};
28use std::fmt::{Debug, Formatter};
29use std::sync::Arc;
30
31use picker::{LevelCompactionPicker, TierCompactionPicker};
32use risingwave_hummock_sdk::filter_utils::{
33    must_resolve_sstable_filter_layout, must_resolve_sstable_filter_type,
34};
35use risingwave_hummock_sdk::table_watermark::TableWatermarks;
36use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
37use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
38use risingwave_pb::hummock::compaction_config::CompactionMode;
39use risingwave_pb::hummock::{CompactionConfig, PbSstableFilterLayout, PbSstableFilterType};
40pub use selector::{CompactionSelector, CompactionSelectorContext};
41
42use self::selector::{EmergencySelector, LocalSelectorStatistic};
43use super::GroupStateValidator;
44use crate::MetaOpts;
45use crate::hummock::compaction::in_progress_compaction::InProgressCompactionView;
46use crate::hummock::compaction::overlap_strategy::{OverlapStrategy, RangeOverlapStrategy};
47use crate::hummock::compaction::picker::CompactionInput;
48use crate::hummock::level_handler::LevelHandler;
49use crate::hummock::model::CompactionGroup;
50
51#[derive(Clone)]
52pub struct CompactStatus {
53    pub compaction_group_id: CompactionGroupId,
54    pub level_handlers: Vec<LevelHandler>,
55}
56
57impl Debug for CompactStatus {
58    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59        f.debug_struct("CompactStatus")
60            .field("compaction_group_id", &self.compaction_group_id)
61            .field("level_handlers", &self.level_handlers)
62            .finish()
63    }
64}
65
66impl PartialEq for CompactStatus {
67    fn eq(&self, other: &Self) -> bool {
68        self.level_handlers.eq(&other.level_handlers)
69            && self.compaction_group_id == other.compaction_group_id
70    }
71}
72
73pub struct CompactionTask {
74    pub input: CompactionInput,
75    pub base_level: usize,
76    pub compression_algorithm: String,
77    pub sstable_filter_type: PbSstableFilterType,
78    pub sstable_filter_layout: PbSstableFilterLayout,
79    pub target_file_size: u64,
80    pub compaction_task_type: compact_task::TaskType,
81}
82
83pub fn create_overlap_strategy(compaction_mode: CompactionMode) -> Arc<dyn OverlapStrategy> {
84    match compaction_mode {
85        CompactionMode::Range => Arc::new(RangeOverlapStrategy::default()),
86        CompactionMode::Unspecified => unreachable!(),
87    }
88}
89
90impl CompactStatus {
91    pub fn new(compaction_group_id: CompactionGroupId, max_level: u64) -> CompactStatus {
92        let mut level_handlers = vec![];
93        for level in 0..=max_level {
94            level_handlers.push(LevelHandler::new(level as u32));
95        }
96        CompactStatus {
97            compaction_group_id,
98            level_handlers,
99        }
100    }
101
102    #[expect(clippy::too_many_arguments)]
103    pub fn get_compact_task(
104        &mut self,
105        levels: &Levels,
106        member_table_ids: &BTreeSet<TableId>,
107        task_id: HummockCompactionTaskId,
108        group: &CompactionGroup,
109        stats: &mut LocalSelectorStatistic,
110        selector: &mut dyn CompactionSelector,
111        table_id_to_options: &HashMap<TableId, TableOption>,
112        developer_config: Arc<CompactionDeveloperConfig>,
113        table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
114        state_table_info: &HummockVersionStateTableInfo,
115        in_progress_compactions: &InProgressCompactionView,
116    ) -> Option<CompactionTask> {
117        let selector_context = CompactionSelectorContext {
118            group,
119            levels,
120            member_table_ids,
121            level_handlers: &mut self.level_handlers,
122            selector_stats: stats,
123            table_id_to_options,
124            developer_config: developer_config.clone(),
125            table_watermarks,
126            state_table_info,
127            in_progress_compactions,
128        };
129        // When we compact the files, we must make the result of compaction meet the following
130        // conditions, for any user key, the epoch of it in the file existing in the lower
131        // layer must be larger.
132        match selector.pick_compaction(task_id, selector_context) {
133            Some(task) => {
134                return Some(task);
135            }
136            _ => {
137                let compaction_group_config = &group.compaction_config;
138                let group_state =
139                    GroupStateValidator::group_state(levels, compaction_group_config.as_ref());
140                if (group_state.is_write_stop() || group_state.is_emergency())
141                    && compaction_group_config.enable_emergency_picker
142                {
143                    let selector_context = CompactionSelectorContext {
144                        group,
145                        levels,
146                        member_table_ids,
147                        level_handlers: &mut self.level_handlers,
148                        selector_stats: stats,
149                        table_id_to_options,
150                        developer_config,
151                        table_watermarks,
152                        state_table_info,
153                        in_progress_compactions,
154                    };
155                    return EmergencySelector::default().pick_compaction(task_id, selector_context);
156                }
157            }
158        }
159
160        None
161    }
162
163    pub fn report_compact_task(&mut self, compact_task: &CompactTask) {
164        for level in &compact_task.input_ssts {
165            self.level_handlers[level.level_idx as usize].remove_task(compact_task.task_id);
166        }
167    }
168
169    pub fn compaction_group_id(&self) -> CompactionGroupId {
170        self.compaction_group_id
171    }
172}
173
174pub fn create_compaction_task(
175    compaction_config: &CompactionConfig,
176    input: CompactionInput,
177    base_level: usize,
178    compaction_task_type: compact_task::TaskType,
179) -> CompactionTask {
180    let target_file_size = if input.target_level == 0 {
181        compaction_config.target_file_size_base
182    } else {
183        assert!(input.target_level >= base_level);
184        let step = (input.target_level - base_level) / 2;
185        compaction_config.target_file_size_base << step
186    };
187
188    CompactionTask {
189        compression_algorithm: get_compression_algorithm(
190            compaction_config,
191            base_level,
192            input.target_level,
193        ),
194        sstable_filter_type: must_resolve_sstable_filter_type(
195            compaction_config,
196            base_level,
197            input.target_level,
198        ),
199        sstable_filter_layout: must_resolve_sstable_filter_layout(
200            compaction_config,
201            base_level,
202            input.target_level,
203        ),
204        base_level,
205        input,
206        target_file_size,
207        compaction_task_type,
208    }
209}
210
211pub fn get_compression_algorithm(
212    compaction_config: &CompactionConfig,
213    base_level: usize,
214    level: usize,
215) -> String {
216    if level == 0 || level < base_level {
217        compaction_config.compression_algorithm[0].clone()
218    } else {
219        let idx = level - base_level + 1;
220        compaction_config.compression_algorithm[idx].clone()
221    }
222}
223
/// Developer-facing switches for the compaction pickers.
pub struct CompactionDeveloperConfig {
    /// Whether the L0 picker may select trivial-move tasks.
    pub enable_trivial_move: bool,

    /// Whether the L0 multi-level picker checks the overlap accuracy between
    /// sub-levels.
    pub enable_check_task_level_overlap: bool,
}
231
232impl CompactionDeveloperConfig {
233    pub fn new_from_meta_opts(opts: &MetaOpts) -> Self {
234        Self {
235            enable_trivial_move: opts.enable_trivial_move,
236            enable_check_task_level_overlap: opts.enable_check_task_level_overlap,
237        }
238    }
239}
240
241impl Default for CompactionDeveloperConfig {
242    fn default() -> Self {
243        Self {
244            enable_trivial_move: true,
245            enable_check_task_level_overlap: true,
246        }
247    }
248}