1use std::collections::HashSet;
16
17use risingwave_common::catalog::TableId;
18use risingwave_hummock_sdk::level::{InputLevel, Levels};
19use risingwave_hummock_sdk::sstable_info::SstableInfo;
20
21use super::CompactionInput;
22use crate::hummock::level_handler::LevelHandler;
23
/// Picks compaction tasks that rewrite SSTs containing data of tables that no longer exist,
/// so the space they occupy can be reclaimed.
pub struct SpaceReclaimCompactionPicker {
    /// Configured byte budget for a space-reclaim task.
    ///
    /// NOTE(review): the leading underscore marks it unused — `pick_compaction` in this file
    /// never reads it, so a picked task can be larger than this value.
    pub _max_space_reclaim_bytes: u64,

    /// Ids of the tables that are still live. Any table id referenced by an SST but absent
    /// from this set is treated as dropped, making that part of the SST reclaimable.
    pub all_table_ids: HashSet<TableId>,
}
33
/// Cursor carried between successive `pick_compaction` calls so that consecutive picks
/// resume scanning where the previous one stopped instead of restarting from L0 each time.
#[derive(Default)]
pub struct SpaceReclaimPickerState {
    /// Level at which the next scan starts: `0` means the L0 sub-levels are tried first;
    /// `n >= 1` means the scan resumes at `levels.levels[n - 1]`. It is reset to `0` once a
    /// full pass over all levels finds nothing to reclaim.
    pub last_level: usize,
}
40
41impl SpaceReclaimCompactionPicker {
42 pub fn new(max_space_reclaim_bytes: u64, all_table_ids: HashSet<TableId>) -> Self {
43 Self {
44 _max_space_reclaim_bytes: max_space_reclaim_bytes,
45 all_table_ids,
46 }
47 }
48
49 fn exist_table_count(&self, sst: &SstableInfo) -> usize {
50 sst.table_ids
52 .iter()
53 .filter(|id| self.all_table_ids.contains(id))
54 .count()
55 }
56}
57
58impl SpaceReclaimCompactionPicker {
59 pub fn pick_compaction(
60 &mut self,
61 levels: &Levels,
62 level_handlers: &[LevelHandler],
63 state: &mut SpaceReclaimPickerState,
64 ) -> Option<CompactionInput> {
65 assert!(!levels.levels.is_empty());
66 let mut select_input_ssts = vec![];
67
68 if state.last_level == 0 {
69 let l0 = &levels.l0;
70 for level in &l0.sub_levels {
72 for sst in &level.table_infos {
73 let exist_count = self.exist_table_count(sst);
74 if exist_count == sst.table_ids.len()
75 || level_handlers[0].is_pending_compact(&sst.sst_id)
76 {
77 if !select_input_ssts.is_empty() {
78 break;
79 }
80 } else if exist_count == 0 {
81 select_input_ssts.push(sst.clone());
82 } else if !select_input_ssts.is_empty() {
83 break;
84 }
85 }
86 if !select_input_ssts.is_empty() {
87 return Some(CompactionInput {
88 select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
89 total_file_count: select_input_ssts.len() as u64,
90 input_levels: vec![
91 InputLevel {
92 level_idx: level.level_idx,
93 level_type: level.level_type,
94 table_infos: select_input_ssts,
95 },
96 InputLevel {
97 level_idx: 0,
98 level_type: level.level_type,
99 table_infos: vec![],
100 },
101 ],
102 target_level: level.level_idx as usize,
103 target_sub_level_id: level.sub_level_id,
104 ..Default::default()
105 });
106 }
107 }
108 state.last_level = 1;
109 }
110 while state.last_level <= levels.levels.len() {
111 let mut is_trivial_task = true;
112 for sst in &levels.levels[state.last_level - 1].table_infos {
113 let exist_count = self.exist_table_count(sst);
114 let need_reclaim = exist_count < sst.table_ids.len();
115 let is_trivial_sst = exist_count == 0;
116 if !need_reclaim || level_handlers[state.last_level].is_pending_compact(&sst.sst_id)
117 {
118 if !select_input_ssts.is_empty() {
119 break;
124 }
125 continue;
126 }
127
128 if !is_trivial_sst {
129 if !select_input_ssts.is_empty() {
130 break;
131 }
132 is_trivial_task = false;
133 }
134
135 select_input_ssts.push(sst.clone());
136 if !is_trivial_task {
137 break;
138 }
139 }
140
141 if !select_input_ssts.is_empty() {
143 return Some(CompactionInput {
144 select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
145 total_file_count: select_input_ssts.len() as u64,
146 input_levels: vec![
147 InputLevel {
148 level_idx: state.last_level as u32,
149 level_type: levels.levels[state.last_level - 1].level_type,
150 table_infos: select_input_ssts,
151 },
152 InputLevel {
153 level_idx: state.last_level as u32,
154 level_type: levels.levels[state.last_level - 1].level_type,
155 table_infos: vec![],
156 },
157 ],
158 target_level: state.last_level,
159 ..Default::default()
160 });
161 }
162 state.last_level += 1;
163 }
164 state.last_level = 0;
165 None
166 }
167}
168
#[cfg(test)]
mod test {

