risingwave_meta/hummock/compaction/selector/
manual_selector.rs1use std::collections::HashSet;
21
22use bytes::Bytes;
23use risingwave_hummock_sdk::compaction_group::StateTableId;
24use risingwave_hummock_sdk::key_range::KeyRange;
25use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockSstableId};
26use risingwave_pb::hummock::compact_task;
27
28use super::{CompactionSelector, DynamicLevelSelectorCore};
29use crate::hummock::compaction::picker::{
30 CompactionPicker, LocalPickerStatistic, ManualCompactionPicker,
31};
32use crate::hummock::compaction::selector::CompactionSelectorContext;
33use crate::hummock::compaction::{CompactionTask, create_compaction_task, create_overlap_strategy};
34
35#[derive(Clone, Debug, PartialEq)]
36pub struct ManualCompactionOption {
37 pub sst_ids: Vec<HummockSstableId>,
39 pub key_range: KeyRange,
41 pub internal_table_id: HashSet<StateTableId>,
43 pub level: usize,
45 pub exclusive: bool,
47}
48
49impl Default for ManualCompactionOption {
50 fn default() -> Self {
51 Self {
52 sst_ids: vec![],
53 key_range: KeyRange {
54 left: Bytes::default(),
55 right: Bytes::default(),
56 right_exclusive: false,
57 },
58 internal_table_id: HashSet::default(),
59 level: 1,
60 exclusive: false,
61 }
62 }
63}
64
65pub struct ManualCompactionSelector {
66 option: ManualCompactionOption,
67 blocked_by_pending: bool,
68}
69
70impl ManualCompactionSelector {
71 pub fn new(option: ManualCompactionOption) -> Self {
72 Self {
73 option,
74 blocked_by_pending: false,
75 }
76 }
77
78 pub fn blocked_by_pending(&self) -> bool {
79 self.blocked_by_pending
80 }
81}
82
83impl CompactionSelector for ManualCompactionSelector {
84 fn pick_compaction(
85 &mut self,
86 task_id: HummockCompactionTaskId,
87 context: CompactionSelectorContext<'_>,
88 ) -> Option<CompactionTask> {
89 let CompactionSelectorContext {
90 group,
91 levels,
92 level_handlers,
93 developer_config,
94 ..
95 } = context;
96 self.blocked_by_pending = false;
97
98 let dynamic_level_core =
99 DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config);
100 let overlap_strategy = create_overlap_strategy(group.compaction_config.compaction_mode());
101 let ctx = dynamic_level_core.calculate_level_base_size(levels);
102 let (mut picker, base_level) = {
103 let target_level = if self.option.level == 0 {
104 ctx.base_level
105 } else if self.option.level == group.compaction_config.max_level as usize {
106 self.option.level
107 } else {
108 self.option.level + 1
109 };
110 if self.option.level > 0 && self.option.level < ctx.base_level {
111 return None;
112 }
113 (
114 ManualCompactionPicker::new(overlap_strategy, self.option.clone(), target_level),
115 ctx.base_level,
116 )
117 };
118
119 let compaction_input =
120 picker.pick_compaction(levels, level_handlers, &mut LocalPickerStatistic::default());
121
122 if compaction_input.is_none()
123 && self.option.exclusive
124 && level_handlers
125 .iter()
126 .any(|level_handler| level_handler.pending_file_count() > 0)
127 {
128 self.blocked_by_pending = true;
129 }
130
131 let compaction_input = compaction_input?;
132 compaction_input.add_pending_task(task_id, level_handlers);
133
134 Some(create_compaction_task(
135 group.compaction_config.as_ref(),
136 compaction_input,
137 base_level,
138 self.task_type(),
139 ))
140 }
141
142 fn name(&self) -> &'static str {
143 "ManualCompactionSelector"
144 }
145
146 fn task_type(&self) -> compact_task::TaskType {
147 compact_task::TaskType::Manual
148 }
149}