risingwave_meta/hummock/compaction/selector/
manual_selector.rs1use std::collections::HashSet;
21
22use bytes::Bytes;
23use risingwave_hummock_sdk::compaction_group::StateTableId;
24use risingwave_hummock_sdk::key_range::KeyRange;
25use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockSstableId};
26use risingwave_pb::hummock::compact_task;
27
28use super::{CompactionSelector, DynamicLevelSelectorCore};
29use crate::hummock::compaction::picker::{
30 CompactionPicker, LocalPickerStatistic, ManualCompactionPicker,
31};
32use crate::hummock::compaction::selector::CompactionSelectorContext;
33use crate::hummock::compaction::{CompactionTask, create_compaction_task, create_overlap_strategy};
34
35#[derive(Clone, Debug, PartialEq)]
36pub struct ManualCompactionOption {
37 pub sst_ids: Vec<HummockSstableId>,
39 pub key_range: KeyRange,
41 pub internal_table_id: HashSet<StateTableId>,
43 pub level: usize,
45 pub target_level: Option<usize>,
47 pub exclusive: bool,
49}
50
51impl Default for ManualCompactionOption {
52 fn default() -> Self {
53 Self {
54 sst_ids: vec![],
55 key_range: KeyRange {
56 left: Bytes::default(),
57 right: Bytes::default(),
58 right_exclusive: false,
59 },
60 internal_table_id: HashSet::default(),
61 level: 1,
62 target_level: None,
63 exclusive: false,
64 }
65 }
66}
67
68pub struct ManualCompactionSelector {
69 option: ManualCompactionOption,
70 blocked_by_pending: bool,
71 validation_error: Option<String>,
72}
73
74impl ManualCompactionSelector {
75 pub fn new(option: ManualCompactionOption) -> Self {
76 Self {
77 option,
78 blocked_by_pending: false,
79 validation_error: None,
80 }
81 }
82
83 pub fn blocked_by_pending(&self) -> bool {
84 self.blocked_by_pending
85 }
86
87 pub fn validation_error(&self) -> Option<&str> {
88 self.validation_error.as_deref()
89 }
90}
91
92impl CompactionSelector for ManualCompactionSelector {
93 fn pick_compaction(
94 &mut self,
95 task_id: HummockCompactionTaskId,
96 context: CompactionSelectorContext<'_>,
97 ) -> Option<CompactionTask> {
98 let CompactionSelectorContext {
99 group,
100 levels,
101 level_handlers,
102 developer_config,
103 ..
104 } = context;
105 self.blocked_by_pending = false;
106 self.validation_error = None;
107
108 let dynamic_level_core =
109 DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config);
110 let overlap_strategy = create_overlap_strategy(group.compaction_config.compaction_mode());
111 let ctx = dynamic_level_core.calculate_level_base_size(levels);
112 let (mut picker, base_level) = {
113 let max_level = group.compaction_config.max_level as usize;
114 if self.option.level > max_level {
115 self.validation_error = Some(format!(
116 "level {} exceeds max_level {}",
117 self.option.level, max_level
118 ));
119 return None;
120 }
121 let target_level = self.option.target_level.unwrap_or_else(|| {
122 if self.option.level == 0 {
123 if self.option.sst_ids.is_empty() {
124 ctx.base_level
125 } else {
126 0
127 }
128 } else if self.option.level == group.compaction_config.max_level as usize {
129 self.option.level
130 } else {
131 self.option.level + 1
132 }
133 });
134 if target_level > max_level {
135 self.validation_error = Some(format!(
136 "target_level {} exceeds max_level {}",
137 target_level, max_level
138 ));
139 return None;
140 }
141 if self.option.level == 0 {
142 let expected_target_level = if self.option.sst_ids.is_empty() {
143 ctx.base_level
144 } else {
145 0
146 };
147 if target_level != expected_target_level {
148 self.validation_error = Some(format!(
149 "target_level for L0 must be {}, got {}",
150 expected_target_level, target_level
151 ));
152 return None;
153 }
154 }
155 if self.option.level > 0
156 && target_level != self.option.level
157 && target_level != self.option.level + 1
158 {
159 self.validation_error = Some(format!(
160 "target_level for L{} must be {} or {}, got {}",
161 self.option.level,
162 self.option.level,
163 self.option.level + 1,
164 target_level
165 ));
166 return None;
167 }
168 if self.option.level > 0 && self.option.level < ctx.base_level {
169 return None;
170 }
171 (
172 ManualCompactionPicker::new(overlap_strategy, self.option.clone(), target_level),
173 ctx.base_level,
174 )
175 };
176
177 let compaction_input =
178 picker.pick_compaction(levels, level_handlers, &mut LocalPickerStatistic::default());
179
180 if compaction_input.is_none()
181 && self.option.exclusive
182 && level_handlers
183 .iter()
184 .any(|level_handler| level_handler.pending_file_count() > 0)
185 {
186 self.blocked_by_pending = true;
187 }
188
189 let compaction_input = compaction_input?;
190 compaction_input.add_pending_task(task_id, level_handlers);
191
192 Some(create_compaction_task(
193 group.compaction_config.as_ref(),
194 compaction_input,
195 base_level,
196 self.task_type(),
197 ))
198 }
199
200 fn name(&self) -> &'static str {
201 "ManualCompactionSelector"
202 }
203
204 fn task_type(&self) -> compact_task::TaskType {
205 compact_task::TaskType::Manual
206 }
207}