    use std::collections::{BTreeSet, HashMap};
    use std::sync::Arc;

    use itertools::Itertools;
    use risingwave_common::catalog::TableId;
    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::level::Level;
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
    pub use risingwave_pb::hummock::LevelType;
    use risingwave_pb::hummock::compact_task;

    use super::*;
    use crate::hummock::compaction::CompactionDeveloperConfig;
    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::compaction::selector::tests::{
        assert_compaction_task, generate_l0_nonoverlapping_sublevels, generate_level,
        generate_table_with_ids_and_epochs,
    };
    use crate::hummock::compaction::selector::{
        CompactionSelector, LocalSelectorStatistic, SpaceReclaimCompactionSelector,
    };
    use crate::hummock::model::CompactionGroup;
    use crate::hummock::test_utils::compaction_selector_context;

    /// Drives `SpaceReclaimCompactionSelector` through a sequence of picks over a fixed LSM
    /// shape (empty L0, L1 and L2; two SSTs in L3; ten SSTs in L4). Each scenario varies which
    /// table ids are live and which SSTs are pending, then checks the resulting tasks.
    #[test]
    fn test_space_reclaim_compaction_selector() {
        let max_space_reclaim_bytes = 400;
        let config = CompactionConfigBuilder::new()
            .max_level(4)
            .max_space_reclaim_bytes(max_space_reclaim_bytes)
            .build();
        let group_config = CompactionGroup::new(1, config);

        let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
        assert_eq!(l0.sub_levels.len(), 0);
        // SST ids 0..=11. Judging by the member_table_ids scenarios below, the `vec![..]`
        // argument appears to be the SST's table ids: SST k references table k, except SST 11,
        // which also references table 10.
        let mut levels = vec![
            generate_level(1, vec![]),
            generate_level(2, vec![]),
            generate_level(
                3,
                vec![
                    generate_table_with_ids_and_epochs(0, 1, 150, 151, 1, vec![0], 0, 0),
                    generate_table_with_ids_and_epochs(1, 1, 250, 251, 1, vec![1], 0, 0),
                ],
            ),
            Level {
                level_idx: 4,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table_with_ids_and_epochs(2, 1, 0, 100, 1, vec![2], 0, 0),
                    generate_table_with_ids_and_epochs(3, 1, 101, 200, 1, vec![3], 0, 0),
                    generate_table_with_ids_and_epochs(4, 1, 222, 300, 1, vec![4], 0, 0),
                    generate_table_with_ids_and_epochs(5, 1, 333, 400, 1, vec![5], 0, 0),
                    generate_table_with_ids_and_epochs(6, 1, 444, 500, 1, vec![6], 0, 0),
                    generate_table_with_ids_and_epochs(7, 1, 555, 600, 1, vec![7], 0, 0),
                    generate_table_with_ids_and_epochs(8, 1, 666, 700, 1, vec![8], 0, 0),
                    generate_table_with_ids_and_epochs(9, 1, 777, 800, 1, vec![9], 0, 0),
                    generate_table_with_ids_and_epochs(10, 1, 888, 1600, 1, vec![10], 0, 0),
                    generate_table_with_ids_and_epochs(11, 1, 1600, 1800, 1, vec![10], 0, 0),
                ],
                ..Default::default()
            },
        ];

