risingwave_meta/hummock/compaction/selector/
ttl_selector.rs1use std::collections::HashMap;
21
22use risingwave_hummock_sdk::HummockCompactionTaskId;
23use risingwave_pb::hummock::compact_task;
24
25use super::{CompactionSelector, DynamicLevelSelectorCore};
26use crate::hummock::compaction::picker::{TtlPickerState, TtlReclaimCompactionPicker};
27use crate::hummock::compaction::selector::CompactionSelectorContext;
28use crate::hummock::compaction::{CompactionTask, create_compaction_task};
29
30#[derive(Default)]
31pub struct TtlCompactionSelector {
32 state: HashMap<u64, TtlPickerState>,
33}
34
35impl CompactionSelector for TtlCompactionSelector {
36 fn pick_compaction(
37 &mut self,
38 task_id: HummockCompactionTaskId,
39 context: CompactionSelectorContext<'_>,
40 ) -> Option<CompactionTask> {
41 let CompactionSelectorContext {
42 group,
43 levels,
44 level_handlers,
45 table_id_to_options,
46 developer_config,
47 ..
48 } = context;
49 let dynamic_level_core =
50 DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config);
51 let ctx = dynamic_level_core.calculate_level_base_size(levels);
52 let picker = TtlReclaimCompactionPicker::new(table_id_to_options);
53 let state = self.state.entry(group.group_id).or_default();
54 let compaction_input = picker.pick_compaction(levels, level_handlers, state)?;
55 compaction_input.add_pending_task(task_id, level_handlers);
56
57 Some(create_compaction_task(
58 group.compaction_config.as_ref(),
59 compaction_input,
60 ctx.base_level,
61 self.task_type(),
62 ))
63 }
64
65 fn name(&self) -> &'static str {
66 "TtlCompaction"
67 }
68
69 fn task_type(&self) -> compact_task::TaskType {
70 compact_task::TaskType::Ttl
71 }
72}