risingwave_meta/hummock/compaction/
mod.rs1#![expect(clippy::arc_with_non_send_sync, reason = "FIXME: later")]
16
17pub mod compaction_config;
18mod overlap_strategy;
19use risingwave_common::catalog::{TableId, TableOption};
20use risingwave_hummock_sdk::compact_task::CompactTask;
21use risingwave_hummock_sdk::level::Levels;
22use risingwave_pb::hummock::compact_task::{self};
23
24mod picker;
25pub mod selector;
26use std::collections::{BTreeSet, HashMap};
27use std::fmt::{Debug, Formatter};
28use std::sync::Arc;
29
30use picker::{LevelCompactionPicker, TierCompactionPicker};
31use risingwave_hummock_sdk::table_watermark::TableWatermarks;
32use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
33use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
34use risingwave_pb::hummock::CompactionConfig;
35use risingwave_pb::hummock::compaction_config::CompactionMode;
36pub use selector::{CompactionSelector, CompactionSelectorContext};
37
38use self::selector::{EmergencySelector, LocalSelectorStatistic};
39use super::GroupStateValidator;
40use crate::MetaOpts;
41use crate::hummock::compaction::overlap_strategy::{OverlapStrategy, RangeOverlapStrategy};
42use crate::hummock::compaction::picker::CompactionInput;
43use crate::hummock::level_handler::LevelHandler;
44use crate::hummock::model::CompactionGroup;
45
46#[derive(Clone)]
47pub struct CompactStatus {
48 pub compaction_group_id: CompactionGroupId,
49 pub level_handlers: Vec<LevelHandler>,
50}
51
52impl Debug for CompactStatus {
53 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
54 f.debug_struct("CompactStatus")
55 .field("compaction_group_id", &self.compaction_group_id)
56 .field("level_handlers", &self.level_handlers)
57 .finish()
58 }
59}
60
61impl PartialEq for CompactStatus {
62 fn eq(&self, other: &Self) -> bool {
63 self.level_handlers.eq(&other.level_handlers)
64 && self.compaction_group_id == other.compaction_group_id
65 }
66}
67
68pub struct CompactionTask {
69 pub input: CompactionInput,
70 pub base_level: usize,
71 pub compression_algorithm: String,
72 pub target_file_size: u64,
73 pub compaction_task_type: compact_task::TaskType,
74}
75
76pub fn create_overlap_strategy(compaction_mode: CompactionMode) -> Arc<dyn OverlapStrategy> {
77 match compaction_mode {
78 CompactionMode::Range => Arc::new(RangeOverlapStrategy::default()),
79 CompactionMode::Unspecified => unreachable!(),
80 }
81}
82
83impl CompactStatus {
84 pub fn new(compaction_group_id: CompactionGroupId, max_level: u64) -> CompactStatus {
85 let mut level_handlers = vec![];
86 for level in 0..=max_level {
87 level_handlers.push(LevelHandler::new(level as u32));
88 }
89 CompactStatus {
90 compaction_group_id,
91 level_handlers,
92 }
93 }
94
95 #[allow(clippy::too_many_arguments)]
96 pub fn get_compact_task(
97 &mut self,
98 levels: &Levels,
99 member_table_ids: &BTreeSet<TableId>,
100 task_id: HummockCompactionTaskId,
101 group: &CompactionGroup,
102 stats: &mut LocalSelectorStatistic,
103 selector: &mut Box<dyn CompactionSelector>,
104 table_id_to_options: &HashMap<u32, TableOption>,
105 developer_config: Arc<CompactionDeveloperConfig>,
106 table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
107 state_table_info: &HummockVersionStateTableInfo,
108 ) -> Option<CompactionTask> {
109 let selector_context = CompactionSelectorContext {
110 group,
111 levels,
112 member_table_ids,
113 level_handlers: &mut self.level_handlers,
114 selector_stats: stats,
115 table_id_to_options,
116 developer_config: developer_config.clone(),
117 table_watermarks,
118 state_table_info,
119 };
120 match selector.pick_compaction(task_id, selector_context) {
124 Some(task) => {
125 return Some(task);
126 }
127 _ => {
128 let compaction_group_config = &group.compaction_config;
129 let group_state =
130 GroupStateValidator::group_state(levels, compaction_group_config.as_ref());
131 if (group_state.is_write_stop() || group_state.is_emergency())
132 && compaction_group_config.enable_emergency_picker
133 {
134 let selector_context = CompactionSelectorContext {
135 group,
136 levels,
137 member_table_ids,
138 level_handlers: &mut self.level_handlers,
139 selector_stats: stats,
140 table_id_to_options,
141 developer_config,
142 table_watermarks,
143 state_table_info,
144 };
145 return EmergencySelector::default().pick_compaction(task_id, selector_context);
146 }
147 }
148 }
149
150 None
151 }
152
153 pub fn report_compact_task(&mut self, compact_task: &CompactTask) {
154 for level in &compact_task.input_ssts {
155 self.level_handlers[level.level_idx as usize].remove_task(compact_task.task_id);
156 }
157 }
158
159 pub fn compaction_group_id(&self) -> CompactionGroupId {
160 self.compaction_group_id
161 }
162}
163
164pub fn create_compaction_task(
165 compaction_config: &CompactionConfig,
166 input: CompactionInput,
167 base_level: usize,
168 compaction_task_type: compact_task::TaskType,
169) -> CompactionTask {
170 let target_file_size = if input.target_level == 0 {
171 compaction_config.target_file_size_base
172 } else {
173 assert!(input.target_level >= base_level);
174 let step = (input.target_level - base_level) / 2;
175 compaction_config.target_file_size_base << step
176 };
177
178 CompactionTask {
179 compression_algorithm: get_compression_algorithm(
180 compaction_config,
181 base_level,
182 input.target_level,
183 ),
184 base_level,
185 input,
186 target_file_size,
187 compaction_task_type,
188 }
189}
190
191pub fn get_compression_algorithm(
192 compaction_config: &CompactionConfig,
193 base_level: usize,
194 level: usize,
195) -> String {
196 if level == 0 || level < base_level {
197 compaction_config.compression_algorithm[0].clone()
198 } else {
199 let idx = level - base_level + 1;
200 compaction_config.compression_algorithm[idx].clone()
201 }
202}
203
/// Developer-facing switches that are handed to compaction selectors through
/// `CompactionSelectorContext::developer_config`.
///
/// Both switches default to `true` (see the `Default` impl) and can be populated from
/// `MetaOpts` via `new_from_meta_opts`.
#[derive(Debug, Clone)]
pub struct CompactionDeveloperConfig {
    /// Whether trivial move is enabled.
    // NOTE(review): the consumers of this flag live outside this file — confirm exact semantics.
    pub enable_trivial_move: bool,

    /// Whether the task level overlap check is enabled.
    // NOTE(review): the consumers of this flag live outside this file — confirm exact semantics.
    pub enable_check_task_level_overlap: bool,
}
211
212impl CompactionDeveloperConfig {
213 pub fn new_from_meta_opts(opts: &MetaOpts) -> Self {
214 Self {
215 enable_trivial_move: opts.enable_trivial_move,
216 enable_check_task_level_overlap: opts.enable_check_task_level_overlap,
217 }
218 }
219}
220
221impl Default for CompactionDeveloperConfig {
222 fn default() -> Self {
223 Self {
224 enable_trivial_move: true,
225 enable_check_task_level_overlap: true,
226 }
227 }
228}