        // SST 10's range ends exactly where SST 11's begins (1600). Mark its right bound
        // exclusive so the two L4 files do not overlap.
        {
            let sst_10 = levels[3].table_infos.get_mut(8).unwrap();
            assert_eq!(10, sst_10.sst_id);
            *sst_10 = SstableInfoInner {
                key_range: KeyRange {
                    right_exclusive: true,
                    ..sst_10.get_inner().key_range
                },
                ..sst_10.get_inner()
            }
            .into();
        }

        assert_eq!(levels.len(), 4);
        let levels = Levels {
            levels,
            l0,
            ..Default::default()
        };
        // Starts empty: no table is live, so every SST is fully reclaimable.
        let mut member_table_ids = BTreeSet::new();
        let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
        let mut local_stats = LocalSelectorStatistic::default();

        let mut selector = SpaceReclaimCompactionSelector::default();
        // Scenario 1: the first pick takes both L3 SSTs. With SST 7 (L4 index 5) marked
        // pending, the second pick takes the run 2..=6 of L4 and stops at the pending file.
        {
            let task = selector
                .pick_compaction(
                    1,
                    compaction_selector_context(
                        &group_config,
                        &levels,
                        &member_table_ids,
                        &mut levels_handler,
                        &mut local_stats,
                        &HashMap::default(),
                        Arc::new(CompactionDeveloperConfig::default()),
                        &Default::default(),
                        &HummockVersionStateTableInfo::empty(),
                    ),
                )
                .unwrap();
            assert_compaction_task(&task, &levels_handler);
            assert_eq!(task.input.input_levels.len(), 2);
            assert_eq!(task.input.input_levels[0].level_idx, 3);
            assert_eq!(task.input.input_levels[0].table_infos.len(), 2);
            levels_handler[4].add_pending_task(0, 4, &levels.levels[3].table_infos[5..6]);
            let task = selector
                .pick_compaction(
                    1,
                    compaction_selector_context(
                        &group_config,
                        &levels,
                        &member_table_ids,
                        &mut levels_handler,
                        &mut local_stats,
                        &HashMap::default(),
                        Arc::new(CompactionDeveloperConfig::default()),
                        &Default::default(),
                        &HummockVersionStateTableInfo::empty(),
                    ),
                )
                .unwrap();
            assert_eq!(task.input.input_levels.len(), 2);
            assert_eq!(task.input.input_levels[0].level_idx, 4);
            assert_eq!(task.input.input_levels[0].table_infos.len(), 5);

            let mut start_id = 2;
            for sst in &task.input.input_levels[0].table_infos {
                assert_eq!(start_id, sst.sst_id);
                start_id += 1;
            }

            assert_eq!(task.input.input_levels[1].level_idx, 4);
            assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
            assert_eq!(task.input.target_level, 4);
            assert!(matches!(
                task.compaction_task_type,
                compact_task::TaskType::SpaceReclaim
            ));

            // The byte budget is not enforced: the picked files exceed max_space_reclaim_bytes.
            let select_file_size: u64 = task.input.input_levels[0]
                .table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum();
            assert!(select_file_size > max_space_reclaim_bytes);
        }

