risingwave_meta/hummock/compaction/selector/
space_reclaim_selector.rs1use std::collections::HashMap;
21
22use risingwave_hummock_sdk::HummockCompactionTaskId;
23use risingwave_pb::hummock::compact_task;
24
25use super::{CompactionSelector, DynamicLevelSelectorCore};
26use crate::hummock::compaction::picker::{SpaceReclaimCompactionPicker, SpaceReclaimPickerState};
27use crate::hummock::compaction::selector::CompactionSelectorContext;
28use crate::hummock::compaction::{CompactionTask, create_compaction_task};
29
30#[derive(Default)]
31pub struct SpaceReclaimCompactionSelector {
32 state: HashMap<u64, SpaceReclaimPickerState>,
33}
34
35impl CompactionSelector for SpaceReclaimCompactionSelector {
36 fn pick_compaction(
37 &mut self,
38 task_id: HummockCompactionTaskId,
39 context: CompactionSelectorContext<'_>,
40 ) -> Option<CompactionTask> {
41 let CompactionSelectorContext {
42 group,
43 levels,
44 member_table_ids,
45 level_handlers,
46 developer_config,
47 ..
48 } = context;
49 let dynamic_level_core =
50 DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config);
51 let mut picker = SpaceReclaimCompactionPicker::new(
52 group.compaction_config.max_space_reclaim_bytes,
53 member_table_ids
54 .iter()
55 .map(|table_id| table_id.table_id)
56 .collect(),
57 );
58 let ctx = dynamic_level_core.calculate_level_base_size(levels);
59 let state = self.state.entry(group.group_id).or_default();
60
61 let compaction_input = picker.pick_compaction(levels, level_handlers, state)?;
62 compaction_input.add_pending_task(task_id, level_handlers);
63
64 Some(create_compaction_task(
65 dynamic_level_core.get_config(),
66 compaction_input,
67 ctx.base_level,
68 self.task_type(),
69 ))
70 }
71
72 fn name(&self) -> &'static str {
73 "SpaceReclaimCompaction"
74 }
75
76 fn task_type(&self) -> compact_task::TaskType {
77 compact_task::TaskType::SpaceReclaim
78 }
79}