risingwave_meta/hummock/compaction/selector/
manual_selector.rs1use std::collections::HashSet;
21
22use bytes::Bytes;
23use risingwave_hummock_sdk::compaction_group::StateTableId;
24use risingwave_hummock_sdk::key_range::KeyRange;
25use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockSstableId};
26use risingwave_pb::hummock::compact_task;
27
28use super::{CompactionSelector, DynamicLevelSelectorCore};
29use crate::hummock::compaction::picker::{
30 CompactionPicker, LocalPickerStatistic, ManualCompactionPicker,
31};
32use crate::hummock::compaction::selector::CompactionSelectorContext;
33use crate::hummock::compaction::{CompactionTask, create_compaction_task, create_overlap_strategy};
34
35#[derive(Clone, Debug, PartialEq)]
36pub struct ManualCompactionOption {
37 pub sst_ids: Vec<HummockSstableId>,
39 pub key_range: KeyRange,
41 pub internal_table_id: HashSet<StateTableId>,
43 pub level: usize,
45}
46
47impl Default for ManualCompactionOption {
48 fn default() -> Self {
49 Self {
50 sst_ids: vec![],
51 key_range: KeyRange {
52 left: Bytes::default(),
53 right: Bytes::default(),
54 right_exclusive: false,
55 },
56 internal_table_id: HashSet::default(),
57 level: 1,
58 }
59 }
60}
61
62pub struct ManualCompactionSelector {
63 option: ManualCompactionOption,
64}
65
66impl ManualCompactionSelector {
67 pub fn new(option: ManualCompactionOption) -> Self {
68 Self { option }
69 }
70}
71
72impl CompactionSelector for ManualCompactionSelector {
73 fn pick_compaction(
74 &mut self,
75 task_id: HummockCompactionTaskId,
76 context: CompactionSelectorContext<'_>,
77 ) -> Option<CompactionTask> {
78 let CompactionSelectorContext {
79 group,
80 levels,
81 level_handlers,
82 developer_config,
83 ..
84 } = context;
85 let dynamic_level_core =
86 DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config);
87 let overlap_strategy = create_overlap_strategy(group.compaction_config.compaction_mode());
88 let ctx = dynamic_level_core.calculate_level_base_size(levels);
89 let (mut picker, base_level) = {
90 let target_level = if self.option.level == 0 {
91 ctx.base_level
92 } else if self.option.level == group.compaction_config.max_level as usize {
93 self.option.level
94 } else {
95 self.option.level + 1
96 };
97 if self.option.level > 0 && self.option.level < ctx.base_level {
98 return None;
99 }
100 (
101 ManualCompactionPicker::new(overlap_strategy, self.option.clone(), target_level),
102 ctx.base_level,
103 )
104 };
105
106 let compaction_input =
107 picker.pick_compaction(levels, level_handlers, &mut LocalPickerStatistic::default())?;
108 compaction_input.add_pending_task(task_id, level_handlers);
109
110 Some(create_compaction_task(
111 group.compaction_config.as_ref(),
112 compaction_input,
113 base_level,
114 self.task_type(),
115 ))
116 }
117
118 fn name(&self) -> &'static str {
119 "ManualCompactionSelector"
120 }
121
122 fn task_type(&self) -> compact_task::TaskType {
123 compact_task::TaskType::Manual
124 }
125}