        // Scenario 2: the next pick takes the remaining L4 run 8..=11. Everything is then
        // pending, so one more pick finds nothing.
        {
            let task = selector
                .pick_compaction(
                    1,
                    compaction_selector_context(
                        &group_config,
                        &levels,
                        &member_table_ids,
                        &mut levels_handler,
                        &mut local_stats,
                        &HashMap::default(),
                        Arc::new(CompactionDeveloperConfig::default()),
                        &Default::default(),
                        &HummockVersionStateTableInfo::empty(),
                    ),
                )
                .unwrap();
            assert_compaction_task(&task, &levels_handler);
            assert_eq!(task.input.input_levels.len(), 2);
            assert_eq!(task.input.input_levels[0].level_idx, 4);
            assert_eq!(task.input.input_levels[0].table_infos.len(), 4);
            assert_eq!(task.input.target_level, 4);
            assert!(matches!(
                task.compaction_task_type,
                compact_task::TaskType::SpaceReclaim
            ));
            let mut start_id = 8;
            for sst in &task.input.input_levels[0].table_infos {
                assert_eq!(start_id, sst.sst_id);
                start_id += 1;
            }

            assert!(
                selector
                    .pick_compaction(
                        1,
                        compaction_selector_context(
                            &group_config,
                            &levels,
                            &member_table_ids,
                            &mut levels_handler,
                            &mut local_stats,
                            &HashMap::default(),
                            Arc::new(CompactionDeveloperConfig::default()),
                            &Default::default(),
                            &HummockVersionStateTableInfo::empty(),
                        ),
                    )
                    .is_none()
            )
        }

