risingwave_meta/hummock/compaction/selector/
manual_selector.rs1use std::collections::HashSet;
21
22use bytes::Bytes;
23use risingwave_hummock_sdk::compaction_group::StateTableId;
24use risingwave_hummock_sdk::key_range::KeyRange;
25use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockSstableId};
26use risingwave_pb::hummock::compact_task;
27
28use super::{CompactionSelector, DynamicLevelSelectorCore};
29use crate::hummock::compaction::picker::{
30 CompactionPicker, LocalPickerStatistic, ManualCompactionPicker,
31};
32use crate::hummock::compaction::selector::CompactionSelectorContext;
33use crate::hummock::compaction::{CompactionTask, create_compaction_task, create_overlap_strategy};
34
35#[derive(Clone, Debug, PartialEq)]
36pub struct ManualCompactionOption {
37 pub sst_ids: Vec<HummockSstableId>,
39 pub key_range: KeyRange,
41 pub internal_table_id: HashSet<StateTableId>,
43 pub level: usize,
45 pub target_level: Option<usize>,
47 pub exclusive: bool,
49}
50
51impl Default for ManualCompactionOption {
52 fn default() -> Self {
53 Self {
54 sst_ids: vec![],
55 key_range: KeyRange {
56 left: Bytes::default(),
57 right: Bytes::default(),
58 right_exclusive: false,
59 },
60 internal_table_id: HashSet::default(),
61 level: 1,
62 target_level: None,
63 exclusive: false,
64 }
65 }
66}
67
68pub struct ManualCompactionSelector {
69 option: ManualCompactionOption,
70 blocked_by_pending: bool,
71 validation_error: Option<String>,
72}
73
74impl ManualCompactionSelector {
75 pub fn new(option: ManualCompactionOption) -> Self {
76 Self {
77 option,
78 blocked_by_pending: false,
79 validation_error: None,
80 }
81 }
82
83 pub fn blocked_by_pending(&self) -> bool {
84 self.blocked_by_pending
85 }
86
87 pub fn validation_error(&self) -> Option<&str> {
88 self.validation_error.as_deref()
89 }
90}
91
92impl CompactionSelector for ManualCompactionSelector {
93 fn pick_compaction(
94 &mut self,
95 task_id: HummockCompactionTaskId,
96 context: CompactionSelectorContext<'_>,
97 ) -> Option<CompactionTask> {
98 let CompactionSelectorContext {
99 group,
100 levels,
101 level_handlers,
102 developer_config,
103 in_progress_compactions,
104 ..
105 } = context;
106 self.blocked_by_pending = false;
107 self.validation_error = None;
108
109 let dynamic_level_core =
110 DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config);
111 let overlap_strategy = create_overlap_strategy(group.compaction_config.compaction_mode());
112 let ctx = dynamic_level_core.calculate_level_base_size(levels);
113 let (mut picker, base_level) = {
114 let max_level = group.compaction_config.max_level as usize;
115 if self.option.level > max_level {
116 self.validation_error = Some(format!(
117 "level {} exceeds max_level {}",
118 self.option.level, max_level
119 ));
120 return None;
121 }
122 let target_level = self.option.target_level.unwrap_or_else(|| {
123 if self.option.level == 0 {
124 if self.option.sst_ids.is_empty() {
125 ctx.base_level
126 } else {
127 0
128 }
129 } else if self.option.level == group.compaction_config.max_level as usize {
130 self.option.level
131 } else {
132 self.option.level + 1
133 }
134 });
135 if target_level > max_level {
136 self.validation_error = Some(format!(
137 "target_level {} exceeds max_level {}",
138 target_level, max_level
139 ));
140 return None;
141 }
142 if self.option.level == 0 {
143 let expected_target_level = if self.option.sst_ids.is_empty() {
144 ctx.base_level
145 } else {
146 0
147 };
148 if target_level != expected_target_level {
149 self.validation_error = Some(format!(
150 "target_level for L0 must be {}, got {}",
151 expected_target_level, target_level
152 ));
153 return None;
154 }
155 }
156 if self.option.level > 0
157 && target_level != self.option.level
158 && target_level != self.option.level + 1
159 {
160 self.validation_error = Some(format!(
161 "target_level for L{} must be {} or {}, got {}",
162 self.option.level,
163 self.option.level,
164 self.option.level + 1,
165 target_level
166 ));
167 return None;
168 }
169 if self.option.level > 0 && self.option.level < ctx.base_level {
170 return None;
171 }
172 (
173 ManualCompactionPicker::new(overlap_strategy, self.option.clone(), target_level),
174 ctx.base_level,
175 )
176 };
177
178 let compaction_input =
179 picker.pick_compaction(levels, level_handlers, &mut LocalPickerStatistic::default());
180
181 if compaction_input.is_none()
182 && self.option.exclusive
183 && level_handlers
184 .iter()
185 .any(|level_handler| level_handler.pending_file_count() > 0)
186 {
187 self.blocked_by_pending = true;
188 }
189
190 let compaction_input = compaction_input?;
191 if !compaction_input.skip_target_range_conflict_check
192 && in_progress_compactions.has_conflict_with_input(&compaction_input)
193 {
194 return None;
195 }
196 compaction_input.add_pending_task(task_id, level_handlers);
197
198 Some(create_compaction_task(
199 group.compaction_config.as_ref(),
200 compaction_input,
201 base_level,
202 self.task_type(),
203 ))
204 }
205
206 fn name(&self) -> &'static str {
207 "ManualCompactionSelector"
208 }
209
210 fn task_type(&self) -> compact_task::TaskType {
211 compact_task::TaskType::Manual
212 }
213}