risingwave_hummock_sdk/
level.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::mem::size_of;
16
17use itertools::Itertools;
18use risingwave_pb::hummock::hummock_version::PbLevels;
19use risingwave_pb::hummock::{
20    PbInputLevel, PbLevel, PbLevelType, PbOverlappingLevel, PbSstableInfo,
21};
22
23use crate::sstable_info::SstableInfo;
24
25#[derive(Debug, Clone, PartialEq, Default)]
26pub struct OverlappingLevelCommon<T> {
27    pub sub_levels: Vec<LevelCommon<T>>,
28    pub total_file_size: u64,
29    pub uncompressed_file_size: u64,
30}
31
32pub type OverlappingLevel = OverlappingLevelCommon<SstableInfo>;
33
34impl<T> From<&PbOverlappingLevel> for OverlappingLevelCommon<T>
35where
36    for<'a> LevelCommon<T>: From<&'a PbLevel>,
37{
38    fn from(pb_overlapping_level: &PbOverlappingLevel) -> Self {
39        Self {
40            sub_levels: pb_overlapping_level
41                .sub_levels
42                .iter()
43                .map(LevelCommon::from)
44                .collect_vec(),
45            total_file_size: pb_overlapping_level.total_file_size,
46            uncompressed_file_size: pb_overlapping_level.uncompressed_file_size,
47        }
48    }
49}
50
51impl<T> From<&OverlappingLevelCommon<T>> for PbOverlappingLevel
52where
53    for<'a> &'a LevelCommon<T>: Into<PbLevel>,
54{
55    fn from(overlapping_level: &OverlappingLevelCommon<T>) -> Self {
56        Self {
57            sub_levels: overlapping_level
58                .sub_levels
59                .iter()
60                .map(|level| level.into())
61                .collect_vec(),
62            total_file_size: overlapping_level.total_file_size,
63            uncompressed_file_size: overlapping_level.uncompressed_file_size,
64        }
65    }
66}
67
68impl<T> From<OverlappingLevelCommon<T>> for PbOverlappingLevel
69where
70    LevelCommon<T>: Into<PbLevel>,
71{
72    fn from(overlapping_level: OverlappingLevelCommon<T>) -> Self {
73        Self {
74            sub_levels: overlapping_level
75                .sub_levels
76                .into_iter()
77                .map(|pb_level| pb_level.into())
78                .collect_vec(),
79            total_file_size: overlapping_level.total_file_size,
80            uncompressed_file_size: overlapping_level.uncompressed_file_size,
81        }
82    }
83}
84
85impl<T> From<PbOverlappingLevel> for OverlappingLevelCommon<T>
86where
87    LevelCommon<T>: From<PbLevel>,
88{
89    fn from(pb_overlapping_level: PbOverlappingLevel) -> Self {
90        Self {
91            sub_levels: pb_overlapping_level
92                .sub_levels
93                .into_iter()
94                .map(LevelCommon::from)
95                .collect_vec(),
96            total_file_size: pb_overlapping_level.total_file_size,
97            uncompressed_file_size: pb_overlapping_level.uncompressed_file_size,
98        }
99    }
100}
101
102impl OverlappingLevel {
103    pub fn estimated_encode_len(&self) -> usize {
104        self.sub_levels
105            .iter()
106            .map(|level| level.estimated_encode_len())
107            .sum::<usize>()
108            + size_of::<u64>()
109            + size_of::<u64>()
110    }
111}
112
113#[derive(Debug, Clone, PartialEq, Default)]
114pub struct LevelCommon<T> {
115    pub level_idx: u32,
116    pub level_type: PbLevelType,
117    pub table_infos: Vec<T>,
118    pub total_file_size: u64,
119    pub sub_level_id: u64,
120    pub uncompressed_file_size: u64,
121    pub vnode_partition_count: u32,
122}
123
124pub type Level = LevelCommon<SstableInfo>;
125
126impl<T> From<&PbLevel> for LevelCommon<T>
127where
128    T: for<'a> From<&'a PbSstableInfo>,
129{
130    fn from(pb_level: &PbLevel) -> Self {
131        Self {
132            level_idx: pb_level.level_idx,
133            level_type: PbLevelType::try_from(pb_level.level_type).unwrap(),
134            table_infos: pb_level.table_infos.iter().map(Into::into).collect_vec(),
135            total_file_size: pb_level.total_file_size,
136            sub_level_id: pb_level.sub_level_id,
137            uncompressed_file_size: pb_level.uncompressed_file_size,
138            vnode_partition_count: pb_level.vnode_partition_count,
139        }
140    }
141}
142
143impl<T> From<&LevelCommon<T>> for PbLevel
144where
145    PbSstableInfo: for<'a> From<&'a T>,
146{
147    fn from(level: &LevelCommon<T>) -> Self {
148        Self {
149            level_idx: level.level_idx,
150            level_type: level.level_type.into(),
151            table_infos: level.table_infos.iter().map(Into::into).collect_vec(),
152            total_file_size: level.total_file_size,
153            sub_level_id: level.sub_level_id,
154            uncompressed_file_size: level.uncompressed_file_size,
155            vnode_partition_count: level.vnode_partition_count,
156        }
157    }
158}
159
160impl<T> From<LevelCommon<T>> for PbLevel
161where
162    PbSstableInfo: From<T>,
163{
164    fn from(level: LevelCommon<T>) -> Self {
165        Self {
166            level_idx: level.level_idx,
167            level_type: level.level_type.into(),
168            table_infos: level.table_infos.into_iter().map(Into::into).collect_vec(),
169            total_file_size: level.total_file_size,
170            sub_level_id: level.sub_level_id,
171            uncompressed_file_size: level.uncompressed_file_size,
172            vnode_partition_count: level.vnode_partition_count,
173        }
174    }
175}
176
177impl<T> From<PbLevel> for LevelCommon<T>
178where
179    T: From<PbSstableInfo>,
180{
181    fn from(pb_level: PbLevel) -> Self {
182        Self {
183            level_idx: pb_level.level_idx,
184            level_type: PbLevelType::try_from(pb_level.level_type).unwrap(),
185            table_infos: pb_level
186                .table_infos
187                .into_iter()
188                .map(Into::into)
189                .collect_vec(),
190            total_file_size: pb_level.total_file_size,
191            sub_level_id: pb_level.sub_level_id,
192            uncompressed_file_size: pb_level.uncompressed_file_size,
193            vnode_partition_count: pb_level.vnode_partition_count,
194        }
195    }
196}
197
198impl Level {
199    pub fn estimated_encode_len(&self) -> usize {
200        size_of::<u32>()
201            + size_of::<u32>()
202            + self
203                .table_infos
204                .iter()
205                .map(|sst| sst.estimated_encode_len())
206                .sum::<usize>()
207            + size_of::<u64>()
208            + size_of::<u64>()
209            + size_of::<u64>()
210            + size_of::<u32>()
211    }
212}
213
214#[derive(Debug, Clone, PartialEq, Default)]
215pub struct LevelsCommon<T> {
216    pub levels: Vec<LevelCommon<T>>,
217    pub l0: OverlappingLevelCommon<T>,
218    pub group_id: u64,
219    pub parent_group_id: u64,
220
221    #[deprecated]
222    pub member_table_ids: Vec<u32>,
223    pub compaction_group_version_id: u64,
224}
225
226pub type Levels = LevelsCommon<SstableInfo>;
227
228impl Levels {
229    pub fn level0(&self) -> &OverlappingLevel {
230        &self.l0
231    }
232
233    pub fn get_level(&self, level_idx: usize) -> &Level {
234        &self.levels[level_idx - 1]
235    }
236
237    pub fn get_level_mut(&mut self, level_idx: usize) -> &mut Level {
238        &mut self.levels[level_idx - 1]
239    }
240
241    pub fn is_last_level(&self, level_idx: u32) -> bool {
242        self.levels
243            .last()
244            .as_ref()
245            .is_some_and(|level| level.level_idx == level_idx)
246    }
247
248    pub fn count_ssts(&self) -> usize {
249        self.level0()
250            .sub_levels
251            .iter()
252            .chain(self.levels.iter())
253            .map(|level| level.table_infos.len())
254            .sum()
255    }
256}
257
258impl<T> LevelsCommon<T>
259where
260    PbLevels: for<'a> From<&'a LevelsCommon<T>>,
261{
262    pub fn to_protobuf(&self) -> PbLevels {
263        self.into()
264    }
265}
266
267impl<T> LevelsCommon<T>
268where
269    LevelsCommon<T>: for<'a> From<&'a PbLevels>,
270{
271    pub fn from_protobuf(pb_levels: &PbLevels) -> LevelsCommon<T> {
272        LevelsCommon::<T>::from(pb_levels)
273    }
274}
275
276impl Levels {
277    pub fn estimated_encode_len(&self) -> usize {
278        let mut basic = self
279            .levels
280            .iter()
281            .map(|level| level.estimated_encode_len())
282            .sum::<usize>()
283            + size_of::<u64>()
284            + size_of::<u64>()
285            + size_of::<u32>();
286        basic += self.l0.estimated_encode_len();
287
288        basic
289    }
290}
291
292impl<T> From<&PbLevels> for LevelsCommon<T>
293where
294    T: for<'a> From<&'a PbSstableInfo>,
295{
296    #[expect(deprecated)]
297    fn from(pb_levels: &PbLevels) -> Self {
298        Self {
299            l0: OverlappingLevelCommon::from(pb_levels.l0.as_ref().unwrap()),
300            levels: pb_levels.levels.iter().map(Into::into).collect_vec(),
301            group_id: pb_levels.group_id,
302            parent_group_id: pb_levels.parent_group_id,
303            member_table_ids: pb_levels.member_table_ids.clone(),
304            compaction_group_version_id: pb_levels.compaction_group_version_id,
305        }
306    }
307}
308
309impl<T> From<&LevelsCommon<T>> for PbLevels
310where
311    PbSstableInfo: for<'a> From<&'a T>,
312{
313    #[expect(deprecated)]
314    fn from(levels: &LevelsCommon<T>) -> Self {
315        Self {
316            l0: Some((&levels.l0).into()),
317            levels: levels.levels.iter().map(PbLevel::from).collect_vec(),
318            group_id: levels.group_id,
319            parent_group_id: levels.parent_group_id,
320            member_table_ids: levels.member_table_ids.clone(),
321            compaction_group_version_id: levels.compaction_group_version_id,
322        }
323    }
324}
325
326impl<T> From<PbLevels> for LevelsCommon<T>
327where
328    T: From<PbSstableInfo>,
329{
330    #[expect(deprecated)]
331    fn from(pb_levels: PbLevels) -> Self {
332        Self {
333            l0: OverlappingLevelCommon::from(pb_levels.l0.unwrap()),
334            levels: pb_levels
335                .levels
336                .into_iter()
337                .map(LevelCommon::from)
338                .collect_vec(),
339            group_id: pb_levels.group_id,
340            parent_group_id: pb_levels.parent_group_id,
341            member_table_ids: pb_levels.member_table_ids,
342            compaction_group_version_id: pb_levels.compaction_group_version_id,
343        }
344    }
345}
346
347impl<T> From<LevelsCommon<T>> for PbLevels
348where
349    PbSstableInfo: From<T>,
350{
351    fn from(levels: LevelsCommon<T>) -> Self {
352        #[expect(deprecated)]
353        Self {
354            l0: Some(levels.l0.into()),
355            levels: levels.levels.into_iter().map(PbLevel::from).collect_vec(),
356            group_id: levels.group_id,
357            parent_group_id: levels.parent_group_id,
358            member_table_ids: levels.member_table_ids,
359            compaction_group_version_id: levels.compaction_group_version_id,
360        }
361    }
362}
363
364#[derive(Clone, PartialEq, Default, Debug)]
365pub struct InputLevel {
366    pub level_idx: u32,
367    pub level_type: PbLevelType,
368    pub table_infos: Vec<SstableInfo>,
369}
370
371impl InputLevel {
372    pub fn estimated_encode_len(&self) -> usize {
373        size_of::<u32>()
374            + size_of::<i32>()
375            + self
376                .table_infos
377                .iter()
378                .map(|sst| sst.estimated_encode_len())
379                .sum::<usize>()
380    }
381}
382
383impl From<PbInputLevel> for InputLevel {
384    fn from(pb_input_level: PbInputLevel) -> Self {
385        Self {
386            level_idx: pb_input_level.level_idx,
387            level_type: PbLevelType::try_from(pb_input_level.level_type).unwrap(),
388            table_infos: pb_input_level
389                .table_infos
390                .into_iter()
391                .map(SstableInfo::from)
392                .collect_vec(),
393        }
394    }
395}
396
397impl From<&PbInputLevel> for InputLevel {
398    fn from(pb_input_level: &PbInputLevel) -> Self {
399        Self {
400            level_idx: pb_input_level.level_idx,
401            level_type: PbLevelType::try_from(pb_input_level.level_type).unwrap(),
402            table_infos: pb_input_level
403                .table_infos
404                .iter()
405                .map(SstableInfo::from)
406                .collect_vec(),
407        }
408    }
409}
410
411impl From<InputLevel> for PbInputLevel {
412    fn from(input_level: InputLevel) -> Self {
413        Self {
414            level_idx: input_level.level_idx,
415            level_type: input_level.level_type.into(),
416            table_infos: input_level
417                .table_infos
418                .into_iter()
419                .map(|sst| sst.into())
420                .collect_vec(),
421        }
422    }
423}
424
425impl From<&InputLevel> for PbInputLevel {
426    fn from(input_level: &InputLevel) -> Self {
427        Self {
428            level_idx: input_level.level_idx,
429            level_type: input_level.level_type.into(),
430            table_infos: input_level
431                .table_infos
432                .iter()
433                .map(|sst| sst.into())
434                .collect_vec(),
435        }
436    }
437}