risingwave_meta/hummock/compaction/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![expect(clippy::arc_with_non_send_sync, reason = "FIXME: later")]
16
17pub mod compaction_config;
18mod overlap_strategy;
19use risingwave_common::catalog::{TableId, TableOption};
20use risingwave_hummock_sdk::compact_task::CompactTask;
21use risingwave_hummock_sdk::level::Levels;
22use risingwave_pb::hummock::compact_task::{self};
23
24mod picker;
25pub mod selector;
26use std::collections::{BTreeSet, HashMap};
27use std::fmt::{Debug, Formatter};
28use std::sync::Arc;
29
30use picker::{LevelCompactionPicker, TierCompactionPicker};
31use risingwave_hummock_sdk::filter_utils::{
32    must_resolve_sstable_filter_kind, must_resolve_sstable_filter_layout,
33};
34use risingwave_hummock_sdk::table_watermark::TableWatermarks;
35use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
36use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
37use risingwave_pb::hummock::compaction_config::CompactionMode;
38use risingwave_pb::hummock::{CompactionConfig, PbSstableFilterLayout, PbSstableFilterType};
39pub use selector::{CompactionSelector, CompactionSelectorContext};
40
41use self::selector::{EmergencySelector, LocalSelectorStatistic};
42use super::GroupStateValidator;
43use crate::MetaOpts;
44use crate::hummock::compaction::overlap_strategy::{OverlapStrategy, RangeOverlapStrategy};
45use crate::hummock::compaction::picker::CompactionInput;
46use crate::hummock::level_handler::LevelHandler;
47use crate::hummock::model::CompactionGroup;
48
49#[derive(Clone)]
50pub struct CompactStatus {
51    pub compaction_group_id: CompactionGroupId,
52    pub level_handlers: Vec<LevelHandler>,
53}
54
55impl Debug for CompactStatus {
56    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
57        f.debug_struct("CompactStatus")
58            .field("compaction_group_id", &self.compaction_group_id)
59            .field("level_handlers", &self.level_handlers)
60            .finish()
61    }
62}
63
64impl PartialEq for CompactStatus {
65    fn eq(&self, other: &Self) -> bool {
66        self.level_handlers.eq(&other.level_handlers)
67            && self.compaction_group_id == other.compaction_group_id
68    }
69}
70
71pub struct CompactionTask {
72    pub input: CompactionInput,
73    pub base_level: usize,
74    pub compression_algorithm: String,
75    pub sstable_filter_kind: PbSstableFilterType,
76    pub sstable_filter_layout: PbSstableFilterLayout,
77    pub target_file_size: u64,
78    pub compaction_task_type: compact_task::TaskType,
79}
80
81pub fn create_overlap_strategy(compaction_mode: CompactionMode) -> Arc<dyn OverlapStrategy> {
82    match compaction_mode {
83        CompactionMode::Range => Arc::new(RangeOverlapStrategy::default()),
84        CompactionMode::Unspecified => unreachable!(),
85    }
86}
87
88impl CompactStatus {
89    pub fn new(compaction_group_id: CompactionGroupId, max_level: u64) -> CompactStatus {
90        let mut level_handlers = vec![];
91        for level in 0..=max_level {
92            level_handlers.push(LevelHandler::new(level as u32));
93        }
94        CompactStatus {
95            compaction_group_id,
96            level_handlers,
97        }
98    }
99
100    #[expect(clippy::too_many_arguments)]
101    pub fn get_compact_task(
102        &mut self,
103        levels: &Levels,
104        member_table_ids: &BTreeSet<TableId>,
105        task_id: HummockCompactionTaskId,
106        group: &CompactionGroup,
107        stats: &mut LocalSelectorStatistic,
108        selector: &mut dyn CompactionSelector,
109        table_id_to_options: &HashMap<TableId, TableOption>,
110        developer_config: Arc<CompactionDeveloperConfig>,
111        table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
112        state_table_info: &HummockVersionStateTableInfo,
113    ) -> Option<CompactionTask> {
114        let selector_context = CompactionSelectorContext {
115            group,
116            levels,
117            member_table_ids,
118            level_handlers: &mut self.level_handlers,
119            selector_stats: stats,
120            table_id_to_options,
121            developer_config: developer_config.clone(),
122            table_watermarks,
123            state_table_info,
124        };
125        // When we compact the files, we must make the result of compaction meet the following
126        // conditions, for any user key, the epoch of it in the file existing in the lower
127        // layer must be larger.
128        match selector.pick_compaction(task_id, selector_context) {
129            Some(task) => {
130                return Some(task);
131            }
132            _ => {
133                let compaction_group_config = &group.compaction_config;
134                let group_state =
135                    GroupStateValidator::group_state(levels, compaction_group_config.as_ref());
136                if (group_state.is_write_stop() || group_state.is_emergency())
137                    && compaction_group_config.enable_emergency_picker
138                {
139                    let selector_context = CompactionSelectorContext {
140                        group,
141                        levels,
142                        member_table_ids,
143                        level_handlers: &mut self.level_handlers,
144                        selector_stats: stats,
145                        table_id_to_options,
146                        developer_config,
147                        table_watermarks,
148                        state_table_info,
149                    };
150                    return EmergencySelector::default().pick_compaction(task_id, selector_context);
151                }
152            }
153        }
154
155        None
156    }
157
158    pub fn report_compact_task(&mut self, compact_task: &CompactTask) {
159        for level in &compact_task.input_ssts {
160            self.level_handlers[level.level_idx as usize].remove_task(compact_task.task_id);
161        }
162    }
163
164    pub fn compaction_group_id(&self) -> CompactionGroupId {
165        self.compaction_group_id
166    }
167}
168
169pub fn create_compaction_task(
170    compaction_config: &CompactionConfig,
171    input: CompactionInput,
172    base_level: usize,
173    compaction_task_type: compact_task::TaskType,
174) -> CompactionTask {
175    let target_file_size = if input.target_level == 0 {
176        compaction_config.target_file_size_base
177    } else {
178        assert!(input.target_level >= base_level);
179        let step = (input.target_level - base_level) / 2;
180        compaction_config.target_file_size_base << step
181    };
182
183    CompactionTask {
184        compression_algorithm: get_compression_algorithm(
185            compaction_config,
186            base_level,
187            input.target_level,
188        ),
189        sstable_filter_kind: must_resolve_sstable_filter_kind(
190            compaction_config,
191            base_level,
192            input.target_level,
193        ),
194        sstable_filter_layout: must_resolve_sstable_filter_layout(
195            compaction_config,
196            base_level,
197            input.target_level,
198        ),
199        base_level,
200        input,
201        target_file_size,
202        compaction_task_type,
203    }
204}
205
206pub fn get_compression_algorithm(
207    compaction_config: &CompactionConfig,
208    base_level: usize,
209    level: usize,
210) -> String {
211    if level == 0 || level < base_level {
212        compaction_config.compression_algorithm[0].clone()
213    } else {
214        let idx = level - base_level + 1;
215        compaction_config.compression_algorithm[idx].clone()
216    }
217}
218
/// Developer-facing switches that tune compaction picking.
pub struct CompactionDeveloperConfig {
    /// Whether the L0 picker is allowed to select trivial-move tasks.
    pub enable_trivial_move: bool,

    /// Whether the L0 multi-level picker checks overlap accuracy between sub-levels.
    pub enable_check_task_level_overlap: bool,
}
226
227impl CompactionDeveloperConfig {
228    pub fn new_from_meta_opts(opts: &MetaOpts) -> Self {
229        Self {
230            enable_trivial_move: opts.enable_trivial_move,
231            enable_check_task_level_overlap: opts.enable_check_task_level_overlap,
232        }
233    }
234}
235
236impl Default for CompactionDeveloperConfig {
237    fn default() -> Self {
238        Self {
239            enable_trivial_move: true,
240            enable_check_task_level_overlap: true,
241        }
242    }
243}