risingwave_meta/hummock/compaction/selector/
ttl_selector.rs1use std::collections::HashMap;
21
22use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
23use risingwave_pb::hummock::compact_task;
24
25use super::{CompactionSelector, DynamicLevelSelectorCore};
26use crate::hummock::compaction::picker::{TtlPickerState, TtlReclaimCompactionPicker};
27use crate::hummock::compaction::selector::CompactionSelectorContext;
28use crate::hummock::compaction::{CompactionTask, create_compaction_task};
29
30#[derive(Default)]
31pub struct TtlCompactionSelector {
32 state: HashMap<CompactionGroupId, TtlPickerState>,
33}
34
35impl CompactionSelector for TtlCompactionSelector {
36 fn pick_compaction(
37 &mut self,
38 task_id: HummockCompactionTaskId,
39 context: CompactionSelectorContext<'_>,
40 ) -> Option<CompactionTask> {
41 let CompactionSelectorContext {
42 group,
43 levels,
44 level_handlers,
45 table_id_to_options,
46 developer_config,
47 in_progress_compactions,
48 ..
49 } = context;
50 let dynamic_level_core =
51 DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config);
52 let ctx = dynamic_level_core.calculate_level_base_size(levels);
53 let picker = TtlReclaimCompactionPicker::new(table_id_to_options);
54 let state = self.state.entry(group.group_id).or_default();
55 let compaction_input = picker.pick_compaction(levels, level_handlers, state)?;
56 if !compaction_input.skip_target_range_conflict_check
57 && in_progress_compactions.has_conflict_with_input(&compaction_input)
58 {
59 return None;
60 }
61 compaction_input.add_pending_task(task_id, level_handlers);
62
63 Some(create_compaction_task(
64 group.compaction_config.as_ref(),
65 compaction_input,
66 ctx.base_level,
67 self.task_type(),
68 ))
69 }
70
71 fn name(&self) -> &'static str {
72 "TtlCompaction"
73 }
74
75 fn task_type(&self) -> compact_task::TaskType {
76 compact_task::TaskType::Ttl
77 }
78}