risingwave_meta/hummock/compaction/
mod.rs1#![expect(clippy::arc_with_non_send_sync, reason = "FIXME: later")]
16
17pub mod compaction_config;
18pub(crate) mod in_progress_compaction;
19mod overlap_strategy;
20use risingwave_common::catalog::{TableId, TableOption};
21use risingwave_hummock_sdk::compact_task::CompactTask;
22use risingwave_hummock_sdk::level::Levels;
23use risingwave_pb::hummock::compact_task::{self};
24
25mod picker;
26pub mod selector;
27use std::collections::{BTreeSet, HashMap};
28use std::fmt::{Debug, Formatter};
29use std::sync::Arc;
30
31use picker::{LevelCompactionPicker, TierCompactionPicker};
32use risingwave_hummock_sdk::filter_utils::{
33 must_resolve_sstable_filter_layout, must_resolve_sstable_filter_type,
34};
35use risingwave_hummock_sdk::table_watermark::TableWatermarks;
36use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
37use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
38use risingwave_pb::hummock::compaction_config::CompactionMode;
39use risingwave_pb::hummock::{CompactionConfig, PbSstableFilterLayout, PbSstableFilterType};
40pub use selector::{CompactionSelector, CompactionSelectorContext};
41
42use self::selector::{EmergencySelector, LocalSelectorStatistic};
43use super::GroupStateValidator;
44use crate::MetaOpts;
45use crate::hummock::compaction::in_progress_compaction::InProgressCompactionView;
46use crate::hummock::compaction::overlap_strategy::{OverlapStrategy, RangeOverlapStrategy};
47use crate::hummock::compaction::picker::CompactionInput;
48use crate::hummock::level_handler::LevelHandler;
49use crate::hummock::model::CompactionGroup;
50
51#[derive(Clone)]
52pub struct CompactStatus {
53 pub compaction_group_id: CompactionGroupId,
54 pub level_handlers: Vec<LevelHandler>,
55}
56
57impl Debug for CompactStatus {
58 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59 f.debug_struct("CompactStatus")
60 .field("compaction_group_id", &self.compaction_group_id)
61 .field("level_handlers", &self.level_handlers)
62 .finish()
63 }
64}
65
66impl PartialEq for CompactStatus {
67 fn eq(&self, other: &Self) -> bool {
68 self.level_handlers.eq(&other.level_handlers)
69 && self.compaction_group_id == other.compaction_group_id
70 }
71}
72
73pub struct CompactionTask {
74 pub input: CompactionInput,
75 pub base_level: usize,
76 pub compression_algorithm: String,
77 pub sstable_filter_type: PbSstableFilterType,
78 pub sstable_filter_layout: PbSstableFilterLayout,
79 pub target_file_size: u64,
80 pub compaction_task_type: compact_task::TaskType,
81}
82
83pub fn create_overlap_strategy(compaction_mode: CompactionMode) -> Arc<dyn OverlapStrategy> {
84 match compaction_mode {
85 CompactionMode::Range => Arc::new(RangeOverlapStrategy::default()),
86 CompactionMode::Unspecified => unreachable!(),
87 }
88}
89
90impl CompactStatus {
91 pub fn new(compaction_group_id: CompactionGroupId, max_level: u64) -> CompactStatus {
92 let mut level_handlers = vec![];
93 for level in 0..=max_level {
94 level_handlers.push(LevelHandler::new(level as u32));
95 }
96 CompactStatus {
97 compaction_group_id,
98 level_handlers,
99 }
100 }
101
102 #[expect(clippy::too_many_arguments)]
103 pub fn get_compact_task(
104 &mut self,
105 levels: &Levels,
106 member_table_ids: &BTreeSet<TableId>,
107 task_id: HummockCompactionTaskId,
108 group: &CompactionGroup,
109 stats: &mut LocalSelectorStatistic,
110 selector: &mut dyn CompactionSelector,
111 table_id_to_options: &HashMap<TableId, TableOption>,
112 developer_config: Arc<CompactionDeveloperConfig>,
113 table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
114 state_table_info: &HummockVersionStateTableInfo,
115 in_progress_compactions: &InProgressCompactionView,
116 ) -> Option<CompactionTask> {
117 let selector_context = CompactionSelectorContext {
118 group,
119 levels,
120 member_table_ids,
121 level_handlers: &mut self.level_handlers,
122 selector_stats: stats,
123 table_id_to_options,
124 developer_config: developer_config.clone(),
125 table_watermarks,
126 state_table_info,
127 in_progress_compactions,
128 };
129 match selector.pick_compaction(task_id, selector_context) {
133 Some(task) => {
134 return Some(task);
135 }
136 _ => {
137 let compaction_group_config = &group.compaction_config;
138 let group_state =
139 GroupStateValidator::group_state(levels, compaction_group_config.as_ref());
140 if (group_state.is_write_stop() || group_state.is_emergency())
141 && compaction_group_config.enable_emergency_picker
142 {
143 let selector_context = CompactionSelectorContext {
144 group,
145 levels,
146 member_table_ids,
147 level_handlers: &mut self.level_handlers,
148 selector_stats: stats,
149 table_id_to_options,
150 developer_config,
151 table_watermarks,
152 state_table_info,
153 in_progress_compactions,
154 };
155 return EmergencySelector::default().pick_compaction(task_id, selector_context);
156 }
157 }
158 }
159
160 None
161 }
162
163 pub fn report_compact_task(&mut self, compact_task: &CompactTask) {
164 for level in &compact_task.input_ssts {
165 self.level_handlers[level.level_idx as usize].remove_task(compact_task.task_id);
166 }
167 }
168
169 pub fn compaction_group_id(&self) -> CompactionGroupId {
170 self.compaction_group_id
171 }
172}
173
174pub fn create_compaction_task(
175 compaction_config: &CompactionConfig,
176 input: CompactionInput,
177 base_level: usize,
178 compaction_task_type: compact_task::TaskType,
179) -> CompactionTask {
180 let target_file_size = if input.target_level == 0 {
181 compaction_config.target_file_size_base
182 } else {
183 assert!(input.target_level >= base_level);
184 let step = (input.target_level - base_level) / 2;
185 compaction_config.target_file_size_base << step
186 };
187
188 CompactionTask {
189 compression_algorithm: get_compression_algorithm(
190 compaction_config,
191 base_level,
192 input.target_level,
193 ),
194 sstable_filter_type: must_resolve_sstable_filter_type(
195 compaction_config,
196 base_level,
197 input.target_level,
198 ),
199 sstable_filter_layout: must_resolve_sstable_filter_layout(
200 compaction_config,
201 base_level,
202 input.target_level,
203 ),
204 base_level,
205 input,
206 target_file_size,
207 compaction_task_type,
208 }
209}
210
211pub fn get_compression_algorithm(
212 compaction_config: &CompactionConfig,
213 base_level: usize,
214 level: usize,
215) -> String {
216 if level == 0 || level < base_level {
217 compaction_config.compression_algorithm[0].clone()
218 } else {
219 let idx = level - base_level + 1;
220 compaction_config.compression_algorithm[idx].clone()
221 }
222}
223
/// Developer-facing compaction switches.
///
/// Built either from `MetaOpts` or via `Default`, which enables both switches.
pub struct CompactionDeveloperConfig {
    /// Mirrors `MetaOpts::enable_trivial_move`; `true` by default.
    pub enable_trivial_move: bool,

    /// Mirrors `MetaOpts::enable_check_task_level_overlap`; `true` by default.
    pub enable_check_task_level_overlap: bool,
}
231
232impl CompactionDeveloperConfig {
233 pub fn new_from_meta_opts(opts: &MetaOpts) -> Self {
234 Self {
235 enable_trivial_move: opts.enable_trivial_move,
236 enable_check_task_level_overlap: opts.enable_check_task_level_overlap,
237 }
238 }
239}
240
241impl Default for CompactionDeveloperConfig {
242 fn default() -> Self {
243 Self {
244 enable_trivial_move: true,
245 enable_check_task_level_overlap: true,
246 }
247 }
248}