risingwave_meta/hummock/
level_handler.rs1use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_hummock_sdk::level::Level;
19use risingwave_hummock_sdk::sstable_info::SstableInfo;
20use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockSstableId};
21use risingwave_pb::hummock::level_handler::RunningCompactTask;
22
23#[derive(Clone, Debug, PartialEq)]
24pub struct LevelHandler {
25 level: u32,
26 compacting_files: HashMap<HummockSstableId, HummockCompactionTaskId>,
27 pending_tasks: Vec<RunningCompactTask>,
28}
29
30impl LevelHandler {
31 pub fn new(level: u32) -> Self {
32 Self {
33 level,
34 compacting_files: HashMap::default(),
35 pending_tasks: vec![],
36 }
37 }
38
39 pub fn get_level(&self) -> u32 {
40 self.level
41 }
42
43 pub fn remove_task(&mut self, target_task_id: u64) {
44 for task in &self.pending_tasks {
45 if task.task_id == target_task_id {
46 for sst in &task.ssts {
47 self.compacting_files.remove(sst);
48 }
49 }
50 }
51 self.pending_tasks
52 .retain(|task| task.task_id != target_task_id);
53 }
54
55 pub fn is_pending_compact(&self, sst_id: &HummockSstableId) -> bool {
56 self.compacting_files.contains_key(sst_id)
57 }
58
59 pub fn pending_task_id_by_sst(
60 &self,
61 sst_id: &HummockSstableId,
62 ) -> Option<HummockCompactionTaskId> {
63 self.compacting_files.get(sst_id).cloned()
64 }
65
66 pub fn is_level_pending_compact(&self, level: &Level) -> bool {
67 level
68 .table_infos
69 .iter()
70 .any(|table| self.compacting_files.contains_key(&table.sst_id))
71 }
72
73 pub fn is_level_all_pending_compact(&self, level: &Level) -> bool {
74 if level.table_infos.is_empty() {
75 return false;
76 }
77
78 level
79 .table_infos
80 .iter()
81 .all(|table| self.compacting_files.contains_key(&table.sst_id))
82 }
83
84 pub fn add_pending_task<'a>(
85 &mut self,
86 task_id: u64,
87 target_level: usize,
88 ssts: impl IntoIterator<Item = &'a SstableInfo>,
89 ) {
90 let target_level = target_level as u32;
91 let mut table_ids = vec![];
92 let mut total_file_size = 0;
93 for sst in ssts {
94 self.compacting_files.insert(sst.sst_id, task_id);
95 total_file_size += sst.sst_size;
96 table_ids.push(sst.sst_id);
97 }
98
99 self.pending_tasks.push(RunningCompactTask {
100 task_id,
101 target_level,
102 total_file_size,
103 ssts: table_ids,
104 });
105 }
106
107 pub fn pending_file_count(&self) -> usize {
108 self.compacting_files.len()
109 }
110
111 pub fn pending_file_size(&self) -> u64 {
112 self.pending_tasks
113 .iter()
114 .map(|task| task.total_file_size)
115 .sum::<u64>()
116 }
117
118 pub fn pending_output_file_size(&self, target_level: u32) -> u64 {
119 self.pending_tasks
120 .iter()
121 .filter(|task| task.target_level == target_level)
122 .map(|task| task.total_file_size)
123 .sum::<u64>()
124 }
125
126 pub fn pending_tasks_ids(&self) -> Vec<u64> {
127 self.pending_tasks
128 .iter()
129 .map(|task| task.task_id)
130 .collect_vec()
131 }
132
133 pub fn pending_tasks(&self) -> &[RunningCompactTask] {
134 &self.pending_tasks
135 }
136
137 pub fn compacting_files(&self) -> &HashMap<HummockSstableId, HummockCompactionTaskId> {
138 &self.compacting_files
139 }
140
141 #[cfg(test)]
142 pub(crate) fn test_add_pending_sst(&mut self, sst_id: HummockSstableId, task_id: u64) {
143 self.compacting_files.insert(sst_id, task_id);
144 }
145}
146
147impl From<&LevelHandler> for risingwave_pb::hummock::LevelHandler {
148 fn from(lh: &LevelHandler) -> Self {
149 risingwave_pb::hummock::LevelHandler {
150 level: lh.level,
151 tasks: lh.pending_tasks.clone(),
152 }
153 }
154}
155
156impl From<&risingwave_pb::hummock::LevelHandler> for LevelHandler {
157 fn from(lh: &risingwave_pb::hummock::LevelHandler) -> Self {
158 let mut pending_tasks = vec![];
159 let mut compacting_files = HashMap::new();
160 for task in &lh.tasks {
161 pending_tasks.push(task.clone());
162 for s in &task.ssts {
163 compacting_files.insert(*s, task.task_id);
164 }
165 }
166 Self {
167 pending_tasks,
168 compacting_files,
169 level: lh.level,
170 }
171 }
172}