        // Scenario 3: clear all pending tasks and make every table (0..=10) live. No SST has
        // reclaimable data, so nothing is picked.
        {
            for level_handler in &mut levels_handler {
                for pending_task_id in &level_handler.pending_tasks_ids() {
                    level_handler.remove_task(*pending_task_id);
                }
            }

            member_table_ids = BTreeSet::from_iter(
                [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
                    .into_iter()
                    .map(TableId::new),
            );
            let task = selector.pick_compaction(
                1,
                compaction_selector_context(
                    &group_config,
                    &levels,
                    &member_table_ids,
                    &mut levels_handler,
                    &mut local_stats,
                    &HashMap::default(),
                    Arc::new(CompactionDeveloperConfig::default()),
                    &Default::default(),
                    &HummockVersionStateTableInfo::empty(),
                ),
            );
            assert!(task.is_none());
        }

        // Scenario 4: only tables 2..=9 are live. Tables 0 and 1 (the two L3 SSTs) are
        // dropped, so both L3 files are reclaimed in place (target level 3).
        {
            for level_handler in &mut levels_handler {
                for pending_task_id in &level_handler.pending_tasks_ids() {
                    level_handler.remove_task(*pending_task_id);
                }
            }

            member_table_ids =
                BTreeSet::from_iter([2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(TableId::new));
            let task = selector
                .pick_compaction(
                    1,
                    compaction_selector_context(
                        &group_config,
                        &levels,
                        &member_table_ids,
                        &mut levels_handler,
                        &mut local_stats,
                        &HashMap::default(),
                        Arc::new(CompactionDeveloperConfig::default()),
                        &Default::default(),
                        &HummockVersionStateTableInfo::empty(),
                    ),
                )
                .unwrap();
            assert_compaction_task(&task, &levels_handler);
            assert_eq!(task.input.input_levels.len(), 2);
            assert_eq!(task.input.input_levels[0].level_idx, 3);
            assert_eq!(task.input.input_levels[0].table_infos.len(), 2);
            assert_eq!(task.input.target_level, 3);
            assert!(matches!(
                task.compaction_task_type,
                compact_task::TaskType::SpaceReclaim
            ));
        }

        // Scenario 5: a fresh selector with tables {0, 1, 2, 5, 7} live. L3 is fully live. In
        // L4, the live SSTs 2, 5 and 7 split the dropped ones into contiguous runs [3, 4], [6]
        // and [8..=11], which are picked one task at a time.
        {
            for level_handler in &mut levels_handler {
                for pending_task_id in &level_handler.pending_tasks_ids() {
                    level_handler.remove_task(*pending_task_id);
                }
            }

            selector = SpaceReclaimCompactionSelector::default();
            member_table_ids = BTreeSet::from_iter([0, 1, 2, 5, 7].into_iter().map(TableId::new));
            let expect_task_file_count = [2, 1, 4];
            let expect_task_sst_id_range = [vec![3, 4], vec![6], vec![8, 9, 10, 11]];
            for (index, x) in expect_task_file_count.iter().enumerate() {
                let task = selector
                    .pick_compaction(
                        1,
                        compaction_selector_context(
                            &group_config,
                            &levels,
                            &member_table_ids,
                            &mut levels_handler,
                            &mut local_stats,
                            &HashMap::default(),
                            Arc::new(CompactionDeveloperConfig::default()),
                            &Default::default(),
                            &HummockVersionStateTableInfo::empty(),
                        ),
                    )
                    .unwrap();

                assert_compaction_task(&task, &levels_handler);
                assert_eq!(task.input.input_levels.len(), 2);
                assert_eq!(task.input.input_levels[0].level_idx, 4);

                assert_eq!(task.input.input_levels[0].table_infos.len(), *x);
                let select_sst = &task.input.input_levels[0]
                    .table_infos
                    .iter()
                    .map(|sst| sst.sst_id)
                    .collect_vec();
                assert!(select_sst.is_sorted());
                assert_eq!(expect_task_sst_id_range[index], *select_sst);

                assert_eq!(task.input.input_levels[1].level_idx, 4);
                assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
                assert_eq!(task.input.target_level, 4);
                assert!(matches!(
                    task.compaction_task_type,
                    compact_task::TaskType::SpaceReclaim
                ));
            }
        }

        // Scenario 6: same as scenario 5, but before the last pick the live set shrinks to
        // {2, 5}. Table 7 is now dropped, so the final run starts at SST 7: [7..=11].
        {
            for level_handler in &mut levels_handler {
                for pending_task_id in &level_handler.pending_tasks_ids() {
                    level_handler.remove_task(*pending_task_id);
                }
            }

            selector = SpaceReclaimCompactionSelector::default();
            member_table_ids = BTreeSet::from_iter([0, 1, 2, 5, 7].into_iter().map(TableId::new));
            let expect_task_file_count = [2, 1, 5];
            let expect_task_sst_id_range = [vec![3, 4], vec![6], vec![7, 8, 9, 10, 11]];
            for (index, x) in expect_task_file_count.iter().enumerate() {
                if index == expect_task_file_count.len() - 1 {
                    member_table_ids = BTreeSet::from_iter([2, 5].into_iter().map(TableId::new));
                }

                let task = selector
                    .pick_compaction(
                        1,
                        compaction_selector_context(
                            &group_config,
                            &levels,
                            &member_table_ids,
                            &mut levels_handler,
                            &mut local_stats,
                            &HashMap::default(),
                            Arc::new(CompactionDeveloperConfig::default()),
                            &Default::default(),
                            &HummockVersionStateTableInfo::empty(),
                        ),
                    )
                    .unwrap();

                assert_compaction_task(&task, &levels_handler);
                assert_eq!(task.input.input_levels.len(), 2);
                assert_eq!(task.input.input_levels[0].level_idx, 4);

                assert_eq!(task.input.input_levels[0].table_infos.len(), *x);
                let select_sst = &task.input.input_levels[0]
                    .table_infos
                    .iter()
                    .map(|sst| sst.sst_id)
                    .collect_vec();
                assert!(select_sst.is_sorted());
                assert_eq!(expect_task_sst_id_range[index], *select_sst);

                assert_eq!(task.input.input_levels[1].level_idx, 4);
                assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
                assert_eq!(task.input.target_level, 4);
                assert!(matches!(
                    task.compaction_task_type,
                    compact_task::TaskType::SpaceReclaim
                ));
            }
        }
    }
}