1use std::mem::size_of;
16
17use itertools::Itertools;
18use risingwave_pb::hummock::hummock_version::PbLevels;
19use risingwave_pb::hummock::{
20 PbInputLevel, PbLevel, PbLevelType, PbOverlappingLevel, PbSstableInfo,
21};
22use risingwave_pb::id::CompactionGroupId;
23
24use crate::sstable_info::SstableInfo;
25
26#[derive(Debug, Clone, PartialEq, Default)]
27pub struct OverlappingLevelCommon<T> {
28 pub sub_levels: Vec<LevelCommon<T>>,
29 pub total_file_size: u64,
30 pub uncompressed_file_size: u64,
31}
32
33pub type OverlappingLevel = OverlappingLevelCommon<SstableInfo>;
34
35impl<T> From<&PbOverlappingLevel> for OverlappingLevelCommon<T>
36where
37 for<'a> LevelCommon<T>: From<&'a PbLevel>,
38{
39 fn from(pb_overlapping_level: &PbOverlappingLevel) -> Self {
40 Self {
41 sub_levels: pb_overlapping_level
42 .sub_levels
43 .iter()
44 .map(LevelCommon::from)
45 .collect_vec(),
46 total_file_size: pb_overlapping_level.total_file_size,
47 uncompressed_file_size: pb_overlapping_level.uncompressed_file_size,
48 }
49 }
50}
51
52impl<T> From<&OverlappingLevelCommon<T>> for PbOverlappingLevel
53where
54 for<'a> &'a LevelCommon<T>: Into<PbLevel>,
55{
56 fn from(overlapping_level: &OverlappingLevelCommon<T>) -> Self {
57 Self {
58 sub_levels: overlapping_level
59 .sub_levels
60 .iter()
61 .map(|level| level.into())
62 .collect_vec(),
63 total_file_size: overlapping_level.total_file_size,
64 uncompressed_file_size: overlapping_level.uncompressed_file_size,
65 }
66 }
67}
68
69impl<T> From<OverlappingLevelCommon<T>> for PbOverlappingLevel
70where
71 LevelCommon<T>: Into<PbLevel>,
72{
73 fn from(overlapping_level: OverlappingLevelCommon<T>) -> Self {
74 Self {
75 sub_levels: overlapping_level
76 .sub_levels
77 .into_iter()
78 .map(|pb_level| pb_level.into())
79 .collect_vec(),
80 total_file_size: overlapping_level.total_file_size,
81 uncompressed_file_size: overlapping_level.uncompressed_file_size,
82 }
83 }
84}
85
86impl<T> From<PbOverlappingLevel> for OverlappingLevelCommon<T>
87where
88 LevelCommon<T>: From<PbLevel>,
89{
90 fn from(pb_overlapping_level: PbOverlappingLevel) -> Self {
91 Self {
92 sub_levels: pb_overlapping_level
93 .sub_levels
94 .into_iter()
95 .map(LevelCommon::from)
96 .collect_vec(),
97 total_file_size: pb_overlapping_level.total_file_size,
98 uncompressed_file_size: pb_overlapping_level.uncompressed_file_size,
99 }
100 }
101}
102
103impl OverlappingLevel {
104 pub fn estimated_encode_len(&self) -> usize {
105 self.sub_levels
106 .iter()
107 .map(|level| level.estimated_encode_len())
108 .sum::<usize>()
109 + size_of::<u64>()
110 + size_of::<u64>()
111 }
112}
113
114#[derive(Debug, Clone, PartialEq, Default)]
115pub struct LevelCommon<T> {
116 pub level_idx: u32,
117 pub level_type: PbLevelType,
118 pub table_infos: Vec<T>,
119 pub total_file_size: u64,
120 pub sub_level_id: u64,
121 pub uncompressed_file_size: u64,
122 pub vnode_partition_count: u32,
123}
124
125pub type Level = LevelCommon<SstableInfo>;
126
127impl<T> From<&PbLevel> for LevelCommon<T>
128where
129 T: for<'a> From<&'a PbSstableInfo>,
130{
131 fn from(pb_level: &PbLevel) -> Self {
132 Self {
133 level_idx: pb_level.level_idx,
134 level_type: PbLevelType::try_from(pb_level.level_type).unwrap(),
135 table_infos: pb_level.table_infos.iter().map(Into::into).collect_vec(),
136 total_file_size: pb_level.total_file_size,
137 sub_level_id: pb_level.sub_level_id,
138 uncompressed_file_size: pb_level.uncompressed_file_size,
139 vnode_partition_count: pb_level.vnode_partition_count,
140 }
141 }
142}
143
144impl<T> From<&LevelCommon<T>> for PbLevel
145where
146 PbSstableInfo: for<'a> From<&'a T>,
147{
148 fn from(level: &LevelCommon<T>) -> Self {
149 Self {
150 level_idx: level.level_idx,
151 level_type: level.level_type.into(),
152 table_infos: level.table_infos.iter().map(Into::into).collect_vec(),
153 total_file_size: level.total_file_size,
154 sub_level_id: level.sub_level_id,
155 uncompressed_file_size: level.uncompressed_file_size,
156 vnode_partition_count: level.vnode_partition_count,
157 }
158 }
159}
160
161impl<T> From<LevelCommon<T>> for PbLevel
162where
163 PbSstableInfo: From<T>,
164{
165 fn from(level: LevelCommon<T>) -> Self {
166 Self {
167 level_idx: level.level_idx,
168 level_type: level.level_type.into(),
169 table_infos: level.table_infos.into_iter().map(Into::into).collect_vec(),
170 total_file_size: level.total_file_size,
171 sub_level_id: level.sub_level_id,
172 uncompressed_file_size: level.uncompressed_file_size,
173 vnode_partition_count: level.vnode_partition_count,
174 }
175 }
176}
177
178impl<T> From<PbLevel> for LevelCommon<T>
179where
180 T: From<PbSstableInfo>,
181{
182 fn from(pb_level: PbLevel) -> Self {
183 Self {
184 level_idx: pb_level.level_idx,
185 level_type: PbLevelType::try_from(pb_level.level_type).unwrap(),
186 table_infos: pb_level
187 .table_infos
188 .into_iter()
189 .map(Into::into)
190 .collect_vec(),
191 total_file_size: pb_level.total_file_size,
192 sub_level_id: pb_level.sub_level_id,
193 uncompressed_file_size: pb_level.uncompressed_file_size,
194 vnode_partition_count: pb_level.vnode_partition_count,
195 }
196 }
197}
198
199impl Level {
200 pub fn estimated_encode_len(&self) -> usize {
201 size_of::<u32>()
202 + size_of::<u32>()
203 + self
204 .table_infos
205 .iter()
206 .map(|sst| sst.estimated_encode_len())
207 .sum::<usize>()
208 + size_of::<u64>()
209 + size_of::<u64>()
210 + size_of::<u64>()
211 + size_of::<u32>()
212 }
213}
214
215#[derive(Debug, Clone, PartialEq, Default)]
216pub struct LevelsCommon<T> {
217 pub levels: Vec<LevelCommon<T>>,
218 pub l0: OverlappingLevelCommon<T>,
219 pub group_id: CompactionGroupId,
220 pub parent_group_id: CompactionGroupId,
221
222 #[deprecated]
223 pub member_table_ids: Vec<u32>,
224 pub compaction_group_version_id: u64,
225}
226
227pub type Levels = LevelsCommon<SstableInfo>;
228
229impl Levels {
230 pub fn level0(&self) -> &OverlappingLevel {
231 &self.l0
232 }
233
234 pub fn get_level(&self, level_idx: usize) -> &Level {
235 &self.levels[level_idx - 1]
236 }
237
238 pub fn get_level_mut(&mut self, level_idx: usize) -> &mut Level {
239 &mut self.levels[level_idx - 1]
240 }
241
242 pub fn is_last_level(&self, level_idx: u32) -> bool {
243 self.levels
244 .last()
245 .as_ref()
246 .is_some_and(|level| level.level_idx == level_idx)
247 }
248
249 pub fn count_ssts(&self) -> usize {
250 self.level0()
251 .sub_levels
252 .iter()
253 .chain(self.levels.iter())
254 .map(|level| level.table_infos.len())
255 .sum()
256 }
257}
258
259impl<T> LevelsCommon<T>
260where
261 PbLevels: for<'a> From<&'a LevelsCommon<T>>,
262{
263 pub fn to_protobuf(&self) -> PbLevels {
264 self.into()
265 }
266}
267
268impl<T> LevelsCommon<T>
269where
270 LevelsCommon<T>: for<'a> From<&'a PbLevels>,
271{
272 pub fn from_protobuf(pb_levels: &PbLevels) -> LevelsCommon<T> {
273 LevelsCommon::<T>::from(pb_levels)
274 }
275}
276
277impl Levels {
278 pub fn estimated_encode_len(&self) -> usize {
279 let mut basic = self
280 .levels
281 .iter()
282 .map(|level| level.estimated_encode_len())
283 .sum::<usize>()
284 + size_of::<u64>()
285 + size_of::<u64>()
286 + size_of::<u32>();
287 basic += self.l0.estimated_encode_len();
288
289 basic
290 }
291}
292
293impl<T> From<&PbLevels> for LevelsCommon<T>
294where
295 T: for<'a> From<&'a PbSstableInfo>,
296{
297 #[expect(deprecated)]
298 fn from(pb_levels: &PbLevels) -> Self {
299 Self {
300 l0: OverlappingLevelCommon::from(pb_levels.l0.as_ref().unwrap()),
301 levels: pb_levels.levels.iter().map(Into::into).collect_vec(),
302 group_id: pb_levels.group_id,
303 parent_group_id: pb_levels.parent_group_id,
304 member_table_ids: pb_levels.member_table_ids.clone(),
305 compaction_group_version_id: pb_levels.compaction_group_version_id,
306 }
307 }
308}
309
310impl<T> From<&LevelsCommon<T>> for PbLevels
311where
312 PbSstableInfo: for<'a> From<&'a T>,
313{
314 #[expect(deprecated)]
315 fn from(levels: &LevelsCommon<T>) -> Self {
316 Self {
317 l0: Some((&levels.l0).into()),
318 levels: levels.levels.iter().map(PbLevel::from).collect_vec(),
319 group_id: levels.group_id,
320 parent_group_id: levels.parent_group_id,
321 member_table_ids: levels.member_table_ids.clone(),
322 compaction_group_version_id: levels.compaction_group_version_id,
323 }
324 }
325}
326
327impl<T> From<PbLevels> for LevelsCommon<T>
328where
329 T: From<PbSstableInfo>,
330{
331 #[expect(deprecated)]
332 fn from(pb_levels: PbLevels) -> Self {
333 Self {
334 l0: OverlappingLevelCommon::from(pb_levels.l0.unwrap()),
335 levels: pb_levels
336 .levels
337 .into_iter()
338 .map(LevelCommon::from)
339 .collect_vec(),
340 group_id: pb_levels.group_id,
341 parent_group_id: pb_levels.parent_group_id,
342 member_table_ids: pb_levels.member_table_ids,
343 compaction_group_version_id: pb_levels.compaction_group_version_id,
344 }
345 }
346}
347
348impl<T> From<LevelsCommon<T>> for PbLevels
349where
350 PbSstableInfo: From<T>,
351{
352 fn from(levels: LevelsCommon<T>) -> Self {
353 #[expect(deprecated)]
354 Self {
355 l0: Some(levels.l0.into()),
356 levels: levels.levels.into_iter().map(PbLevel::from).collect_vec(),
357 group_id: levels.group_id,
358 parent_group_id: levels.parent_group_id,
359 member_table_ids: levels.member_table_ids,
360 compaction_group_version_id: levels.compaction_group_version_id,
361 }
362 }
363}
364
365#[derive(Clone, PartialEq, Default, Debug)]
366pub struct InputLevel {
367 pub level_idx: u32,
368 pub level_type: PbLevelType,
369 pub table_infos: Vec<SstableInfo>,
370}
371
372impl InputLevel {
373 pub fn estimated_encode_len(&self) -> usize {
374 size_of::<u32>()
375 + size_of::<i32>()
376 + self
377 .table_infos
378 .iter()
379 .map(|sst| sst.estimated_encode_len())
380 .sum::<usize>()
381 }
382}
383
384impl From<PbInputLevel> for InputLevel {
385 fn from(pb_input_level: PbInputLevel) -> Self {
386 Self {
387 level_idx: pb_input_level.level_idx,
388 level_type: PbLevelType::try_from(pb_input_level.level_type).unwrap(),
389 table_infos: pb_input_level
390 .table_infos
391 .into_iter()
392 .map(SstableInfo::from)
393 .collect_vec(),
394 }
395 }
396}
397
398impl From<&PbInputLevel> for InputLevel {
399 fn from(pb_input_level: &PbInputLevel) -> Self {
400 Self {
401 level_idx: pb_input_level.level_idx,
402 level_type: PbLevelType::try_from(pb_input_level.level_type).unwrap(),
403 table_infos: pb_input_level
404 .table_infos
405 .iter()
406 .map(SstableInfo::from)
407 .collect_vec(),
408 }
409 }
410}
411
412impl From<InputLevel> for PbInputLevel {
413 fn from(input_level: InputLevel) -> Self {
414 Self {
415 level_idx: input_level.level_idx,
416 level_type: input_level.level_type.into(),
417 table_infos: input_level
418 .table_infos
419 .into_iter()
420 .map(|sst| sst.into())
421 .collect_vec(),
422 }
423 }
424}
425
426impl From<&InputLevel> for PbInputLevel {
427 fn from(input_level: &InputLevel) -> Self {
428 Self {
429 level_idx: input_level.level_idx,
430 level_type: input_level.level_type.into(),
431 table_infos: input_level
432 .table_infos
433 .iter()
434 .map(|sst| sst.into())
435 .collect_vec(),
436 }
437 }
438}