1use std::collections::HashSet;
16
17use risingwave_hummock_sdk::level::{InputLevel, Levels};
18use risingwave_hummock_sdk::sstable_info::SstableInfo;
19
20use super::CompactionInput;
21use crate::hummock::level_handler::LevelHandler;
22
/// Picker that selects SSTs carrying table ids which are absent from `all_table_ids`, so the
/// space they occupy can be reclaimed.
pub struct SpaceReclaimCompactionPicker {
    /// Byte budget handed to [`SpaceReclaimCompactionPicker::new`]. The picking logic in this
    /// module never consults it, hence the leading underscore.
    pub _max_space_reclaim_bytes: u64,

    /// Table ids that are treated as still existing. A table id found in an SST but missing
    /// from this set counts as reclaimable.
    pub all_table_ids: HashSet<u32>,
}
32
/// Scan cursor that `SpaceReclaimCompactionPicker::pick_compaction` keeps between calls.
#[derive(Default)]
pub struct SpaceReclaimPickerState {
    /// Where the next scan resumes: `0` means the L0 sub-levels, `n >= 1` means
    /// `levels.levels[n - 1]`. The picker resets it to `0` once a scan reaches the end without
    /// producing a task.
    pub last_level: usize,
}
39
40impl SpaceReclaimCompactionPicker {
41 pub fn new(max_space_reclaim_bytes: u64, all_table_ids: HashSet<u32>) -> Self {
42 Self {
43 _max_space_reclaim_bytes: max_space_reclaim_bytes,
44 all_table_ids,
45 }
46 }
47
48 fn exist_table_count(&self, sst: &SstableInfo) -> usize {
49 sst.table_ids
51 .iter()
52 .filter(|id| self.all_table_ids.contains(id))
53 .count()
54 }
55}
56
57impl SpaceReclaimCompactionPicker {
58 pub fn pick_compaction(
59 &mut self,
60 levels: &Levels,
61 level_handlers: &[LevelHandler],
62 state: &mut SpaceReclaimPickerState,
63 ) -> Option<CompactionInput> {
64 assert!(!levels.levels.is_empty());
65 let mut select_input_ssts = vec![];
66
67 if state.last_level == 0 {
68 let l0 = &levels.l0;
69 for level in &l0.sub_levels {
71 for sst in &level.table_infos {
72 let exist_count = self.exist_table_count(sst);
73 if exist_count == sst.table_ids.len()
74 || level_handlers[0].is_pending_compact(&sst.sst_id)
75 {
76 if !select_input_ssts.is_empty() {
77 break;
78 }
79 } else if exist_count == 0 {
80 select_input_ssts.push(sst.clone());
81 } else if !select_input_ssts.is_empty() {
82 break;
83 }
84 }
85 if !select_input_ssts.is_empty() {
86 return Some(CompactionInput {
87 select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
88 total_file_count: select_input_ssts.len() as u64,
89 input_levels: vec![
90 InputLevel {
91 level_idx: level.level_idx,
92 level_type: level.level_type,
93 table_infos: select_input_ssts,
94 },
95 InputLevel {
96 level_idx: 0,
97 level_type: level.level_type,
98 table_infos: vec![],
99 },
100 ],
101 target_level: level.level_idx as usize,
102 target_sub_level_id: level.sub_level_id,
103 ..Default::default()
104 });
105 }
106 }
107 state.last_level = 1;
108 }
109 while state.last_level <= levels.levels.len() {
110 let mut is_trivial_task = true;
111 for sst in &levels.levels[state.last_level - 1].table_infos {
112 let exist_count = self.exist_table_count(sst);
113 let need_reclaim = exist_count < sst.table_ids.len();
114 let is_trivial_sst = exist_count == 0;
115 if !need_reclaim || level_handlers[state.last_level].is_pending_compact(&sst.sst_id)
116 {
117 if !select_input_ssts.is_empty() {
118 break;
123 }
124 continue;
125 }
126
127 if !is_trivial_sst {
128 if !select_input_ssts.is_empty() {
129 break;
130 }
131 is_trivial_task = false;
132 }
133
134 select_input_ssts.push(sst.clone());
135 if !is_trivial_task {
136 break;
137 }
138 }
139
140 if !select_input_ssts.is_empty() {
142 return Some(CompactionInput {
143 select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
144 total_file_count: select_input_ssts.len() as u64,
145 input_levels: vec![
146 InputLevel {
147 level_idx: state.last_level as u32,
148 level_type: levels.levels[state.last_level - 1].level_type,
149 table_infos: select_input_ssts,
150 },
151 InputLevel {
152 level_idx: state.last_level as u32,
153 level_type: levels.levels[state.last_level - 1].level_type,
154 table_infos: vec![],
155 },
156 ],
157 target_level: state.last_level,
158 ..Default::default()
159 });
160 }
161 state.last_level += 1;
162 }
163 state.last_level = 0;
164 None
165 }
166}
167
#[cfg(test)]
mod test {

