risingwave_meta/hummock/
level_handler.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use itertools::Itertools;
18use risingwave_hummock_sdk::level::Level;
19use risingwave_hummock_sdk::sstable_info::SstableInfo;
20use risingwave_hummock_sdk::{HummockCompactionTaskId, HummockSstableId};
21use risingwave_pb::hummock::level_handler::RunningCompactTask;
22
23#[derive(Clone, Debug, PartialEq)]
24pub struct LevelHandler {
25    level: u32,
26    compacting_files: HashMap<HummockSstableId, HummockCompactionTaskId>,
27    pending_tasks: Vec<RunningCompactTask>,
28}
29
30impl LevelHandler {
31    pub fn new(level: u32) -> Self {
32        Self {
33            level,
34            compacting_files: HashMap::default(),
35            pending_tasks: vec![],
36        }
37    }
38
39    pub fn get_level(&self) -> u32 {
40        self.level
41    }
42
43    pub fn remove_task(&mut self, target_task_id: u64) {
44        for task in &self.pending_tasks {
45            if task.task_id == target_task_id {
46                for sst in &task.ssts {
47                    self.compacting_files.remove(sst);
48                }
49            }
50        }
51        self.pending_tasks
52            .retain(|task| task.task_id != target_task_id);
53    }
54
55    pub fn is_pending_compact(&self, sst_id: &HummockSstableId) -> bool {
56        self.compacting_files.contains_key(sst_id)
57    }
58
59    pub fn pending_task_id_by_sst(
60        &self,
61        sst_id: &HummockSstableId,
62    ) -> Option<HummockCompactionTaskId> {
63        self.compacting_files.get(sst_id).cloned()
64    }
65
66    pub fn is_level_pending_compact(&self, level: &Level) -> bool {
67        level
68            .table_infos
69            .iter()
70            .any(|table| self.compacting_files.contains_key(&table.sst_id))
71    }
72
73    pub fn is_level_all_pending_compact(&self, level: &Level) -> bool {
74        if level.table_infos.is_empty() {
75            return false;
76        }
77
78        level
79            .table_infos
80            .iter()
81            .all(|table| self.compacting_files.contains_key(&table.sst_id))
82    }
83
84    pub fn add_pending_task<'a>(
85        &mut self,
86        task_id: u64,
87        target_level: usize,
88        ssts: impl IntoIterator<Item = &'a SstableInfo>,
89    ) {
90        let target_level = target_level as u32;
91        let mut table_ids = vec![];
92        let mut total_file_size = 0;
93        for sst in ssts {
94            self.compacting_files.insert(sst.sst_id, task_id);
95            total_file_size += sst.sst_size;
96            table_ids.push(sst.sst_id);
97        }
98
99        self.pending_tasks.push(RunningCompactTask {
100            task_id,
101            target_level,
102            total_file_size,
103            ssts: table_ids,
104        });
105    }
106
107    pub fn pending_file_count(&self) -> usize {
108        self.compacting_files.len()
109    }
110
111    pub fn pending_file_size(&self) -> u64 {
112        self.pending_tasks
113            .iter()
114            .map(|task| task.total_file_size)
115            .sum::<u64>()
116    }
117
118    pub fn pending_output_file_size(&self, target_level: u32) -> u64 {
119        self.pending_tasks
120            .iter()
121            .filter(|task| task.target_level == target_level)
122            .map(|task| task.total_file_size)
123            .sum::<u64>()
124    }
125
126    pub fn pending_tasks_ids(&self) -> Vec<u64> {
127        self.pending_tasks
128            .iter()
129            .map(|task| task.task_id)
130            .collect_vec()
131    }
132
133    pub fn pending_tasks(&self) -> &[RunningCompactTask] {
134        &self.pending_tasks
135    }
136
137    pub fn compacting_files(&self) -> &HashMap<HummockSstableId, HummockCompactionTaskId> {
138        &self.compacting_files
139    }
140
141    #[cfg(test)]
142    pub(crate) fn test_add_pending_sst(&mut self, sst_id: HummockSstableId, task_id: u64) {
143        self.compacting_files.insert(sst_id, task_id);
144    }
145}
146
147impl From<&LevelHandler> for risingwave_pb::hummock::LevelHandler {
148    fn from(lh: &LevelHandler) -> Self {
149        risingwave_pb::hummock::LevelHandler {
150            level: lh.level,
151            tasks: lh.pending_tasks.clone(),
152        }
153    }
154}
155
156impl From<&risingwave_pb::hummock::LevelHandler> for LevelHandler {
157    fn from(lh: &risingwave_pb::hummock::LevelHandler) -> Self {
158        let mut pending_tasks = vec![];
159        let mut compacting_files = HashMap::new();
160        for task in &lh.tasks {
161            pending_tasks.push(task.clone());
162            for s in &task.ssts {
163                compacting_files.insert(*s, task.task_id);
164            }
165        }
166        Self {
167            pending_tasks,
168            compacting_files,
169            level: lh.level,
170        }
171    }
172}