risingwave_hummock_sdk/
level.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::mem::size_of;
16
17use itertools::Itertools;
18use risingwave_pb::hummock::hummock_version::PbLevels;
19use risingwave_pb::hummock::{
20    PbInputLevel, PbLevel, PbLevelType, PbOverlappingLevel, PbSstableInfo,
21};
22use risingwave_pb::id::CompactionGroupId;
23
24use crate::sstable_info::SstableInfo;
25
26#[derive(Debug, Clone, PartialEq, Default)]
27pub struct OverlappingLevelCommon<T> {
28    pub sub_levels: Vec<LevelCommon<T>>,
29    pub total_file_size: u64,
30    pub uncompressed_file_size: u64,
31}
32
33pub type OverlappingLevel = OverlappingLevelCommon<SstableInfo>;
34
35impl<T> From<&PbOverlappingLevel> for OverlappingLevelCommon<T>
36where
37    for<'a> LevelCommon<T>: From<&'a PbLevel>,
38{
39    fn from(pb_overlapping_level: &PbOverlappingLevel) -> Self {
40        Self {
41            sub_levels: pb_overlapping_level
42                .sub_levels
43                .iter()
44                .map(LevelCommon::from)
45                .collect_vec(),
46            total_file_size: pb_overlapping_level.total_file_size,
47            uncompressed_file_size: pb_overlapping_level.uncompressed_file_size,
48        }
49    }
50}
51
52impl<T> From<&OverlappingLevelCommon<T>> for PbOverlappingLevel
53where
54    for<'a> &'a LevelCommon<T>: Into<PbLevel>,
55{
56    fn from(overlapping_level: &OverlappingLevelCommon<T>) -> Self {
57        Self {
58            sub_levels: overlapping_level
59                .sub_levels
60                .iter()
61                .map(|level| level.into())
62                .collect_vec(),
63            total_file_size: overlapping_level.total_file_size,
64            uncompressed_file_size: overlapping_level.uncompressed_file_size,
65        }
66    }
67}
68
69impl<T> From<OverlappingLevelCommon<T>> for PbOverlappingLevel
70where
71    LevelCommon<T>: Into<PbLevel>,
72{
73    fn from(overlapping_level: OverlappingLevelCommon<T>) -> Self {
74        Self {
75            sub_levels: overlapping_level
76                .sub_levels
77                .into_iter()
78                .map(|pb_level| pb_level.into())
79                .collect_vec(),
80            total_file_size: overlapping_level.total_file_size,
81            uncompressed_file_size: overlapping_level.uncompressed_file_size,
82        }
83    }
84}
85
86impl<T> From<PbOverlappingLevel> for OverlappingLevelCommon<T>
87where
88    LevelCommon<T>: From<PbLevel>,
89{
90    fn from(pb_overlapping_level: PbOverlappingLevel) -> Self {
91        Self {
92            sub_levels: pb_overlapping_level
93                .sub_levels
94                .into_iter()
95                .map(LevelCommon::from)
96                .collect_vec(),
97            total_file_size: pb_overlapping_level.total_file_size,
98            uncompressed_file_size: pb_overlapping_level.uncompressed_file_size,
99        }
100    }
101}
102
103impl OverlappingLevel {
104    pub fn estimated_encode_len(&self) -> usize {
105        self.sub_levels
106            .iter()
107            .map(|level| level.estimated_encode_len())
108            .sum::<usize>()
109            + size_of::<u64>()
110            + size_of::<u64>()
111    }
112}
113
114#[derive(Debug, Clone, PartialEq, Default)]
115pub struct LevelCommon<T> {
116    pub level_idx: u32,
117    pub level_type: PbLevelType,
118    pub table_infos: Vec<T>,
119    pub total_file_size: u64,
120    pub sub_level_id: u64,
121    pub uncompressed_file_size: u64,
122    pub vnode_partition_count: u32,
123}
124
125pub type Level = LevelCommon<SstableInfo>;
126
127impl<T> From<&PbLevel> for LevelCommon<T>
128where
129    T: for<'a> From<&'a PbSstableInfo>,
130{
131    fn from(pb_level: &PbLevel) -> Self {
132        Self {
133            level_idx: pb_level.level_idx,
134            level_type: PbLevelType::try_from(pb_level.level_type).unwrap(),
135            table_infos: pb_level.table_infos.iter().map(Into::into).collect_vec(),
136            total_file_size: pb_level.total_file_size,
137            sub_level_id: pb_level.sub_level_id,
138            uncompressed_file_size: pb_level.uncompressed_file_size,
139            vnode_partition_count: pb_level.vnode_partition_count,
140        }
141    }
142}
143
144impl<T> From<&LevelCommon<T>> for PbLevel
145where
146    PbSstableInfo: for<'a> From<&'a T>,
147{
148    fn from(level: &LevelCommon<T>) -> Self {
149        Self {
150            level_idx: level.level_idx,
151            level_type: level.level_type.into(),
152            table_infos: level.table_infos.iter().map(Into::into).collect_vec(),
153            total_file_size: level.total_file_size,
154            sub_level_id: level.sub_level_id,
155            uncompressed_file_size: level.uncompressed_file_size,
156            vnode_partition_count: level.vnode_partition_count,
157        }
158    }
159}
160
161impl<T> From<LevelCommon<T>> for PbLevel
162where
163    PbSstableInfo: From<T>,
164{
165    fn from(level: LevelCommon<T>) -> Self {
166        Self {
167            level_idx: level.level_idx,
168            level_type: level.level_type.into(),
169            table_infos: level.table_infos.into_iter().map(Into::into).collect_vec(),
170            total_file_size: level.total_file_size,
171            sub_level_id: level.sub_level_id,
172            uncompressed_file_size: level.uncompressed_file_size,
173            vnode_partition_count: level.vnode_partition_count,
174        }
175    }
176}
177
178impl<T> From<PbLevel> for LevelCommon<T>
179where
180    T: From<PbSstableInfo>,
181{
182    fn from(pb_level: PbLevel) -> Self {
183        Self {
184            level_idx: pb_level.level_idx,
185            level_type: PbLevelType::try_from(pb_level.level_type).unwrap(),
186            table_infos: pb_level
187                .table_infos
188                .into_iter()
189                .map(Into::into)
190                .collect_vec(),
191            total_file_size: pb_level.total_file_size,
192            sub_level_id: pb_level.sub_level_id,
193            uncompressed_file_size: pb_level.uncompressed_file_size,
194            vnode_partition_count: pb_level.vnode_partition_count,
195        }
196    }
197}
198
199impl Level {
200    pub fn estimated_encode_len(&self) -> usize {
201        size_of::<u32>()
202            + size_of::<u32>()
203            + self
204                .table_infos
205                .iter()
206                .map(|sst| sst.estimated_encode_len())
207                .sum::<usize>()
208            + size_of::<u64>()
209            + size_of::<u64>()
210            + size_of::<u64>()
211            + size_of::<u32>()
212    }
213}
214
215#[derive(Debug, Clone, PartialEq, Default)]
216pub struct LevelsCommon<T> {
217    pub levels: Vec<LevelCommon<T>>,
218    pub l0: OverlappingLevelCommon<T>,
219    pub group_id: CompactionGroupId,
220    pub parent_group_id: CompactionGroupId,
221
222    #[deprecated]
223    pub member_table_ids: Vec<u32>,
224    pub compaction_group_version_id: u64,
225}
226
227pub type Levels = LevelsCommon<SstableInfo>;
228
229impl Levels {
230    pub fn level0(&self) -> &OverlappingLevel {
231        &self.l0
232    }
233
234    pub fn get_level(&self, level_idx: usize) -> &Level {
235        &self.levels[level_idx - 1]
236    }
237
238    pub fn get_level_mut(&mut self, level_idx: usize) -> &mut Level {
239        &mut self.levels[level_idx - 1]
240    }
241
242    pub fn is_last_level(&self, level_idx: u32) -> bool {
243        self.levels
244            .last()
245            .as_ref()
246            .is_some_and(|level| level.level_idx == level_idx)
247    }
248
249    pub fn count_ssts(&self) -> usize {
250        self.level0()
251            .sub_levels
252            .iter()
253            .chain(self.levels.iter())
254            .map(|level| level.table_infos.len())
255            .sum()
256    }
257}
258
259impl<T> LevelsCommon<T>
260where
261    PbLevels: for<'a> From<&'a LevelsCommon<T>>,
262{
263    pub fn to_protobuf(&self) -> PbLevels {
264        self.into()
265    }
266}
267
268impl<T> LevelsCommon<T>
269where
270    LevelsCommon<T>: for<'a> From<&'a PbLevels>,
271{
272    pub fn from_protobuf(pb_levels: &PbLevels) -> LevelsCommon<T> {
273        LevelsCommon::<T>::from(pb_levels)
274    }
275}
276
277impl Levels {
278    pub fn estimated_encode_len(&self) -> usize {
279        let mut basic = self
280            .levels
281            .iter()
282            .map(|level| level.estimated_encode_len())
283            .sum::<usize>()
284            + size_of::<u64>()
285            + size_of::<u64>()
286            + size_of::<u32>();
287        basic += self.l0.estimated_encode_len();
288
289        basic
290    }
291}
292
293impl<T> From<&PbLevels> for LevelsCommon<T>
294where
295    T: for<'a> From<&'a PbSstableInfo>,
296{
297    #[expect(deprecated)]
298    fn from(pb_levels: &PbLevels) -> Self {
299        Self {
300            l0: OverlappingLevelCommon::from(pb_levels.l0.as_ref().unwrap()),
301            levels: pb_levels.levels.iter().map(Into::into).collect_vec(),
302            group_id: pb_levels.group_id,
303            parent_group_id: pb_levels.parent_group_id,
304            member_table_ids: pb_levels.member_table_ids.clone(),
305            compaction_group_version_id: pb_levels.compaction_group_version_id,
306        }
307    }
308}
309
310impl<T> From<&LevelsCommon<T>> for PbLevels
311where
312    PbSstableInfo: for<'a> From<&'a T>,
313{
314    #[expect(deprecated)]
315    fn from(levels: &LevelsCommon<T>) -> Self {
316        Self {
317            l0: Some((&levels.l0).into()),
318            levels: levels.levels.iter().map(PbLevel::from).collect_vec(),
319            group_id: levels.group_id,
320            parent_group_id: levels.parent_group_id,
321            member_table_ids: levels.member_table_ids.clone(),
322            compaction_group_version_id: levels.compaction_group_version_id,
323        }
324    }
325}
326
327impl<T> From<PbLevels> for LevelsCommon<T>
328where
329    T: From<PbSstableInfo>,
330{
331    #[expect(deprecated)]
332    fn from(pb_levels: PbLevels) -> Self {
333        Self {
334            l0: OverlappingLevelCommon::from(pb_levels.l0.unwrap()),
335            levels: pb_levels
336                .levels
337                .into_iter()
338                .map(LevelCommon::from)
339                .collect_vec(),
340            group_id: pb_levels.group_id,
341            parent_group_id: pb_levels.parent_group_id,
342            member_table_ids: pb_levels.member_table_ids,
343            compaction_group_version_id: pb_levels.compaction_group_version_id,
344        }
345    }
346}
347
348impl<T> From<LevelsCommon<T>> for PbLevels
349where
350    PbSstableInfo: From<T>,
351{
352    fn from(levels: LevelsCommon<T>) -> Self {
353        #[expect(deprecated)]
354        Self {
355            l0: Some(levels.l0.into()),
356            levels: levels.levels.into_iter().map(PbLevel::from).collect_vec(),
357            group_id: levels.group_id,
358            parent_group_id: levels.parent_group_id,
359            member_table_ids: levels.member_table_ids,
360            compaction_group_version_id: levels.compaction_group_version_id,
361        }
362    }
363}
364
365#[derive(Clone, PartialEq, Default, Debug)]
366pub struct InputLevel {
367    pub level_idx: u32,
368    pub level_type: PbLevelType,
369    pub table_infos: Vec<SstableInfo>,
370}
371
372impl InputLevel {
373    pub fn estimated_encode_len(&self) -> usize {
374        size_of::<u32>()
375            + size_of::<i32>()
376            + self
377                .table_infos
378                .iter()
379                .map(|sst| sst.estimated_encode_len())
380                .sum::<usize>()
381    }
382}
383
384impl From<PbInputLevel> for InputLevel {
385    fn from(pb_input_level: PbInputLevel) -> Self {
386        Self {
387            level_idx: pb_input_level.level_idx,
388            level_type: PbLevelType::try_from(pb_input_level.level_type).unwrap(),
389            table_infos: pb_input_level
390                .table_infos
391                .into_iter()
392                .map(SstableInfo::from)
393                .collect_vec(),
394        }
395    }
396}
397
398impl From<&PbInputLevel> for InputLevel {
399    fn from(pb_input_level: &PbInputLevel) -> Self {
400        Self {
401            level_idx: pb_input_level.level_idx,
402            level_type: PbLevelType::try_from(pb_input_level.level_type).unwrap(),
403            table_infos: pb_input_level
404                .table_infos
405                .iter()
406                .map(SstableInfo::from)
407                .collect_vec(),
408        }
409    }
410}
411
412impl From<InputLevel> for PbInputLevel {
413    fn from(input_level: InputLevel) -> Self {
414        Self {
415            level_idx: input_level.level_idx,
416            level_type: input_level.level_type.into(),
417            table_infos: input_level
418                .table_infos
419                .into_iter()
420                .map(|sst| sst.into())
421                .collect_vec(),
422        }
423    }
424}
425
426impl From<&InputLevel> for PbInputLevel {
427    fn from(input_level: &InputLevel) -> Self {
428        Self {
429            level_idx: input_level.level_idx,
430            level_type: input_level.level_type.into(),
431            table_infos: input_level
432                .table_infos
433                .iter()
434                .map(|sst| sst.into())
435                .collect_vec(),
436        }
437    }
438}