1use std::collections::HashSet;
16
17use risingwave_common::catalog::TableId;
18use risingwave_hummock_sdk::level::{InputLevel, Levels};
19use risingwave_hummock_sdk::sstable_info::SstableInfo;
20
21use super::CompactionInput;
22use crate::hummock::level_handler::LevelHandler;
23
24pub struct SpaceReclaimCompactionPicker {
27 pub _max_space_reclaim_bytes: u64,
29
30 pub all_table_ids: HashSet<TableId>,
32}
33
/// Scan cursor carried between successive `SpaceReclaimCompactionPicker::pick_compaction`
/// calls for the same group.
#[derive(Default)]
pub struct SpaceReclaimPickerState {
    /// Level the next scan resumes from; `0` means the L0 sub-levels are scanned first.
    pub last_level: usize,
}
40
41impl SpaceReclaimCompactionPicker {
42 pub fn new(max_space_reclaim_bytes: u64, all_table_ids: HashSet<TableId>) -> Self {
43 Self {
44 _max_space_reclaim_bytes: max_space_reclaim_bytes,
45 all_table_ids,
46 }
47 }
48
49 fn exist_table_count(&self, sst: &SstableInfo) -> usize {
50 sst.table_ids
52 .iter()
53 .filter(|id| self.all_table_ids.contains(*id))
54 .count()
55 }
56}
57
58impl SpaceReclaimCompactionPicker {
59 pub fn pick_compaction(
60 &mut self,
61 levels: &Levels,
62 level_handlers: &[LevelHandler],
63 state: &mut SpaceReclaimPickerState,
64 ) -> Option<CompactionInput> {
65 assert!(!levels.levels.is_empty());
66 let mut select_input_ssts = vec![];
67
68 if state.last_level == 0 {
69 let l0 = &levels.l0;
70 for level in &l0.sub_levels {
72 for sst in &level.table_infos {
73 let exist_count = self.exist_table_count(sst);
74 if exist_count == sst.table_ids.len()
75 || level_handlers[0].is_pending_compact(&sst.sst_id)
76 {
77 if !select_input_ssts.is_empty() {
78 break;
79 }
80 } else if exist_count == 0 {
81 select_input_ssts.push(sst.clone());
82 } else if !select_input_ssts.is_empty() {
83 break;
84 }
85 }
86 if !select_input_ssts.is_empty() {
87 return Some(CompactionInput {
88 select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
89 total_file_count: select_input_ssts.len() as u64,
90 input_levels: vec![
91 InputLevel {
92 level_idx: level.level_idx,
93 level_type: level.level_type,
94 table_infos: select_input_ssts,
95 },
96 InputLevel {
97 level_idx: 0,
98 level_type: level.level_type,
99 table_infos: vec![],
100 },
101 ],
102 target_level: level.level_idx as usize,
103 target_sub_level_id: level.sub_level_id,
104 ..Default::default()
105 });
106 }
107 }
108 state.last_level = 1;
109 }
110 while state.last_level <= levels.levels.len() {
111 let mut is_trivial_task = true;
112 for sst in &levels.levels[state.last_level - 1].table_infos {
113 let exist_count = self.exist_table_count(sst);
114 let need_reclaim = exist_count < sst.table_ids.len();
115 let is_trivial_sst = exist_count == 0;
116 if !need_reclaim || level_handlers[state.last_level].is_pending_compact(&sst.sst_id)
117 {
118 if !select_input_ssts.is_empty() {
119 break;
124 }
125 continue;
126 }
127
128 if !is_trivial_sst {
129 if !select_input_ssts.is_empty() {
130 break;
131 }
132 is_trivial_task = false;
133 }
134
135 select_input_ssts.push(sst.clone());
136 if !is_trivial_task {
137 break;
138 }
139 }
140
141 if !select_input_ssts.is_empty() {
143 return Some(CompactionInput {
144 select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
145 total_file_count: select_input_ssts.len() as u64,
146 input_levels: vec![
147 InputLevel {
148 level_idx: state.last_level as u32,
149 level_type: levels.levels[state.last_level - 1].level_type,
150 table_infos: select_input_ssts,
151 },
152 InputLevel {
153 level_idx: state.last_level as u32,
154 level_type: levels.levels[state.last_level - 1].level_type,
155 table_infos: vec![],
156 },
157 ],
158 target_level: state.last_level,
159 ..Default::default()
160 });
161 }
162 state.last_level += 1;
163 }
164 state.last_level = 0;
165 None
166 }
167}
168
#[cfg(test)]
mod test {

