1use std::mem::size_of;
16
17use itertools::Itertools;
18use risingwave_pb::hummock::hummock_version::PbLevels;
19use risingwave_pb::hummock::{
20 PbInputLevel, PbLevel, PbLevelType, PbOverlappingLevel, PbSstableInfo,
21};
22
23use crate::sstable_info::SstableInfo;
24
25#[derive(Debug, Clone, PartialEq, Default)]
26pub struct OverlappingLevelCommon<T> {
27 pub sub_levels: Vec<LevelCommon<T>>,
28 pub total_file_size: u64,
29 pub uncompressed_file_size: u64,
30}
31
32pub type OverlappingLevel = OverlappingLevelCommon<SstableInfo>;
33
34impl<T> From<&PbOverlappingLevel> for OverlappingLevelCommon<T>
35where
36 for<'a> LevelCommon<T>: From<&'a PbLevel>,
37{
38 fn from(pb_overlapping_level: &PbOverlappingLevel) -> Self {
39 Self {
40 sub_levels: pb_overlapping_level
41 .sub_levels
42 .iter()
43 .map(LevelCommon::from)
44 .collect_vec(),
45 total_file_size: pb_overlapping_level.total_file_size,
46 uncompressed_file_size: pb_overlapping_level.uncompressed_file_size,
47 }
48 }
49}
50
51impl<T> From<&OverlappingLevelCommon<T>> for PbOverlappingLevel
52where
53 for<'a> &'a LevelCommon<T>: Into<PbLevel>,
54{
55 fn from(overlapping_level: &OverlappingLevelCommon<T>) -> Self {
56 Self {
57 sub_levels: overlapping_level
58 .sub_levels
59 .iter()
60 .map(|level| level.into())
61 .collect_vec(),
62 total_file_size: overlapping_level.total_file_size,
63 uncompressed_file_size: overlapping_level.uncompressed_file_size,
64 }
65 }
66}
67
68impl<T> From<OverlappingLevelCommon<T>> for PbOverlappingLevel
69where
70 LevelCommon<T>: Into<PbLevel>,
71{
72 fn from(overlapping_level: OverlappingLevelCommon<T>) -> Self {
73 Self {
74 sub_levels: overlapping_level
75 .sub_levels
76 .into_iter()
77 .map(|pb_level| pb_level.into())
78 .collect_vec(),
79 total_file_size: overlapping_level.total_file_size,
80 uncompressed_file_size: overlapping_level.uncompressed_file_size,
81 }
82 }
83}
84
85impl<T> From<PbOverlappingLevel> for OverlappingLevelCommon<T>
86where
87 LevelCommon<T>: From<PbLevel>,
88{
89 fn from(pb_overlapping_level: PbOverlappingLevel) -> Self {
90 Self {
91 sub_levels: pb_overlapping_level
92 .sub_levels
93 .into_iter()
94 .map(LevelCommon::from)
95 .collect_vec(),
96 total_file_size: pb_overlapping_level.total_file_size,
97 uncompressed_file_size: pb_overlapping_level.uncompressed_file_size,
98 }
99 }
100}
101
102impl OverlappingLevel {
103 pub fn estimated_encode_len(&self) -> usize {
104 self.sub_levels
105 .iter()
106 .map(|level| level.estimated_encode_len())
107 .sum::<usize>()
108 + size_of::<u64>()
109 + size_of::<u64>()
110 }
111}
112
113#[derive(Debug, Clone, PartialEq, Default)]
114pub struct LevelCommon<T> {
115 pub level_idx: u32,
116 pub level_type: PbLevelType,
117 pub table_infos: Vec<T>,
118 pub total_file_size: u64,
119 pub sub_level_id: u64,
120 pub uncompressed_file_size: u64,
121 pub vnode_partition_count: u32,
122}
123
124pub type Level = LevelCommon<SstableInfo>;
125
126impl<T> From<&PbLevel> for LevelCommon<T>
127where
128 T: for<'a> From<&'a PbSstableInfo>,
129{
130 fn from(pb_level: &PbLevel) -> Self {
131 Self {
132 level_idx: pb_level.level_idx,
133 level_type: PbLevelType::try_from(pb_level.level_type).unwrap(),
134 table_infos: pb_level.table_infos.iter().map(Into::into).collect_vec(),
135 total_file_size: pb_level.total_file_size,
136 sub_level_id: pb_level.sub_level_id,
137 uncompressed_file_size: pb_level.uncompressed_file_size,
138 vnode_partition_count: pb_level.vnode_partition_count,
139 }
140 }
141}
142
143impl<T> From<&LevelCommon<T>> for PbLevel
144where
145 PbSstableInfo: for<'a> From<&'a T>,
146{
147 fn from(level: &LevelCommon<T>) -> Self {
148 Self {
149 level_idx: level.level_idx,
150 level_type: level.level_type.into(),
151 table_infos: level.table_infos.iter().map(Into::into).collect_vec(),
152 total_file_size: level.total_file_size,
153 sub_level_id: level.sub_level_id,
154 uncompressed_file_size: level.uncompressed_file_size,
155 vnode_partition_count: level.vnode_partition_count,
156 }
157 }
158}
159
160impl<T> From<LevelCommon<T>> for PbLevel
161where
162 PbSstableInfo: From<T>,
163{
164 fn from(level: LevelCommon<T>) -> Self {
165 Self {
166 level_idx: level.level_idx,
167 level_type: level.level_type.into(),
168 table_infos: level.table_infos.into_iter().map(Into::into).collect_vec(),
169 total_file_size: level.total_file_size,
170 sub_level_id: level.sub_level_id,
171 uncompressed_file_size: level.uncompressed_file_size,
172 vnode_partition_count: level.vnode_partition_count,
173 }
174 }
175}
176
177impl<T> From<PbLevel> for LevelCommon<T>
178where
179 T: From<PbSstableInfo>,
180{
181 fn from(pb_level: PbLevel) -> Self {
182 Self {
183 level_idx: pb_level.level_idx,
184 level_type: PbLevelType::try_from(pb_level.level_type).unwrap(),
185 table_infos: pb_level
186 .table_infos
187 .into_iter()
188 .map(Into::into)
189 .collect_vec(),
190 total_file_size: pb_level.total_file_size,
191 sub_level_id: pb_level.sub_level_id,
192 uncompressed_file_size: pb_level.uncompressed_file_size,
193 vnode_partition_count: pb_level.vnode_partition_count,
194 }
195 }
196}
197
198impl Level {
199 pub fn estimated_encode_len(&self) -> usize {
200 size_of::<u32>()
201 + size_of::<u32>()
202 + self
203 .table_infos
204 .iter()
205 .map(|sst| sst.estimated_encode_len())
206 .sum::<usize>()
207 + size_of::<u64>()
208 + size_of::<u64>()
209 + size_of::<u64>()
210 + size_of::<u32>()
211 }
212}
213
214#[derive(Debug, Clone, PartialEq, Default)]
215pub struct LevelsCommon<T> {
216 pub levels: Vec<LevelCommon<T>>,
217 pub l0: OverlappingLevelCommon<T>,
218 pub group_id: u64,
219 pub parent_group_id: u64,
220
221 #[deprecated]
222 pub member_table_ids: Vec<u32>,
223 pub compaction_group_version_id: u64,
224}
225
226pub type Levels = LevelsCommon<SstableInfo>;
227
228impl Levels {
229 pub fn level0(&self) -> &OverlappingLevel {
230 &self.l0
231 }
232
233 pub fn get_level(&self, level_idx: usize) -> &Level {
234 &self.levels[level_idx - 1]
235 }
236
237 pub fn get_level_mut(&mut self, level_idx: usize) -> &mut Level {
238 &mut self.levels[level_idx - 1]
239 }
240
241 pub fn is_last_level(&self, level_idx: u32) -> bool {
242 self.levels
243 .last()
244 .as_ref()
245 .is_some_and(|level| level.level_idx == level_idx)
246 }
247
248 pub fn count_ssts(&self) -> usize {
249 self.level0()
250 .sub_levels
251 .iter()
252 .chain(self.levels.iter())
253 .map(|level| level.table_infos.len())
254 .sum()
255 }
256}
257
258impl<T> LevelsCommon<T>
259where
260 PbLevels: for<'a> From<&'a LevelsCommon<T>>,
261{
262 pub fn to_protobuf(&self) -> PbLevels {
263 self.into()
264 }
265}
266
267impl<T> LevelsCommon<T>
268where
269 LevelsCommon<T>: for<'a> From<&'a PbLevels>,
270{
271 pub fn from_protobuf(pb_levels: &PbLevels) -> LevelsCommon<T> {
272 LevelsCommon::<T>::from(pb_levels)
273 }
274}
275
276impl Levels {
277 pub fn estimated_encode_len(&self) -> usize {
278 let mut basic = self
279 .levels
280 .iter()
281 .map(|level| level.estimated_encode_len())
282 .sum::<usize>()
283 + size_of::<u64>()
284 + size_of::<u64>()
285 + size_of::<u32>();
286 basic += self.l0.estimated_encode_len();
287
288 basic
289 }
290}
291
292impl<T> From<&PbLevels> for LevelsCommon<T>
293where
294 T: for<'a> From<&'a PbSstableInfo>,
295{
296 #[expect(deprecated)]
297 fn from(pb_levels: &PbLevels) -> Self {
298 Self {
299 l0: OverlappingLevelCommon::from(pb_levels.l0.as_ref().unwrap()),
300 levels: pb_levels.levels.iter().map(Into::into).collect_vec(),
301 group_id: pb_levels.group_id,
302 parent_group_id: pb_levels.parent_group_id,
303 member_table_ids: pb_levels.member_table_ids.clone(),
304 compaction_group_version_id: pb_levels.compaction_group_version_id,
305 }
306 }
307}
308
309impl<T> From<&LevelsCommon<T>> for PbLevels
310where
311 PbSstableInfo: for<'a> From<&'a T>,
312{
313 #[expect(deprecated)]
314 fn from(levels: &LevelsCommon<T>) -> Self {
315 Self {
316 l0: Some((&levels.l0).into()),
317 levels: levels.levels.iter().map(PbLevel::from).collect_vec(),
318 group_id: levels.group_id,
319 parent_group_id: levels.parent_group_id,
320 member_table_ids: levels.member_table_ids.clone(),
321 compaction_group_version_id: levels.compaction_group_version_id,
322 }
323 }
324}
325
326impl<T> From<PbLevels> for LevelsCommon<T>
327where
328 T: From<PbSstableInfo>,
329{
330 #[expect(deprecated)]
331 fn from(pb_levels: PbLevels) -> Self {
332 Self {
333 l0: OverlappingLevelCommon::from(pb_levels.l0.unwrap()),
334 levels: pb_levels
335 .levels
336 .into_iter()
337 .map(LevelCommon::from)
338 .collect_vec(),
339 group_id: pb_levels.group_id,
340 parent_group_id: pb_levels.parent_group_id,
341 member_table_ids: pb_levels.member_table_ids,
342 compaction_group_version_id: pb_levels.compaction_group_version_id,
343 }
344 }
345}
346
347impl<T> From<LevelsCommon<T>> for PbLevels
348where
349 PbSstableInfo: From<T>,
350{
351 fn from(levels: LevelsCommon<T>) -> Self {
352 #[expect(deprecated)]
353 Self {
354 l0: Some(levels.l0.into()),
355 levels: levels.levels.into_iter().map(PbLevel::from).collect_vec(),
356 group_id: levels.group_id,
357 parent_group_id: levels.parent_group_id,
358 member_table_ids: levels.member_table_ids,
359 compaction_group_version_id: levels.compaction_group_version_id,
360 }
361 }
362}
363
364#[derive(Clone, PartialEq, Default, Debug)]
365pub struct InputLevel {
366 pub level_idx: u32,
367 pub level_type: PbLevelType,
368 pub table_infos: Vec<SstableInfo>,
369}
370
371impl InputLevel {
372 pub fn estimated_encode_len(&self) -> usize {
373 size_of::<u32>()
374 + size_of::<i32>()
375 + self
376 .table_infos
377 .iter()
378 .map(|sst| sst.estimated_encode_len())
379 .sum::<usize>()
380 }
381}
382
383impl From<PbInputLevel> for InputLevel {
384 fn from(pb_input_level: PbInputLevel) -> Self {
385 Self {
386 level_idx: pb_input_level.level_idx,
387 level_type: PbLevelType::try_from(pb_input_level.level_type).unwrap(),
388 table_infos: pb_input_level
389 .table_infos
390 .into_iter()
391 .map(SstableInfo::from)
392 .collect_vec(),
393 }
394 }
395}
396
397impl From<&PbInputLevel> for InputLevel {
398 fn from(pb_input_level: &PbInputLevel) -> Self {
399 Self {
400 level_idx: pb_input_level.level_idx,
401 level_type: PbLevelType::try_from(pb_input_level.level_type).unwrap(),
402 table_infos: pb_input_level
403 .table_infos
404 .iter()
405 .map(SstableInfo::from)
406 .collect_vec(),
407 }
408 }
409}
410
411impl From<InputLevel> for PbInputLevel {
412 fn from(input_level: InputLevel) -> Self {
413 Self {
414 level_idx: input_level.level_idx,
415 level_type: input_level.level_type.into(),
416 table_infos: input_level
417 .table_infos
418 .into_iter()
419 .map(|sst| sst.into())
420 .collect_vec(),
421 }
422 }
423}
424
425impl From<&InputLevel> for PbInputLevel {
426 fn from(input_level: &InputLevel) -> Self {
427 Self {
428 level_idx: input_level.level_idx,
429 level_type: input_level.level_type.into(),
430 table_infos: input_level
431 .table_infos
432 .iter()
433 .map(|sst| sst.into())
434 .collect_vec(),
435 }
436 }
437}