risingwave_meta/hummock/compaction/
mod.rs1#![expect(clippy::arc_with_non_send_sync, reason = "FIXME: later")]
16
17pub mod compaction_config;
18mod overlap_strategy;
19use risingwave_common::catalog::{TableId, TableOption};
20use risingwave_hummock_sdk::compact_task::CompactTask;
21use risingwave_hummock_sdk::level::Levels;
22use risingwave_pb::hummock::compact_task::{self};
23
24mod picker;
25pub mod selector;
26use std::collections::{BTreeSet, HashMap};
27use std::fmt::{Debug, Formatter};
28use std::sync::Arc;
29
30use picker::{LevelCompactionPicker, TierCompactionPicker};
31use risingwave_hummock_sdk::filter_utils::{
32 must_resolve_sstable_filter_kind, must_resolve_sstable_filter_layout,
33};
34use risingwave_hummock_sdk::table_watermark::TableWatermarks;
35use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
36use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
37use risingwave_pb::hummock::compaction_config::CompactionMode;
38use risingwave_pb::hummock::{CompactionConfig, PbSstableFilterLayout, PbSstableFilterType};
39pub use selector::{CompactionSelector, CompactionSelectorContext};
40
41use self::selector::{EmergencySelector, LocalSelectorStatistic};
42use super::GroupStateValidator;
43use crate::MetaOpts;
44use crate::hummock::compaction::overlap_strategy::{OverlapStrategy, RangeOverlapStrategy};
45use crate::hummock::compaction::picker::CompactionInput;
46use crate::hummock::level_handler::LevelHandler;
47use crate::hummock::model::CompactionGroup;
48
49#[derive(Clone)]
50pub struct CompactStatus {
51 pub compaction_group_id: CompactionGroupId,
52 pub level_handlers: Vec<LevelHandler>,
53}
54
55impl Debug for CompactStatus {
56 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
57 f.debug_struct("CompactStatus")
58 .field("compaction_group_id", &self.compaction_group_id)
59 .field("level_handlers", &self.level_handlers)
60 .finish()
61 }
62}
63
64impl PartialEq for CompactStatus {
65 fn eq(&self, other: &Self) -> bool {
66 self.level_handlers.eq(&other.level_handlers)
67 && self.compaction_group_id == other.compaction_group_id
68 }
69}
70
71pub struct CompactionTask {
72 pub input: CompactionInput,
73 pub base_level: usize,
74 pub compression_algorithm: String,
75 pub sstable_filter_kind: PbSstableFilterType,
76 pub sstable_filter_layout: PbSstableFilterLayout,
77 pub target_file_size: u64,
78 pub compaction_task_type: compact_task::TaskType,
79}
80
81pub fn create_overlap_strategy(compaction_mode: CompactionMode) -> Arc<dyn OverlapStrategy> {
82 match compaction_mode {
83 CompactionMode::Range => Arc::new(RangeOverlapStrategy::default()),
84 CompactionMode::Unspecified => unreachable!(),
85 }
86}
87
88impl CompactStatus {
89 pub fn new(compaction_group_id: CompactionGroupId, max_level: u64) -> CompactStatus {
90 let mut level_handlers = vec![];
91 for level in 0..=max_level {
92 level_handlers.push(LevelHandler::new(level as u32));
93 }
94 CompactStatus {
95 compaction_group_id,
96 level_handlers,
97 }
98 }
99
100 #[expect(clippy::too_many_arguments)]
101 pub fn get_compact_task(
102 &mut self,
103 levels: &Levels,
104 member_table_ids: &BTreeSet<TableId>,
105 task_id: HummockCompactionTaskId,
106 group: &CompactionGroup,
107 stats: &mut LocalSelectorStatistic,
108 selector: &mut dyn CompactionSelector,
109 table_id_to_options: &HashMap<TableId, TableOption>,
110 developer_config: Arc<CompactionDeveloperConfig>,
111 table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
112 state_table_info: &HummockVersionStateTableInfo,
113 ) -> Option<CompactionTask> {
114 let selector_context = CompactionSelectorContext {
115 group,
116 levels,
117 member_table_ids,
118 level_handlers: &mut self.level_handlers,
119 selector_stats: stats,
120 table_id_to_options,
121 developer_config: developer_config.clone(),
122 table_watermarks,
123 state_table_info,
124 };
125 match selector.pick_compaction(task_id, selector_context) {
129 Some(task) => {
130 return Some(task);
131 }
132 _ => {
133 let compaction_group_config = &group.compaction_config;
134 let group_state =
135 GroupStateValidator::group_state(levels, compaction_group_config.as_ref());
136 if (group_state.is_write_stop() || group_state.is_emergency())
137 && compaction_group_config.enable_emergency_picker
138 {
139 let selector_context = CompactionSelectorContext {
140 group,
141 levels,
142 member_table_ids,
143 level_handlers: &mut self.level_handlers,
144 selector_stats: stats,
145 table_id_to_options,
146 developer_config,
147 table_watermarks,
148 state_table_info,
149 };
150 return EmergencySelector::default().pick_compaction(task_id, selector_context);
151 }
152 }
153 }
154
155 None
156 }
157
158 pub fn report_compact_task(&mut self, compact_task: &CompactTask) {
159 for level in &compact_task.input_ssts {
160 self.level_handlers[level.level_idx as usize].remove_task(compact_task.task_id);
161 }
162 }
163
164 pub fn compaction_group_id(&self) -> CompactionGroupId {
165 self.compaction_group_id
166 }
167}
168
169pub fn create_compaction_task(
170 compaction_config: &CompactionConfig,
171 input: CompactionInput,
172 base_level: usize,
173 compaction_task_type: compact_task::TaskType,
174) -> CompactionTask {
175 let target_file_size = if input.target_level == 0 {
176 compaction_config.target_file_size_base
177 } else {
178 assert!(input.target_level >= base_level);
179 let step = (input.target_level - base_level) / 2;
180 compaction_config.target_file_size_base << step
181 };
182
183 CompactionTask {
184 compression_algorithm: get_compression_algorithm(
185 compaction_config,
186 base_level,
187 input.target_level,
188 ),
189 sstable_filter_kind: must_resolve_sstable_filter_kind(
190 compaction_config,
191 base_level,
192 input.target_level,
193 ),
194 sstable_filter_layout: must_resolve_sstable_filter_layout(
195 compaction_config,
196 base_level,
197 input.target_level,
198 ),
199 base_level,
200 input,
201 target_file_size,
202 compaction_task_type,
203 }
204}
205
206pub fn get_compression_algorithm(
207 compaction_config: &CompactionConfig,
208 base_level: usize,
209 level: usize,
210) -> String {
211 if level == 0 || level < base_level {
212 compaction_config.compression_algorithm[0].clone()
213 } else {
214 let idx = level - base_level + 1;
215 compaction_config.compression_algorithm[idx].clone()
216 }
217}
218
219pub struct CompactionDeveloperConfig {
220 pub enable_trivial_move: bool,
222
223 pub enable_check_task_level_overlap: bool,
225}
226
227impl CompactionDeveloperConfig {
228 pub fn new_from_meta_opts(opts: &MetaOpts) -> Self {
229 Self {
230 enable_trivial_move: opts.enable_trivial_move,
231 enable_check_task_level_overlap: opts.enable_check_task_level_overlap,
232 }
233 }
234}
235
236impl Default for CompactionDeveloperConfig {
237 fn default() -> Self {
238 Self {
239 enable_trivial_move: true,
240 enable_check_task_level_overlap: true,
241 }
242 }
243}