    use std::collections::{BTreeSet, HashMap};
    use std::sync::Arc;

    use itertools::Itertools;
    use risingwave_common::catalog::TableId;
    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::level::Level;
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
    pub use risingwave_pb::hummock::LevelType;
    use risingwave_pb::hummock::compact_task;

    use super::*;
    use crate::hummock::compaction::CompactionDeveloperConfig;
    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::compaction::selector::tests::{
        assert_compaction_task, generate_l0_nonoverlapping_sublevels, generate_level,
        generate_table_with_ids_and_epochs,
    };
    use crate::hummock::compaction::selector::{
        CompactionSelector, LocalSelectorStatistic, SpaceReclaimCompactionSelector,
    };
    use crate::hummock::model::CompactionGroup;
    use crate::hummock::test_utils::compaction_selector_context;

    /// Runs one `pick_compaction` round for group 1, using default values for every other
    /// selector-context argument.
    macro_rules! pick_space_reclaim_task {
        (
            $selector:expr,
            $group_config:expr,
            $levels:expr,
            $member_table_ids:expr,
            $levels_handler:expr,
            $local_stats:expr $(,)?
        ) => {
            $selector.pick_compaction(
                1,
                compaction_selector_context(
                    &$group_config,
                    &$levels,
                    &$member_table_ids,
                    &mut $levels_handler,
                    &mut $local_stats,
                    &HashMap::default(),
                    Arc::new(CompactionDeveloperConfig::default()),
                    &Default::default(),
                    &HummockVersionStateTableInfo::empty(),
                ),
            )
        };
    }

    /// Removes every pending task from every level handler.
    fn clear_pending_tasks(levels_handler: &mut [LevelHandler]) {
        for level_handler in levels_handler.iter_mut() {
            for pending_task_id in &level_handler.pending_tasks_ids() {
                level_handler.remove_task(*pending_task_id);
            }
        }
    }