    use std::collections::{BTreeSet, HashMap};
    use std::sync::Arc;

    use itertools::Itertools;
    use risingwave_common::catalog::TableId;
    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::level::Level;
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
    pub use risingwave_pb::hummock::LevelType;
    use risingwave_pb::hummock::compact_task;

    use super::*;
    use crate::hummock::compaction::CompactionDeveloperConfig;
    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::compaction::selector::tests::{
        assert_compaction_task, generate_l0_nonoverlapping_sublevels, generate_level,
        generate_table_with_ids_and_epochs,
    };
    use crate::hummock::compaction::selector::{
        CompactionSelector, LocalSelectorStatistic, SpaceReclaimCompactionSelector,
    };
    use crate::hummock::model::CompactionGroup;
    use crate::hummock::test_utils::compaction_selector_context;

    /// Removes every pending task from every handler so the next scenario starts with no SST
    /// marked as pending.
    fn clear_pending_tasks(level_handlers: &mut [LevelHandler]) {
        for level_handler in level_handlers {
            for pending_task_id in &level_handler.pending_tasks_ids() {
                level_handler.remove_task(*pending_task_id);
            }
        }
    }

    /// Drives `SpaceReclaimCompactionSelector` over a fixed four-level layout (two SSTs in
    /// level 3, ten SSTs in level 4) while varying the member table ids between scenarios.
    #[test]
    fn test_space_reclaim_compaction_selector() {
        let max_space_reclaim_bytes = 400;
        let config = CompactionConfigBuilder::new()
            .max_level(4)
            .max_space_reclaim_bytes(max_space_reclaim_bytes)
            .build();
        let group_config = CompactionGroup::new(1, config);

        let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
        assert_eq!(l0.sub_levels.len(), 0);
        let mut levels = vec![
            generate_level(1, vec![]),
            generate_level(2, vec![]),
            generate_level(
                3,
                vec![
                    generate_table_with_ids_and_epochs(0, 1, 150, 151, 1, vec![0], 0, 0),
                    generate_table_with_ids_and_epochs(1, 1, 250, 251, 1, vec![1], 0, 0),
                ],
            ),
            Level {
                level_idx: 4,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table_with_ids_and_epochs(2, 1, 0, 100, 1, vec![2], 0, 0),
                    generate_table_with_ids_and_epochs(3, 1, 101, 200, 1, vec![3], 0, 0),
                    generate_table_with_ids_and_epochs(4, 1, 222, 300, 1, vec![4], 0, 0),
                    generate_table_with_ids_and_epochs(5, 1, 333, 400, 1, vec![5], 0, 0),
                    generate_table_with_ids_and_epochs(6, 1, 444, 500, 1, vec![6], 0, 0),
                    generate_table_with_ids_and_epochs(7, 1, 555, 600, 1, vec![7], 0, 0),
                    generate_table_with_ids_and_epochs(8, 1, 666, 700, 1, vec![8], 0, 0),
                    generate_table_with_ids_and_epochs(9, 1, 777, 800, 1, vec![9], 0, 0),
                    generate_table_with_ids_and_epochs(10, 1, 888, 1600, 1, vec![10], 0, 0),
                    generate_table_with_ids_and_epochs(11, 1, 1600, 1800, 1, vec![10], 0, 0),
                ],
                ..Default::default()
            },
        ];

