risingwave_meta/hummock/compaction/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![expect(clippy::arc_with_non_send_sync, reason = "FIXME: later")]
16
17pub mod compaction_config;
18mod overlap_strategy;
19use risingwave_common::catalog::{TableId, TableOption};
20use risingwave_hummock_sdk::compact_task::CompactTask;
21use risingwave_hummock_sdk::level::Levels;
22use risingwave_pb::hummock::compact_task::{self};
23
24mod picker;
25pub mod selector;
26use std::collections::{BTreeSet, HashMap};
27use std::fmt::{Debug, Formatter};
28use std::sync::Arc;
29
30use picker::{LevelCompactionPicker, TierCompactionPicker};
31use risingwave_hummock_sdk::table_watermark::TableWatermarks;
32use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
33use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
34use risingwave_pb::hummock::CompactionConfig;
35use risingwave_pb::hummock::compaction_config::CompactionMode;
36pub use selector::{CompactionSelector, CompactionSelectorContext};
37
38use self::selector::{EmergencySelector, LocalSelectorStatistic};
39use super::GroupStateValidator;
40use crate::MetaOpts;
41use crate::hummock::compaction::overlap_strategy::{OverlapStrategy, RangeOverlapStrategy};
42use crate::hummock::compaction::picker::CompactionInput;
43use crate::hummock::level_handler::LevelHandler;
44use crate::hummock::model::CompactionGroup;
45
46#[derive(Clone)]
47pub struct CompactStatus {
48    pub compaction_group_id: CompactionGroupId,
49    pub level_handlers: Vec<LevelHandler>,
50}
51
52impl Debug for CompactStatus {
53    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
54        f.debug_struct("CompactStatus")
55            .field("compaction_group_id", &self.compaction_group_id)
56            .field("level_handlers", &self.level_handlers)
57            .finish()
58    }
59}
60
61impl PartialEq for CompactStatus {
62    fn eq(&self, other: &Self) -> bool {
63        self.level_handlers.eq(&other.level_handlers)
64            && self.compaction_group_id == other.compaction_group_id
65    }
66}
67
68pub struct CompactionTask {
69    pub input: CompactionInput,
70    pub base_level: usize,
71    pub compression_algorithm: String,
72    pub target_file_size: u64,
73    pub compaction_task_type: compact_task::TaskType,
74}
75
76pub fn create_overlap_strategy(compaction_mode: CompactionMode) -> Arc<dyn OverlapStrategy> {
77    match compaction_mode {
78        CompactionMode::Range => Arc::new(RangeOverlapStrategy::default()),
79        CompactionMode::Unspecified => unreachable!(),
80    }
81}
82
83impl CompactStatus {
84    pub fn new(compaction_group_id: CompactionGroupId, max_level: u64) -> CompactStatus {
85        let mut level_handlers = vec![];
86        for level in 0..=max_level {
87            level_handlers.push(LevelHandler::new(level as u32));
88        }
89        CompactStatus {
90            compaction_group_id,
91            level_handlers,
92        }
93    }
94
95    #[allow(clippy::too_many_arguments)]
96    pub fn get_compact_task(
97        &mut self,
98        levels: &Levels,
99        member_table_ids: &BTreeSet<TableId>,
100        task_id: HummockCompactionTaskId,
101        group: &CompactionGroup,
102        stats: &mut LocalSelectorStatistic,
103        selector: &mut Box<dyn CompactionSelector>,
104        table_id_to_options: &HashMap<u32, TableOption>,
105        developer_config: Arc<CompactionDeveloperConfig>,
106        table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
107        state_table_info: &HummockVersionStateTableInfo,
108    ) -> Option<CompactionTask> {
109        let selector_context = CompactionSelectorContext {
110            group,
111            levels,
112            member_table_ids,
113            level_handlers: &mut self.level_handlers,
114            selector_stats: stats,
115            table_id_to_options,
116            developer_config: developer_config.clone(),
117            table_watermarks,
118            state_table_info,
119        };
120        // When we compact the files, we must make the result of compaction meet the following
121        // conditions, for any user key, the epoch of it in the file existing in the lower
122        // layer must be larger.
123        match selector.pick_compaction(task_id, selector_context) {
124            Some(task) => {
125                return Some(task);
126            }
127            _ => {
128                let compaction_group_config = &group.compaction_config;
129                let group_state =
130                    GroupStateValidator::group_state(levels, compaction_group_config.as_ref());
131                if (group_state.is_write_stop() || group_state.is_emergency())
132                    && compaction_group_config.enable_emergency_picker
133                {
134                    let selector_context = CompactionSelectorContext {
135                        group,
136                        levels,
137                        member_table_ids,
138                        level_handlers: &mut self.level_handlers,
139                        selector_stats: stats,
140                        table_id_to_options,
141                        developer_config,
142                        table_watermarks,
143                        state_table_info,
144                    };
145                    return EmergencySelector::default().pick_compaction(task_id, selector_context);
146                }
147            }
148        }
149
150        None
151    }
152
153    pub fn report_compact_task(&mut self, compact_task: &CompactTask) {
154        for level in &compact_task.input_ssts {
155            self.level_handlers[level.level_idx as usize].remove_task(compact_task.task_id);
156        }
157    }
158
159    pub fn compaction_group_id(&self) -> CompactionGroupId {
160        self.compaction_group_id
161    }
162}
163
164pub fn create_compaction_task(
165    compaction_config: &CompactionConfig,
166    input: CompactionInput,
167    base_level: usize,
168    compaction_task_type: compact_task::TaskType,
169) -> CompactionTask {
170    let target_file_size = if input.target_level == 0 {
171        compaction_config.target_file_size_base
172    } else {
173        assert!(input.target_level >= base_level);
174        let step = (input.target_level - base_level) / 2;
175        compaction_config.target_file_size_base << step
176    };
177
178    CompactionTask {
179        compression_algorithm: get_compression_algorithm(
180            compaction_config,
181            base_level,
182            input.target_level,
183        ),
184        base_level,
185        input,
186        target_file_size,
187        compaction_task_type,
188    }
189}
190
191pub fn get_compression_algorithm(
192    compaction_config: &CompactionConfig,
193    base_level: usize,
194    level: usize,
195) -> String {
196    if level == 0 || level < base_level {
197        compaction_config.compression_algorithm[0].clone()
198    } else {
199        let idx = level - base_level + 1;
200        compaction_config.compression_algorithm[idx].clone()
201    }
202}
203
/// Developer-facing switches that tune compaction picking.
///
/// This is a small bag of flags, so it derives `Debug` (for logging/diagnostics) and
/// `Copy`/`PartialEq` (cheap to pass and compare).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CompactionDeveloperConfig {
    /// Whether the L0 picker may select trivial-move tasks.
    pub enable_trivial_move: bool,

    /// Whether the L0 multi-level picker checks overlap accuracy between sub-levels.
    pub enable_check_task_level_overlap: bool,
}
211
212impl CompactionDeveloperConfig {
213    pub fn new_from_meta_opts(opts: &MetaOpts) -> Self {
214        Self {
215            enable_trivial_move: opts.enable_trivial_move,
216            enable_check_task_level_overlap: opts.enable_check_task_level_overlap,
217        }
218    }
219}
220
221impl Default for CompactionDeveloperConfig {
222    fn default() -> Self {
223        Self {
224            enable_trivial_move: true,
225            enable_check_task_level_overlap: true,
226        }
227    }
228}