    #[test]
    fn test_space_reclaim_compaction_selector() {
        let max_space_reclaim_bytes = 400;
        let config = CompactionConfigBuilder::new()
            .max_level(4)
            .max_space_reclaim_bytes(max_space_reclaim_bytes)
            .build();
        let group_config = CompactionGroup::new(1, config);

        let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
        assert_eq!(l0.sub_levels.len(), 0);
        let mut levels = vec![
            generate_level(1, vec![]),
            generate_level(2, vec![]),
            generate_level(
                3,
                vec![
                    generate_table_with_ids_and_epochs(0, 1, 150, 151, 1, vec![0], 0, 0),
                    generate_table_with_ids_and_epochs(1, 1, 250, 251, 1, vec![1], 0, 0),
                ],
            ),
            Level {
                level_idx: 4,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![
                    generate_table_with_ids_and_epochs(2, 1, 0, 100, 1, vec![2], 0, 0),
                    generate_table_with_ids_and_epochs(3, 1, 101, 200, 1, vec![3], 0, 0),
                    generate_table_with_ids_and_epochs(4, 1, 222, 300, 1, vec![4], 0, 0),
                    generate_table_with_ids_and_epochs(5, 1, 333, 400, 1, vec![5], 0, 0),
                    generate_table_with_ids_and_epochs(6, 1, 444, 500, 1, vec![6], 0, 0),
                    generate_table_with_ids_and_epochs(7, 1, 555, 600, 1, vec![7], 0, 0),
                    generate_table_with_ids_and_epochs(8, 1, 666, 700, 1, vec![8], 0, 0),
                    generate_table_with_ids_and_epochs(9, 1, 777, 800, 1, vec![9], 0, 0),
                    generate_table_with_ids_and_epochs(10, 1, 888, 1600, 1, vec![10], 0, 0),
                    generate_table_with_ids_and_epochs(11, 1, 1600, 1800, 1, vec![10], 0, 0),
                ],
                ..Default::default()
            },
        ];

        {
            // NOTE(review): presumably makes SST 10's key range right-exclusive so it does not
            // overlap SST 11, which appears to start at the same key — confirm.
            let sst_10 = levels[3].table_infos.get_mut(8).unwrap();
            assert_eq!(10, sst_10.sst_id);
            *sst_10 = SstableInfoInner {
                key_range: KeyRange {
                    right_exclusive: true,
                    ..sst_10.get_inner().key_range
                },
                ..sst_10.get_inner()
            }
            .into();
        }

        assert_eq!(levels.len(), 4);
        let levels = Levels {
            levels,
            l0,
            ..Default::default()
        };
        let mut member_table_ids = BTreeSet::new();
        let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
        let mut local_stats = LocalSelectorStatistic::default();

        let mut selector = SpaceReclaimCompactionSelector::default();

        // No member tables: L3 is picked first, then L4 up to the SST marked pending.
        {
            let task = pick_space_reclaim_task!(
                selector,
                group_config,
                levels,
                member_table_ids,
                levels_handler,
                local_stats,
            )
            .unwrap();
            assert_compaction_task(&task, &levels_handler);
            assert_eq!(task.input.input_levels.len(), 2);
            assert_eq!(task.input.input_levels[0].level_idx, 3);
            assert_eq!(task.input.input_levels[0].table_infos.len(), 2);
            levels_handler[4].add_pending_task(0, 4, &levels.levels[3].table_infos[5..6]);
            let task = pick_space_reclaim_task!(
                selector,
                group_config,
                levels,
                member_table_ids,
                levels_handler,
                local_stats,
            )
            .unwrap();
            assert_eq!(task.input.input_levels.len(), 2);
            assert_eq!(task.input.input_levels[0].level_idx, 4);
            assert_eq!(task.input.input_levels[0].table_infos.len(), 5);

            for (i, sst) in task.input.input_levels[0].table_infos.iter().enumerate() {
                assert_eq!(2 + i as u64, sst.sst_id);
            }

            assert_eq!(task.input.input_levels[1].level_idx, 4);
            assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
            assert_eq!(task.input.target_level, 4);
            assert!(matches!(
                task.compaction_task_type,
                compact_task::TaskType::SpaceReclaim
            ));

            let select_file_size: u64 = task.input.input_levels[0]
                .table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum();
            assert!(select_file_size > max_space_reclaim_bytes);
        }

        // The rest of L4 is picked next, after which nothing is left.
        {
            let task = pick_space_reclaim_task!(
                selector,
                group_config,
                levels,
                member_table_ids,
                levels_handler,
                local_stats,
            )
            .unwrap();
            assert_compaction_task(&task, &levels_handler);
            assert_eq!(task.input.input_levels.len(), 2);
            assert_eq!(task.input.input_levels[0].level_idx, 4);
            assert_eq!(task.input.input_levels[0].table_infos.len(), 4);
            assert_eq!(task.input.target_level, 4);
            assert!(matches!(
                task.compaction_task_type,
                compact_task::TaskType::SpaceReclaim
            ));
            for (i, sst) in task.input.input_levels[0].table_infos.iter().enumerate() {
                assert_eq!(8 + i as u64, sst.sst_id);
            }

            assert!(
                pick_space_reclaim_task!(
                    selector,
                    group_config,
                    levels,
                    member_table_ids,
                    levels_handler,
                    local_stats,
                )
                .is_none()
            );
        }

        // Every table is still a member: nothing needs reclaiming.
        {
            clear_pending_tasks(&mut levels_handler);

            member_table_ids = BTreeSet::from_iter(
                [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
                    .into_iter()
                    .map(TableId::new),
            );
            let task = pick_space_reclaim_task!(
                selector,
                group_config,
                levels,
                member_table_ids,
                levels_handler,
                local_stats,
            );
            assert!(task.is_none());
        }

        // Tables 0 and 1 dropped: L3 becomes reclaimable.
        {
            clear_pending_tasks(&mut levels_handler);

            member_table_ids =
                BTreeSet::from_iter([2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(TableId::new));
            let task = pick_space_reclaim_task!(
                selector,
                group_config,
                levels,
                member_table_ids,
                levels_handler,
                local_stats,
            )
            .unwrap();
            assert_compaction_task(&task, &levels_handler);
            assert_eq!(task.input.input_levels.len(), 2);
            assert_eq!(task.input.input_levels[0].level_idx, 3);
            assert_eq!(task.input.input_levels[0].table_infos.len(), 2);
            assert_eq!(task.input.target_level, 3);
            assert!(matches!(
                task.compaction_task_type,
                compact_task::TaskType::SpaceReclaim
            ));
        }

        // Scattered member tables: contiguous runs in L4 are picked one after another.
        {
            clear_pending_tasks(&mut levels_handler);

            selector = SpaceReclaimCompactionSelector::default();
            member_table_ids = BTreeSet::from_iter([0, 1, 2, 5, 7].into_iter().map(TableId::new));
            let expect_task_file_count = [2, 1, 4];
            let expect_task_sst_id_range = [vec![3, 4], vec![6], vec![8, 9, 10, 11]];
            for (index, x) in expect_task_file_count.iter().enumerate() {
                let task = pick_space_reclaim_task!(
                    selector,
                    group_config,
                    levels,
                    member_table_ids,
                    levels_handler,
                    local_stats,
                )
                .unwrap();

                assert_compaction_task(&task, &levels_handler);
                assert_eq!(task.input.input_levels.len(), 2);
                assert_eq!(task.input.input_levels[0].level_idx, 4);

                assert_eq!(task.input.input_levels[0].table_infos.len(), *x);
                let select_sst = &task.input.input_levels[0]
                    .table_infos
                    .iter()
                    .map(|sst| sst.sst_id)
                    .collect_vec();
                assert!(select_sst.is_sorted());
                assert_eq!(expect_task_sst_id_range[index], *select_sst);

                assert_eq!(task.input.input_levels[1].level_idx, 4);
                assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
                assert_eq!(task.input.target_level, 4);
                assert!(matches!(
                    task.compaction_task_type,
                    compact_task::TaskType::SpaceReclaim
                ));
            }
        }

        // Same as above, but the member set shrinks before the last round, so SST 7 joins it.
        {
            clear_pending_tasks(&mut levels_handler);

            selector = SpaceReclaimCompactionSelector::default();
            member_table_ids = BTreeSet::from_iter([0, 1, 2, 5, 7].into_iter().map(TableId::new));
            let expect_task_file_count = [2, 1, 5];
            let expect_task_sst_id_range = [vec![3, 4], vec![6], vec![7, 8, 9, 10, 11]];
            for (index, x) in expect_task_file_count.iter().enumerate() {
                if index == expect_task_file_count.len() - 1 {
                    member_table_ids = BTreeSet::from_iter([2, 5].into_iter().map(TableId::new));
                }

                let task = pick_space_reclaim_task!(
                    selector,
                    group_config,
                    levels,
                    member_table_ids,
                    levels_handler,
                    local_stats,
                )
                .unwrap();

                assert_compaction_task(&task, &levels_handler);
                assert_eq!(task.input.input_levels.len(), 2);
                assert_eq!(task.input.input_levels[0].level_idx, 4);

                assert_eq!(task.input.input_levels[0].table_infos.len(), *x);
                let select_sst = &task.input.input_levels[0]
                    .table_infos
                    .iter()
                    .map(|sst| sst.sst_id)
                    .collect_vec();
                assert!(select_sst.is_sorted());
                assert_eq!(expect_task_sst_id_range[index], *select_sst);

                assert_eq!(task.input.input_levels[1].level_idx, 4);
                assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
                assert_eq!(task.input.target_level, 4);
                assert!(matches!(
                    task.compaction_task_type,
                    compact_task::TaskType::SpaceReclaim
                ));
            }
        }
    }
}