        {
            // Rebuild SST 10 with a right-exclusive key range, keeping everything else as is.
            let sst_10 = levels[3].table_infos.get_mut(8).unwrap();
            assert_eq!(10, sst_10.sst_id);
            *sst_10 = SstableInfoInner {
                key_range: KeyRange {
                    right_exclusive: true,
                    ..sst_10.get_inner().key_range.clone()
                },
                ..sst_10.get_inner()
            }
            .into();
        }

        assert_eq!(levels.len(), 4);
        let levels = Levels {
            levels,
            l0,
            ..Default::default()
        };
        let mut member_table_ids = BTreeSet::new();
        let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
        let mut local_stats = LocalSelectorStatistic::default();

        let mut selector = SpaceReclaimCompactionSelector::default();

        // Runs one selection round against the current values of the locals declared above.
        // A macro (rather than a closure) is used so that no borrow is held between rounds and
        // the locals can still be reassigned or mutated in between.
        macro_rules! pick_task {
            () => {
                selector.pick_compaction(
                    1,
                    compaction_selector_context(
                        &group_config,
                        &levels,
                        &member_table_ids,
                        &mut levels_handler,
                        &mut local_stats,
                        &HashMap::default(),
                        Arc::new(CompactionDeveloperConfig::default()),
                        &Default::default(),
                        &HummockVersionStateTableInfo::empty(),
                    ),
                )
            };
        }

        // Scenario 1: the member table set is empty.
        {
            let task = pick_task!().unwrap();
            assert_compaction_task(&task, &levels_handler);
            assert_eq!(task.input.input_levels.len(), 2);
            assert_eq!(task.input.input_levels[0].level_idx, 3);
            assert_eq!(task.input.input_levels[0].table_infos.len(), 2);

            // Mark SST 7 of level 4 as pending before the next round.
            levels_handler[4].add_pending_task(0, 4, &levels.levels[3].table_infos[5..6]);
            let task = pick_task!().unwrap();
            assert_eq!(task.input.input_levels.len(), 2);
            assert_eq!(task.input.input_levels[0].level_idx, 4);
            assert_eq!(task.input.input_levels[0].table_infos.len(), 5);

            // The picked SSTs must be ids 2, 3, 4, ... in order.
            let mut expected_id = 2;
            for sst in &task.input.input_levels[0].table_infos {
                assert_eq!(expected_id, sst.sst_id);
                expected_id += 1;
            }

            assert_eq!(task.input.input_levels[1].level_idx, 4);
            assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
            assert_eq!(task.input.target_level, 4);
            assert!(matches!(
                task.compaction_task_type,
                compact_task::TaskType::SpaceReclaim
            ));

            let select_file_size: u64 = task.input.input_levels[0]
                .table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum();
            assert!(select_file_size > max_space_reclaim_bytes);
        }

        // Scenario 2: continue with the same state; the run after the pending SST is picked,
        // then nothing is left.
        {
            let task = pick_task!().unwrap();
            assert_compaction_task(&task, &levels_handler);
            assert_eq!(task.input.input_levels.len(), 2);
            assert_eq!(task.input.input_levels[0].level_idx, 4);
            assert_eq!(task.input.input_levels[0].table_infos.len(), 4);
            assert_eq!(task.input.target_level, 4);
            assert!(matches!(
                task.compaction_task_type,
                compact_task::TaskType::SpaceReclaim
            ));

            // The picked SSTs must be ids 8, 9, 10, 11 in order.
            let mut expected_id = 8;
            for sst in &task.input.input_levels[0].table_infos {
                assert_eq!(expected_id, sst.sst_id);
                expected_id += 1;
            }

            assert!(pick_task!().is_none());
        }

        // Scenario 3: every table id used by the SSTs is a member, so no task is produced.
        {
            clear_pending_tasks(&mut levels_handler);

            member_table_ids = BTreeSet::from_iter(
                [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
                    .into_iter()
                    .map(TableId::new),
            );
            let task = pick_task!();
            assert!(task.is_none());
        }

        // Scenario 4: tables 0, 1 and 10 are no longer members; the level-3 SSTs go first.
        {
            clear_pending_tasks(&mut levels_handler);

            member_table_ids =
                BTreeSet::from_iter([2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(TableId::new));
            let task = pick_task!().unwrap();
            assert_compaction_task(&task, &levels_handler);
            assert_eq!(task.input.input_levels.len(), 2);
            assert_eq!(task.input.input_levels[0].level_idx, 3);
            assert_eq!(task.input.input_levels[0].table_infos.len(), 2);
            assert_eq!(task.input.target_level, 3);
            assert!(matches!(
                task.compaction_task_type,
                compact_task::TaskType::SpaceReclaim
            ));
        }

        // Scenario 5: fresh selector; member tables split level 4 into the runs
        // [3, 4], [6] and [8, 9, 10, 11].
        {
            clear_pending_tasks(&mut levels_handler);

            selector = SpaceReclaimCompactionSelector::default();
            member_table_ids = BTreeSet::from_iter([0, 1, 2, 5, 7].into_iter().map(TableId::new));
            let expect_task_file_count = [2, 1, 4];
            let expect_task_sst_id_range = [vec![3, 4], vec![6], vec![8, 9, 10, 11]];
            for (expected_count, expected_ids) in expect_task_file_count
                .iter()
                .zip(&expect_task_sst_id_range)
            {
                let task = pick_task!().unwrap();

                assert_compaction_task(&task, &levels_handler);
                assert_eq!(task.input.input_levels.len(), 2);
                assert_eq!(task.input.input_levels[0].level_idx, 4);

                assert_eq!(
                    task.input.input_levels[0].table_infos.len(),
                    *expected_count
                );
                let picked_ids = task.input.input_levels[0]
                    .table_infos
                    .iter()
                    .map(|sst| sst.sst_id)
                    .collect_vec();
                assert!(picked_ids.is_sorted());
                assert_eq!(*expected_ids, picked_ids);

                assert_eq!(task.input.input_levels[1].level_idx, 4);
                assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
                assert_eq!(task.input.target_level, 4);
                assert!(matches!(
                    task.compaction_task_type,
                    compact_task::TaskType::SpaceReclaim
                ));
            }
        }

        // Scenario 6: same start as scenario 5, but the member set shrinks to {2, 5} right
        // before the last round, which widens the final run to [7, 8, 9, 10, 11].
        {
            clear_pending_tasks(&mut levels_handler);

            selector = SpaceReclaimCompactionSelector::default();
            member_table_ids = BTreeSet::from_iter([0, 1, 2, 5, 7].into_iter().map(TableId::new));
            let expect_task_file_count = [2, 1, 5];
            let expect_task_sst_id_range = [vec![3, 4], vec![6], vec![7, 8, 9, 10, 11]];
            let last_round = expect_task_file_count.len() - 1;
            for (round, (expected_count, expected_ids)) in expect_task_file_count
                .iter()
                .zip(&expect_task_sst_id_range)
                .enumerate()
            {
                if round == last_round {
                    member_table_ids = BTreeSet::from_iter([2, 5].into_iter().map(TableId::new));
                }

                let task = pick_task!().unwrap();

                assert_compaction_task(&task, &levels_handler);
                assert_eq!(task.input.input_levels.len(), 2);
                assert_eq!(task.input.input_levels[0].level_idx, 4);

                assert_eq!(
                    task.input.input_levels[0].table_infos.len(),
                    *expected_count
                );
                let picked_ids = task.input.input_levels[0]
                    .table_infos
                    .iter()
                    .map(|sst| sst.sst_id)
                    .collect_vec();
                assert!(picked_ids.is_sorted());
                assert_eq!(*expected_ids, picked_ids);

                assert_eq!(task.input.input_levels[1].level_idx, 4);
                assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
                assert_eq!(task.input.target_level, 4);
                assert!(matches!(
                    task.compaction_task_type,
                    compact_task::TaskType::SpaceReclaim
                ));
            }
        }
    }
}