1use std::borrow::Borrow;
16use std::cmp::Ordering;
17use std::collections::hash_map::Entry;
18use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
19use std::iter::once;
20use std::sync::{Arc, LazyLock};
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::catalog::TableId;
25use risingwave_common::hash::VnodeBitmapExt;
26use risingwave_common::log::LogSuppressor;
27use risingwave_pb::hummock::{
28 CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta,
29};
30use tracing::warn;
31
32use super::group_split::split_sst_with_table_ids;
33use super::{StateTableId, group_split};
34use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
35use crate::compact_task::is_compaction_task_expired;
36use crate::compaction_group::StaticCompactionGroupId;
37use crate::key_range::KeyRangeCommon;
38use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
39use crate::sstable_info::SstableInfo;
40use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
41use crate::vector_index::apply_vector_index_delta;
42use crate::version::{
43 GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
44 IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader,
45};
46use crate::{
47 CompactionGroupId, HummockObjectId, HummockSstableId, HummockSstableObjectId, can_concat,
48};
49
/// Per-compaction-group summary of the SSTs inserted and removed by a version delta,
/// as assembled by `build_sst_delta_infos`.
50#[derive(Debug, Clone, Default)]
51pub struct SstDeltaInfo {
    /// Level index recorded for the inserted SSTs. When a group has several inserting
    /// deltas, `build_sst_delta_infos` leaves the value written by the last one.
52 pub insert_sst_level: u32,
    /// Clones of every SST inserted by the group's deltas.
53 pub insert_sst_infos: Vec<SstableInfo>,
    /// Object ids of the SSTs whose removal by the delta was matched against the version.
54 pub delete_sst_object_ids: Vec<HummockSstableObjectId>,
55}
56
/// For a single object: the SST ids referencing it, keyed by the compaction group that holds them
/// (this is how `build_branched_sst_info` populates it).
57pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;
58
59impl<L> HummockVersionCommon<SstableInfo, L> {
60 pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
61 self.levels
62 .get(&compaction_group_id)
63 .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
64 }
65
66 pub fn get_compaction_group_levels_mut(
67 &mut self,
68 compaction_group_id: CompactionGroupId,
69 ) -> &mut Levels {
70 self.levels
71 .get_mut(&compaction_group_id)
72 .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
73 }
74
75 pub fn get_sst_ids_by_group_id(
77 &self,
78 compaction_group_id: CompactionGroupId,
79 ) -> impl Iterator<Item = HummockSstableId> + '_ {
80 self.levels
81 .iter()
82 .filter_map(move |(cg_id, level)| {
83 if *cg_id == compaction_group_id {
84 Some(level)
85 } else {
86 None
87 }
88 })
89 .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
90 .flat_map(|level| level.table_infos.iter())
91 .map(|s| s.sst_id)
92 }
93
94 pub fn level_iter<F: FnMut(&Level) -> bool>(
95 &self,
96 compaction_group_id: CompactionGroupId,
97 mut f: F,
98 ) {
99 if let Some(levels) = self.levels.get(&compaction_group_id) {
100 for sub_level in &levels.l0.sub_levels {
101 if !f(sub_level) {
102 return;
103 }
104 }
105 for level in &levels.levels {
106 if !f(level) {
107 return;
108 }
109 }
110 }
111 }
112
113 pub fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize {
114 self.levels
116 .get(&compaction_group_id)
117 .map(|group| group.levels.len() + 1)
118 .unwrap_or(0)
119 }
120
121 pub fn safe_epoch_table_watermarks(
122 &self,
123 existing_table_ids: &[TableId],
124 ) -> BTreeMap<TableId, TableWatermarks> {
125 safe_epoch_table_watermarks_impl(&self.table_watermarks, existing_table_ids)
126 }
127}
128
129pub fn safe_epoch_table_watermarks_impl(
130 table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
131 existing_table_ids: &[TableId],
132) -> BTreeMap<TableId, TableWatermarks> {
133 fn extract_single_table_watermark(
134 table_watermarks: &TableWatermarks,
135 ) -> Option<TableWatermarks> {
136 if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
137 Some(TableWatermarks {
138 watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
139 direction: table_watermarks.direction,
140 watermark_type: table_watermarks.watermark_type,
141 })
142 } else {
143 None
144 }
145 }
146 table_watermarks
147 .iter()
148 .filter_map(|(table_id, table_watermarks)| {
149 if !existing_table_ids.contains(table_id) {
150 None
151 } else {
152 extract_single_table_watermark(table_watermarks)
153 .map(|table_watermarks| (*table_id, table_watermarks))
154 }
155 })
156 .collect()
157}
158
159pub fn safe_epoch_read_table_watermarks_impl(
160 safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
161) -> BTreeMap<TableId, ReadTableWatermark> {
162 safe_epoch_watermarks
163 .into_iter()
164 .map(|(table_id, watermarks)| {
165 assert_eq!(watermarks.watermarks.len(), 1);
166 let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
167 let mut vnode_watermark_map = BTreeMap::new();
168 for vnode_watermark in vnode_watermarks.iter() {
169 let watermark = Bytes::copy_from_slice(vnode_watermark.watermark());
170 for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
171 assert!(
172 vnode_watermark_map
173 .insert(vnode, watermark.clone())
174 .is_none(),
175 "duplicate table watermark on vnode {}",
176 vnode.to_index()
177 );
178 }
179 }
180 (
181 table_id,
182 ReadTableWatermark {
183 direction: watermarks.direction,
184 vnode_watermarks: vnode_watermark_map,
185 },
186 )
187 })
188 .collect()
189}
190
191impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
192 pub fn count_new_ssts_in_group_split(
193 &self,
194 parent_group_id: CompactionGroupId,
195 split_key: Bytes,
196 ) -> u64 {
197 self.levels
198 .get(&parent_group_id)
199 .map_or(0, |parent_levels| {
200 let l0 = &parent_levels.l0;
201 let mut split_count = 0;
202 for sub_level in &l0.sub_levels {
203 assert!(!sub_level.table_infos.is_empty());
204
205 if sub_level.level_type == PbLevelType::Overlapping {
206 split_count += sub_level
208 .table_infos
209 .iter()
210 .map(|sst| {
211 if let group_split::SstSplitType::Both =
212 group_split::need_to_split(sst, split_key.clone())
213 {
214 2
215 } else {
216 0
217 }
218 })
219 .sum::<u64>();
220 continue;
221 }
222
223 let pos = group_split::get_split_pos(&sub_level.table_infos, split_key.clone());
224 let sst = sub_level.table_infos.get(pos).unwrap();
225
226 if let group_split::SstSplitType::Both =
227 group_split::need_to_split(sst, split_key.clone())
228 {
229 split_count += 2;
230 }
231 }
232
233 for level in &parent_levels.levels {
234 if level.table_infos.is_empty() {
235 continue;
236 }
237 let pos = group_split::get_split_pos(&level.table_infos, split_key.clone());
238 let sst = level.table_infos.get(pos).unwrap();
239 if let group_split::SstSplitType::Both =
240 group_split::need_to_split(sst, split_key.clone())
241 {
242 split_count += 2;
243 }
244 }
245
246 split_count
247 })
248 }
249
250 pub fn init_with_parent_group(
251 &mut self,
252 parent_group_id: CompactionGroupId,
253 group_id: CompactionGroupId,
254 member_table_ids: BTreeSet<StateTableId>,
255 new_sst_start_id: HummockSstableId,
256 ) {
257 let mut new_sst_id = new_sst_start_id;
258 if parent_group_id == StaticCompactionGroupId::NewCompactionGroup {
259 if new_sst_start_id != 0 {
260 if cfg!(debug_assertions) {
261 panic!(
262 "non-zero sst start id {} for NewCompactionGroup",
263 new_sst_start_id
264 );
265 } else {
266 warn!(
267 %new_sst_start_id,
268 "non-zero sst start id for NewCompactionGroup"
269 );
270 }
271 }
272 return;
273 } else if !self.levels.contains_key(&parent_group_id) {
274 unreachable!(
275 "non-existing parent group id {} to init from",
276 parent_group_id
277 );
278 }
279 let [parent_levels, cur_levels] = self
280 .levels
281 .get_disjoint_mut([&parent_group_id, &group_id])
282 .map(|res| res.unwrap());
283 parent_levels.compaction_group_version_id += 1;
286 cur_levels.compaction_group_version_id += 1;
287 let l0 = &mut parent_levels.l0;
288 {
289 for sub_level in &mut l0.sub_levels {
290 let target_l0 = &mut cur_levels.l0;
291 let insert_table_infos =
294 split_sst_info_for_level(&member_table_ids, sub_level, &mut new_sst_id);
295 sub_level.normalize();
296 if insert_table_infos.is_empty() {
297 continue;
298 }
299 match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
300 Ok(idx) => {
301 add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
302 }
303 Err(idx) => {
304 insert_new_sub_level(
305 target_l0,
306 sub_level.sub_level_id,
307 sub_level.level_type,
308 insert_table_infos,
309 Some(idx),
310 );
311 }
312 }
313 }
314 l0.normalize();
315 }
316 for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
317 let insert_table_infos =
318 split_sst_info_for_level(&member_table_ids, level, &mut new_sst_id);
319 cur_levels.levels[idx].total_file_size += insert_table_infos
320 .iter()
321 .map(|sst| sst.sst_size)
322 .sum::<u64>();
323 cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
324 .iter()
325 .map(|sst| sst.uncompressed_file_size)
326 .sum::<u64>();
327 cur_levels.levels[idx]
328 .table_infos
329 .extend(insert_table_infos);
330 cur_levels.levels[idx]
331 .table_infos
332 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
333 assert!(can_concat(&cur_levels.levels[idx].table_infos));
334 level.normalize();
335 }
336
337 assert!(
338 parent_levels
339 .l0
340 .sub_levels
341 .iter()
342 .all(|level| !level.table_infos.is_empty())
343 );
344 assert!(
345 cur_levels
346 .l0
347 .sub_levels
348 .iter()
349 .all(|level| !level.table_infos.is_empty())
350 );
351 }
352
353 pub fn build_sst_delta_infos(
354 &self,
355 version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
356 ) -> Vec<SstDeltaInfo> {
357 let mut infos = vec![];
358
359 if version_delta.trivial_move {
362 return infos;
363 }
364
365 for (group_id, group_deltas) in &version_delta.group_deltas {
366 let mut info = SstDeltaInfo::default();
367
368 let mut removed_l0_ssts: BTreeSet<HummockSstableId> = BTreeSet::new();
369 let mut removed_ssts: BTreeMap<u32, BTreeSet<HummockSstableId>> = BTreeMap::new();
370
371 if !group_deltas.group_deltas.iter().all(|delta| {
373 matches!(
374 delta,
375 GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_)
376 )
377 }) {
378 continue;
379 }
380
381 for group_delta in &group_deltas.group_deltas {
385 match group_delta {
386 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
387 if !inserted_table_infos.is_empty() {
388 info.insert_sst_level = 0;
389 info.insert_sst_infos
390 .extend(inserted_table_infos.iter().cloned());
391 }
392 }
393 GroupDeltaCommon::IntraLevel(intra_level) => {
394 if !intra_level.inserted_table_infos.is_empty() {
395 info.insert_sst_level = intra_level.level_idx;
396 info.insert_sst_infos
397 .extend(intra_level.inserted_table_infos.iter().cloned());
398 }
399 if !intra_level.removed_table_ids.is_empty() {
400 for id in &intra_level.removed_table_ids {
401 if intra_level.level_idx == 0 {
402 removed_l0_ssts.insert(*id);
403 } else {
404 removed_ssts
405 .entry(intra_level.level_idx)
406 .or_default()
407 .insert(*id);
408 }
409 }
410 }
411 }
412 GroupDeltaCommon::GroupConstruct(_)
413 | GroupDeltaCommon::GroupDestroy(_)
414 | GroupDeltaCommon::GroupMerge(_)
415 | GroupDeltaCommon::TruncateTables(_) => {}
416 }
417 }
418
419 let group = self.levels.get(group_id).unwrap();
420 for l0_sub_level in &group.level0().sub_levels {
421 for sst_info in &l0_sub_level.table_infos {
422 if removed_l0_ssts.remove(&sst_info.sst_id) {
423 info.delete_sst_object_ids.push(sst_info.object_id);
424 }
425 }
426 }
427 for level in &group.levels {
428 if let Some(mut removed_level_ssts) = removed_ssts.remove(&level.level_idx) {
429 for sst_info in &level.table_infos {
430 if removed_level_ssts.remove(&sst_info.sst_id) {
431 info.delete_sst_object_ids.push(sst_info.object_id);
432 }
433 }
434 if !removed_level_ssts.is_empty() {
435 tracing::error!(
436 "removed_level_ssts is not empty: {:?}",
437 removed_level_ssts,
438 );
439 }
440 debug_assert!(removed_level_ssts.is_empty());
441 }
442 }
443
444 if !removed_l0_ssts.is_empty() || !removed_ssts.is_empty() {
445 tracing::error!(
446 "not empty removed_l0_ssts: {:?}, removed_ssts: {:?}",
447 removed_l0_ssts,
448 removed_ssts
449 );
450 }
451 debug_assert!(removed_l0_ssts.is_empty());
452 debug_assert!(removed_ssts.is_empty());
453
454 infos.push(info);
455 }
456
457 infos
458 }
459
460 pub fn apply_version_delta(
461 &mut self,
462 version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
463 ) {
464 assert_eq!(self.id, version_delta.prev_id);
465
466 let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
467 &version_delta.state_table_info_delta,
468 &version_delta.removed_table_ids,
469 );
470
471 #[expect(deprecated)]
472 {
473 if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch {
474 is_commit_epoch = true;
475 tracing::trace!(
476 "max committed epoch bumped but no table committed epoch is changed"
477 );
478 }
479 }
480
481 for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
483 let mut is_applied_l0_compact = false;
484 for group_delta in &group_deltas.group_deltas {
485 match group_delta {
486 GroupDeltaCommon::GroupConstruct(group_construct) => {
487 let mut new_levels = build_initial_compaction_group_levels(
488 *compaction_group_id,
489 group_construct.get_group_config().unwrap(),
490 );
491 let parent_group_id = group_construct.parent_group_id;
492 new_levels.parent_group_id = parent_group_id;
493 #[expect(deprecated)]
494 new_levels
496 .member_table_ids
497 .clone_from(&group_construct.table_ids);
498 self.levels.insert(*compaction_group_id, new_levels);
499 let member_table_ids = if group_construct.version()
500 >= CompatibilityVersion::NoMemberTableIds
501 {
502 self.state_table_info
503 .compaction_group_member_table_ids(*compaction_group_id)
504 .iter()
505 .copied()
506 .collect()
507 } else {
508 #[expect(deprecated)]
509 BTreeSet::from_iter(
511 group_construct.table_ids.iter().copied().map(Into::into),
512 )
513 };
514
515 if group_construct.version() >= CompatibilityVersion::SplitGroupByTableId {
516 let split_key = if group_construct.split_key.is_some() {
517 Some(Bytes::from(group_construct.split_key.clone().unwrap()))
518 } else {
519 None
520 };
521 self.init_with_parent_group_v2(
522 parent_group_id,
523 *compaction_group_id,
524 group_construct.new_sst_start_id,
525 split_key.clone(),
526 );
527 } else {
528 self.init_with_parent_group(
530 parent_group_id,
531 *compaction_group_id,
532 member_table_ids,
533 group_construct.new_sst_start_id,
534 );
535 }
536 }
537 GroupDeltaCommon::GroupMerge(group_merge) => {
538 tracing::info!(
539 "group_merge left {:?} right {:?}",
540 group_merge.left_group_id,
541 group_merge.right_group_id
542 );
543 self.merge_compaction_group(
544 group_merge.left_group_id,
545 group_merge.right_group_id,
546 )
547 }
548 GroupDeltaCommon::IntraLevel(level_delta) => {
549 let levels =
550 self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
551 panic!("compaction group {} does not exist", compaction_group_id)
552 });
553 if is_commit_epoch {
554 assert!(
555 level_delta.removed_table_ids.is_empty(),
556 "no sst should be deleted when committing an epoch"
557 );
558
559 let IntraLevelDelta {
560 level_idx,
561 l0_sub_level_id,
562 inserted_table_infos,
563 ..
564 } = level_delta;
565 {
566 assert_eq!(
567 *level_idx, 0,
568 "we should only add to L0 when we commit an epoch."
569 );
570 if !inserted_table_infos.is_empty() {
571 insert_new_sub_level(
572 &mut levels.l0,
573 *l0_sub_level_id,
574 PbLevelType::Overlapping,
575 inserted_table_infos.clone(),
576 None,
577 );
578 }
579 }
580 } else {
581 levels.apply_compact_ssts(
583 level_delta,
584 self.state_table_info
585 .compaction_group_member_table_ids(*compaction_group_id),
586 );
587 if level_delta.level_idx == 0 {
588 is_applied_l0_compact = true;
589 }
590 }
591 }
592 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
593 let levels =
594 self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
595 panic!("compaction group {} does not exist", compaction_group_id)
596 });
597 assert!(is_commit_epoch);
598
599 if !inserted_table_infos.is_empty() {
600 let next_l0_sub_level_id = levels
601 .l0
602 .sub_levels
603 .last()
604 .map(|level| level.sub_level_id + 1)
605 .unwrap_or(1);
606
607 insert_new_sub_level(
608 &mut levels.l0,
609 next_l0_sub_level_id,
610 PbLevelType::Overlapping,
611 inserted_table_infos.clone(),
612 None,
613 );
614 }
615 }
616 GroupDeltaCommon::GroupDestroy(_) => {
617 self.levels.remove(compaction_group_id);
618 }
619
620 GroupDeltaCommon::TruncateTables(table_ids) => {
621 self.levels
622 .get_mut(compaction_group_id)
623 .unwrap_or_else(|| {
624 panic!("compaction group {} does not exist", compaction_group_id)
625 })
626 .truncate_tables(table_ids);
627 }
628 }
629 }
630
631 if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id)
632 {
633 levels.l0.normalize();
634 }
635 }
636 self.id = version_delta.id;
637 #[expect(deprecated)]
638 {
639 self.max_committed_epoch = version_delta.max_committed_epoch;
640 }
641
642 let mut modified_table_watermarks: HashMap<TableId, Option<TableWatermarks>> =
646 HashMap::new();
647
648 for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
650 if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
651 if version_delta.removed_table_ids.contains(table_id) {
652 modified_table_watermarks.insert(*table_id, None);
653 } else {
654 let mut current_table_watermarks = (**current_table_watermarks).clone();
655 current_table_watermarks.apply_new_table_watermarks(table_watermarks);
656 modified_table_watermarks.insert(*table_id, Some(current_table_watermarks));
657 }
658 } else {
659 modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone()));
660 }
661 }
662 for (table_id, table_watermarks) in &self.table_watermarks {
663 let safe_epoch = if let Some(state_table_info) =
664 self.state_table_info.info().get(table_id)
665 && let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
666 && state_table_info.committed_epoch > *oldest_epoch
667 {
668 state_table_info.committed_epoch
670 } else {
671 continue;
673 };
674 let table_watermarks = modified_table_watermarks
675 .entry(*table_id)
676 .or_insert_with(|| Some((**table_watermarks).clone()));
677 if let Some(table_watermarks) = table_watermarks {
678 table_watermarks.clear_stale_epoch_watermark(safe_epoch);
679 }
680 }
681 for (table_id, table_watermarks) in modified_table_watermarks {
683 if let Some(table_watermarks) = table_watermarks {
684 self.table_watermarks
685 .insert(table_id, Arc::new(table_watermarks));
686 } else {
687 self.table_watermarks.remove(&table_id);
688 }
689 }
690
691 Self::apply_change_log_delta(
693 &mut self.table_change_log,
694 &version_delta.change_log_delta,
695 &version_delta.removed_table_ids,
696 &version_delta.state_table_info_delta,
697 &changed_table_info,
698 );
699
700 apply_vector_index_delta(
702 &mut self.vector_indexes,
703 &version_delta.vector_index_delta,
704 &version_delta.removed_table_ids,
705 );
706 }
707
708 pub fn apply_change_log_delta<T: Clone>(
709 table_change_log: &mut HashMap<TableId, TableChangeLogCommon<T>>,
710 change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
711 removed_table_ids: &HashSet<TableId>,
712 state_table_info_delta: &HashMap<TableId, StateTableInfoDelta>,
713 changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
714 ) {
715 for (table_id, change_log_delta) in change_log_delta {
716 let new_change_log = &change_log_delta.new_log;
717 match table_change_log.entry(*table_id) {
718 Entry::Occupied(entry) => {
719 let change_log = entry.into_mut();
720 change_log.add_change_log(new_change_log.clone());
721 }
722 Entry::Vacant(entry) => {
723 entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
724 }
725 };
726 }
727
728 table_change_log.retain(|table_id, _| {
732 if removed_table_ids.contains(table_id) {
733 return false;
734 }
735 if let Some(table_info_delta) = state_table_info_delta.get(table_id)
736 && let Some(Some(prev_table_info)) = changed_table_info.get(table_id) && table_info_delta.committed_epoch > prev_table_info.committed_epoch {
737 } else {
739 return true;
741 }
742 let contains = change_log_delta.contains_key(table_id);
743 if !contains {
744 static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
745 LazyLock::new(|| LogSuppressor::per_second(1));
746 if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
747 warn!(
748 suppressed_count,
749 %table_id,
750 "table change log dropped due to no further change log at newly committed epoch"
751 );
752 }
753 }
754 contains
755 });
756
757 for (table_id, change_log_delta) in change_log_delta {
759 if let Some(change_log) = table_change_log.get_mut(table_id) {
760 change_log.truncate(change_log_delta.truncate_epoch);
761 }
762 }
763 }
764
765 pub fn build_branched_sst_info(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
766 let mut ret: BTreeMap<_, _> = BTreeMap::new();
767 for (compaction_group_id, group) in &self.levels {
768 let mut levels = vec![];
769 levels.extend(group.l0.sub_levels.iter());
770 levels.extend(group.levels.iter());
771 for level in levels {
772 for table_info in &level.table_infos {
773 if table_info.sst_id.as_raw_id() == table_info.object_id.as_raw_id() {
774 continue;
775 }
776 let object_id = table_info.object_id;
777 let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default();
778 entry
779 .entry(*compaction_group_id)
780 .or_default()
781 .push(table_info.sst_id)
782 }
783 }
784 }
785 ret
786 }
787
788 pub fn merge_compaction_group(
789 &mut self,
790 left_group_id: CompactionGroupId,
791 right_group_id: CompactionGroupId,
792 ) {
793 let left_group_id_table_ids = self
795 .state_table_info
796 .compaction_group_member_table_ids(left_group_id)
797 .iter();
798 let right_group_id_table_ids = self
799 .state_table_info
800 .compaction_group_member_table_ids(right_group_id)
801 .iter();
802
803 assert!(
804 left_group_id_table_ids
805 .chain(right_group_id_table_ids)
806 .is_sorted()
807 );
808
809 let total_cg = self.levels.keys().cloned().collect::<Vec<_>>();
810 let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| {
811 panic!(
812 "compaction group should exist right {} all {:?}",
813 right_group_id, total_cg
814 )
815 });
816
817 let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| {
818 panic!(
819 "compaction group should exist left {} all {:?}",
820 left_group_id, total_cg
821 )
822 });
823
824 group_split::merge_levels(left_levels, right_levels);
825 }
826
827 pub fn init_with_parent_group_v2(
828 &mut self,
829 parent_group_id: CompactionGroupId,
830 group_id: CompactionGroupId,
831 new_sst_start_id: HummockSstableId,
832 split_key: Option<Bytes>,
833 ) {
834 let mut new_sst_id = new_sst_start_id;
835 if parent_group_id == StaticCompactionGroupId::NewCompactionGroup {
836 if new_sst_start_id != 0 {
837 if cfg!(debug_assertions) {
838 panic!(
839 "non-zero sst start id {} for NewCompactionGroup",
840 new_sst_start_id
841 );
842 } else {
843 warn!(
844 %new_sst_start_id,
845 "non-zero sst start id for NewCompactionGroup"
846 );
847 }
848 }
849 return;
850 } else if !self.levels.contains_key(&parent_group_id) {
851 unreachable!(
852 "non-existing parent group id {} to init from (V2)",
853 parent_group_id
854 );
855 }
856
857 let [parent_levels, cur_levels] = self
858 .levels
859 .get_disjoint_mut([&parent_group_id, &group_id])
860 .map(|res| res.unwrap());
861 parent_levels.compaction_group_version_id += 1;
864 cur_levels.compaction_group_version_id += 1;
865
866 let l0 = &mut parent_levels.l0;
867 {
868 for sub_level in &mut l0.sub_levels {
869 let target_l0 = &mut cur_levels.l0;
870 let insert_table_infos = if let Some(split_key) = &split_key {
873 group_split::split_sst_info_for_level_v2(
874 sub_level,
875 &mut new_sst_id,
876 split_key.clone(),
877 )
878 } else {
879 vec![]
880 };
881
882 if insert_table_infos.is_empty() {
883 continue;
884 }
885
886 sub_level.normalize();
887 match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
888 Ok(idx) => {
889 add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
890 }
891 Err(idx) => {
892 insert_new_sub_level(
893 target_l0,
894 sub_level.sub_level_id,
895 sub_level.level_type,
896 insert_table_infos,
897 Some(idx),
898 );
899 }
900 }
901 }
902 l0.normalize();
903 }
904
905 for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
906 let insert_table_infos = if let Some(split_key) = &split_key {
907 group_split::split_sst_info_for_level_v2(level, &mut new_sst_id, split_key.clone())
908 } else {
909 vec![]
910 };
911
912 if insert_table_infos.is_empty() {
913 continue;
914 }
915
916 cur_levels.levels[idx].total_file_size += insert_table_infos
917 .iter()
918 .map(|sst| sst.sst_size)
919 .sum::<u64>();
920 cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
921 .iter()
922 .map(|sst| sst.uncompressed_file_size)
923 .sum::<u64>();
924 cur_levels.levels[idx]
925 .table_infos
926 .extend(insert_table_infos);
927 cur_levels.levels[idx]
928 .table_infos
929 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
930 assert!(can_concat(&cur_levels.levels[idx].table_infos));
931 level.normalize();
932 }
933
934 assert!(
935 parent_levels
936 .l0
937 .sub_levels
938 .iter()
939 .all(|level| !level.table_infos.is_empty())
940 );
941 assert!(
942 cur_levels
943 .l0
944 .sub_levels
945 .iter()
946 .all(|level| !level.table_infos.is_empty())
947 );
948 }
949}
950
951impl<T> HummockVersionCommon<T>
952where
953 T: SstableIdReader + ObjectIdReader,
954{
955 pub fn get_sst_object_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
956 self.get_sst_infos(exclude_change_log)
957 .map(|s| s.object_id())
958 .collect()
959 }
960
961 pub fn get_object_ids(
962 &self,
963 exclude_change_log: bool,
964 ) -> impl Iterator<Item = HummockObjectId> + '_ {
965 match HummockObjectId::Sstable(0.into()) {
969 HummockObjectId::Sstable(_) => {}
970 HummockObjectId::VectorFile(_) => {}
971 HummockObjectId::HnswGraphFile(_) => {}
972 };
973 self.get_sst_infos(exclude_change_log)
974 .map(|s| HummockObjectId::Sstable(s.object_id()))
975 .chain(
976 self.vector_indexes
977 .values()
978 .flat_map(|index| index.get_objects().map(|(object_id, _)| object_id)),
979 )
980 }
981
982 pub fn get_sst_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableId> {
983 self.get_sst_infos(exclude_change_log)
984 .map(|s| s.sst_id())
985 .collect()
986 }
987
988 pub fn get_sst_infos(&self, exclude_change_log: bool) -> impl Iterator<Item = &T> {
989 let may_table_change_log = if exclude_change_log {
990 None
991 } else {
992 Some(self.table_change_log.values())
993 };
994 self.get_combined_levels()
995 .flat_map(|level| level.table_infos.iter())
996 .chain(
997 may_table_change_log
998 .map(|v| {
999 v.flat_map(|table_change_log| {
1000 table_change_log.iter().flat_map(|epoch_change_log| {
1001 epoch_change_log
1002 .old_value
1003 .iter()
1004 .chain(epoch_change_log.new_value.iter())
1005 })
1006 })
1007 })
1008 .into_iter()
1009 .flatten(),
1010 )
1011 }
1012}
1013
1014impl Levels {
1015 pub(crate) fn apply_compact_ssts(
1016 &mut self,
1017 level_delta: &IntraLevelDeltaCommon<SstableInfo>,
1018 member_table_ids: &BTreeSet<TableId>,
1019 ) {
1020 let IntraLevelDeltaCommon {
1021 level_idx,
1022 l0_sub_level_id,
1023 inserted_table_infos: insert_table_infos,
1024 vnode_partition_count,
1025 removed_table_ids: delete_sst_ids_set,
1026 compaction_group_version_id,
1027 } = level_delta;
1028 let new_vnode_partition_count = *vnode_partition_count;
1029
1030 if is_compaction_task_expired(
1031 self.compaction_group_version_id,
1032 *compaction_group_version_id,
1033 ) {
1034 warn!(
1035 current_compaction_group_version_id = self.compaction_group_version_id,
1036 delta_compaction_group_version_id = compaction_group_version_id,
1037 level_idx,
1038 l0_sub_level_id,
1039 insert_table_infos = ?insert_table_infos
1040 .iter()
1041 .map(|sst| (sst.sst_id, sst.object_id))
1042 .collect_vec(),
1043 ?delete_sst_ids_set,
1044 "This VersionDelta may be committed by an expired compact task. Please check it."
1045 );
1046 return;
1047 }
1048 if !delete_sst_ids_set.is_empty() {
1049 if *level_idx == 0 {
1050 for level in &mut self.l0.sub_levels {
1051 level.delete_ssts(delete_sst_ids_set);
1052 }
1053 } else {
1054 let idx = *level_idx as usize - 1;
1055 self.levels[idx].delete_ssts(delete_sst_ids_set);
1056 }
1057 }
1058
1059 if !insert_table_infos.is_empty() {
1060 let insert_sst_level_id = *level_idx;
1061 let insert_sub_level_id = *l0_sub_level_id;
1062 if insert_sst_level_id == 0 {
1063 let l0 = &mut self.l0;
1064 let index = l0
1065 .sub_levels
1066 .partition_point(|level| level.sub_level_id < insert_sub_level_id);
1067 assert!(
1068 index < l0.sub_levels.len()
1069 && l0.sub_levels[index].sub_level_id == insert_sub_level_id,
1070 "should find the level to insert into when applying compaction generated delta. sub level idx: {}, removed sst ids: {:?}, sub levels: {:?},",
1071 insert_sub_level_id,
1072 delete_sst_ids_set,
1073 l0.sub_levels
1074 .iter()
1075 .map(|level| level.sub_level_id)
1076 .collect_vec()
1077 );
1078 if l0.sub_levels[index].table_infos.is_empty()
1079 && member_table_ids.len() == 1
1080 && insert_table_infos.iter().all(|sst| {
1081 sst.table_ids.len() == 1
1082 && sst.table_ids[0]
1083 == *member_table_ids.iter().next().expect("non-empty")
1084 })
1085 {
1086 l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
1089 }
1090 level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
1091 } else {
1092 let idx = insert_sst_level_id as usize - 1;
1093 if self.levels[idx].table_infos.is_empty()
1094 && insert_table_infos
1095 .iter()
1096 .all(|sst| sst.table_ids.len() == 1)
1097 {
1098 self.levels[idx].vnode_partition_count = new_vnode_partition_count;
1099 } else if self.levels[idx].vnode_partition_count != 0
1100 && new_vnode_partition_count == 0
1101 && member_table_ids.len() > 1
1102 {
1103 self.levels[idx].vnode_partition_count = 0;
1104 }
1105 level_insert_ssts(&mut self.levels[idx], insert_table_infos);
1106 }
1107 }
1108 }
1109
1110 pub(crate) fn truncate_tables(&mut self, table_ids: &HashSet<TableId>) {
1113 for level in self.l0.sub_levels.iter_mut().chain(self.levels.iter_mut()) {
1114 level.truncate_tables(table_ids);
1115 }
1116 self.l0.normalize();
1117 self.compaction_group_version_id += 1;
1118 }
1119}
1120
1121impl<T, L> HummockVersionCommon<T, L> {
1122 pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
1123 self.levels
1124 .values()
1125 .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
1126 }
1127}
1128
1129pub fn build_initial_compaction_group_levels(
1130 group_id: impl Into<CompactionGroupId>,
1131 compaction_config: &CompactionConfig,
1132) -> Levels {
1133 let mut levels = vec![];
1134 for l in 0..compaction_config.get_max_level() {
1135 levels.push(Level {
1136 level_idx: (l + 1) as u32,
1137 level_type: PbLevelType::Nonoverlapping,
1138 table_infos: vec![],
1139 total_file_size: 0,
1140 sub_level_id: 0,
1141 uncompressed_file_size: 0,
1142 vnode_partition_count: 0,
1143 });
1144 }
1145 #[expect(deprecated)] Levels {
1147 levels,
1148 l0: OverlappingLevel {
1149 sub_levels: vec![],
1150 total_file_size: 0,
1151 uncompressed_file_size: 0,
1152 },
1153 group_id: group_id.into(),
1154 parent_group_id: 0.into(),
1155 member_table_ids: vec![],
1156 compaction_group_version_id: 0,
1157 }
1158}
1159
1160fn split_sst_info_for_level(
1161 member_table_ids: &BTreeSet<TableId>,
1162 level: &mut Level,
1163 new_sst_id: &mut HummockSstableId,
1164) -> Vec<SstableInfo> {
1165 let mut insert_table_infos = vec![];
1168 for sst_info in &mut level.table_infos {
1169 let removed_table_ids = sst_info
1170 .table_ids
1171 .iter()
1172 .filter(|table_id| member_table_ids.contains(*table_id))
1173 .cloned()
1174 .collect_vec();
1175 let sst_size = sst_info.sst_size;
1176 if sst_size / 2 == 0 {
1177 tracing::warn!(
1178 id = %sst_info.sst_id,
1179 object_id = %sst_info.object_id,
1180 sst_size = sst_info.sst_size,
1181 file_size = sst_info.file_size,
1182 "Sstable sst_size is under expected",
1183 );
1184 };
1185 if !removed_table_ids.is_empty() {
1186 let (modified_sst, branch_sst) = split_sst_with_table_ids(
1187 sst_info,
1188 new_sst_id,
1189 sst_size / 2,
1190 sst_size / 2,
1191 member_table_ids.iter().cloned().collect_vec(),
1192 );
1193 *sst_info = modified_sst;
1194 insert_table_infos.push(branch_sst);
1195 }
1196 }
1197 insert_table_infos
1198}
1199
1200pub fn get_compaction_group_ids(
1202 version: &HummockVersion,
1203) -> impl Iterator<Item = CompactionGroupId> + '_ {
1204 version.levels.keys().cloned()
1205}
1206
1207pub fn get_table_compaction_group_id_mapping(
1208 version: &HummockVersion,
1209) -> HashMap<StateTableId, CompactionGroupId> {
1210 version
1211 .state_table_info
1212 .info()
1213 .iter()
1214 .map(|(table_id, info)| (*table_id, info.compaction_group_id))
1215 .collect()
1216}
1217
1218pub fn get_compaction_group_ssts(
1220 version: &HummockVersion,
1221 group_id: CompactionGroupId,
1222) -> impl Iterator<Item = (HummockSstableObjectId, HummockSstableId)> + '_ {
1223 let group_levels = version.get_compaction_group_levels(group_id);
1224 group_levels
1225 .l0
1226 .sub_levels
1227 .iter()
1228 .rev()
1229 .chain(group_levels.levels.iter())
1230 .flat_map(|level| {
1231 level
1232 .table_infos
1233 .iter()
1234 .map(|table_info| (table_info.object_id, table_info.sst_id))
1235 })
1236}
1237
1238pub fn new_sub_level(
1239 sub_level_id: u64,
1240 level_type: PbLevelType,
1241 table_infos: Vec<SstableInfo>,
1242) -> Level {
1243 if level_type == PbLevelType::Nonoverlapping {
1244 debug_assert!(
1245 can_concat(&table_infos),
1246 "sst of non-overlapping level is not concat-able: {:?}",
1247 table_infos
1248 );
1249 }
1250 let total_file_size = table_infos.iter().map(|table| table.sst_size).sum();
1251 let uncompressed_file_size = table_infos
1252 .iter()
1253 .map(|table| table.uncompressed_file_size)
1254 .sum();
1255 Level {
1256 level_idx: 0,
1257 level_type,
1258 table_infos,
1259 total_file_size,
1260 sub_level_id,
1261 uncompressed_file_size,
1262 vnode_partition_count: 0,
1263 }
1264}
1265
1266pub fn add_ssts_to_sub_level(
1267 l0: &mut OverlappingLevel,
1268 sub_level_idx: usize,
1269 insert_table_infos: Vec<SstableInfo>,
1270) {
1271 insert_table_infos.iter().for_each(|sst| {
1272 l0.sub_levels[sub_level_idx].total_file_size += sst.sst_size;
1273 l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size;
1274 l0.total_file_size += sst.sst_size;
1275 l0.uncompressed_file_size += sst.uncompressed_file_size;
1276 });
1277 l0.sub_levels[sub_level_idx]
1278 .table_infos
1279 .extend(insert_table_infos);
1280 if l0.sub_levels[sub_level_idx].level_type == PbLevelType::Nonoverlapping {
1281 l0.sub_levels[sub_level_idx]
1282 .table_infos
1283 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1284 assert!(
1285 can_concat(&l0.sub_levels[sub_level_idx].table_infos),
1286 "sstable ids: {:?}",
1287 l0.sub_levels[sub_level_idx]
1288 .table_infos
1289 .iter()
1290 .map(|sst| sst.sst_id)
1291 .collect_vec()
1292 );
1293 }
1294}
1295
1296pub fn insert_new_sub_level(
1298 l0: &mut OverlappingLevel,
1299 insert_sub_level_id: u64,
1300 level_type: PbLevelType,
1301 insert_table_infos: Vec<SstableInfo>,
1302 sub_level_insert_hint: Option<usize>,
1303) {
1304 if insert_sub_level_id == u64::MAX {
1305 return;
1306 }
1307 let insert_pos = if let Some(insert_pos) = sub_level_insert_hint {
1308 insert_pos
1309 } else {
1310 if let Some(newest_level) = l0.sub_levels.last() {
1311 assert!(
1312 newest_level.sub_level_id < insert_sub_level_id,
1313 "inserted new level is not the newest: prev newest: {}, insert: {}. L0: {:?}",
1314 newest_level.sub_level_id,
1315 insert_sub_level_id,
1316 l0,
1317 );
1318 }
1319 l0.sub_levels.len()
1320 };
1321 #[cfg(debug_assertions)]
1322 {
1323 if insert_pos > 0
1324 && let Some(smaller_level) = l0.sub_levels.get(insert_pos - 1)
1325 {
1326 debug_assert!(smaller_level.sub_level_id < insert_sub_level_id);
1327 }
1328 if let Some(larger_level) = l0.sub_levels.get(insert_pos) {
1329 debug_assert!(larger_level.sub_level_id > insert_sub_level_id);
1330 }
1331 }
1332 let level = new_sub_level(insert_sub_level_id, level_type, insert_table_infos);
1335 l0.total_file_size += level.total_file_size;
1336 l0.uncompressed_file_size += level.uncompressed_file_size;
1337 l0.sub_levels.insert(insert_pos, level);
1338}
1339
1340impl Level {
1341 fn recompute_size(&mut self) {
1342 self.total_file_size = self
1343 .table_infos
1344 .iter()
1345 .map(|table| table.sst_size)
1346 .sum::<u64>();
1347 self.uncompressed_file_size = self
1348 .table_infos
1349 .iter()
1350 .map(|table| table.uncompressed_file_size)
1351 .sum::<u64>();
1352 }
1353
1354 fn normalize(&mut self) {
1356 self.table_infos
1357 .retain(|sst_info| !sst_info.table_ids.is_empty());
1358 self.recompute_size();
1359 }
1360
1361 fn delete_ssts(&mut self, ids: &HashSet<HummockSstableId>) -> bool {
1363 let original_len = self.table_infos.len();
1364 self.table_infos
1365 .retain(|table| !ids.contains(&table.sst_id));
1366 self.recompute_size();
1367 original_len != self.table_infos.len()
1368 }
1369
1370 fn truncate_tables(&mut self, table_ids: &HashSet<TableId>) {
1373 for sstable_info in &mut self.table_infos {
1374 let mut inner = sstable_info.get_inner();
1375 inner.table_ids.retain(|id| !table_ids.contains(id));
1376 sstable_info.set_inner(inner);
1377 }
1378 self.normalize();
1379 }
1380}
1381
1382impl OverlappingLevel {
1383 fn normalize(&mut self) {
1385 self.sub_levels
1386 .retain(|level| !level.table_infos.is_empty());
1387 self.total_file_size = self
1388 .sub_levels
1389 .iter()
1390 .map(|level| level.total_file_size)
1391 .sum::<u64>();
1392 self.uncompressed_file_size = self
1393 .sub_levels
1394 .iter()
1395 .map(|level| level.uncompressed_file_size)
1396 .sum::<u64>();
1397 }
1398}
1399
1400fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec<SstableInfo>) {
1401 fn display_sstable_infos(ssts: &[impl Borrow<SstableInfo>]) -> String {
1402 format!(
1403 "sstable ids: {:?}",
1404 ssts.iter().map(|s| s.borrow().sst_id).collect_vec()
1405 )
1406 }
1407 operand.total_file_size += insert_table_infos
1408 .iter()
1409 .map(|sst| sst.sst_size)
1410 .sum::<u64>();
1411 operand.uncompressed_file_size += insert_table_infos
1412 .iter()
1413 .map(|sst| sst.uncompressed_file_size)
1414 .sum::<u64>();
1415 if operand.level_type == PbLevelType::Overlapping {
1416 operand.level_type = PbLevelType::Nonoverlapping;
1417 operand
1418 .table_infos
1419 .extend(insert_table_infos.iter().cloned());
1420 operand
1421 .table_infos
1422 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1423 assert!(
1424 can_concat(&operand.table_infos),
1425 "{}",
1426 display_sstable_infos(&operand.table_infos)
1427 );
1428 } else if !insert_table_infos.is_empty() {
1429 let sorted_insert: Vec<_> = insert_table_infos
1430 .iter()
1431 .sorted_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range))
1432 .cloned()
1433 .collect();
1434 let first = &sorted_insert[0];
1435 let last = &sorted_insert[sorted_insert.len() - 1];
1436 let pos = operand
1437 .table_infos
1438 .partition_point(|b| b.key_range.cmp(&first.key_range) == Ordering::Less);
1439 if pos >= operand.table_infos.len()
1440 || last.key_range.cmp(&operand.table_infos[pos].key_range) == Ordering::Less
1441 {
1442 operand.table_infos.splice(pos..pos, sorted_insert);
1443 let validate_range = operand
1445 .table_infos
1446 .iter()
1447 .skip(pos.saturating_sub(1))
1448 .take(insert_table_infos.len() + 2)
1449 .collect_vec();
1450 assert!(
1451 can_concat(&validate_range),
1452 "{}",
1453 display_sstable_infos(&validate_range),
1454 );
1455 } else {
1456 warn!(insert = ?insert_table_infos, level = ?operand.table_infos, "unexpected overlap");
1459 for i in insert_table_infos {
1460 let pos = operand
1461 .table_infos
1462 .partition_point(|b| b.key_range.cmp(&i.key_range) == Ordering::Less);
1463 operand.table_infos.insert(pos, i.clone());
1464 }
1465 assert!(
1466 can_concat(&operand.table_infos),
1467 "{}",
1468 display_sstable_infos(&operand.table_infos)
1469 );
1470 }
1471 }
1472}
1473
1474pub fn object_size_map(version: &HummockVersion) -> HashMap<HummockObjectId, u64> {
1475 match HummockObjectId::Sstable(0.into()) {
1479 HummockObjectId::Sstable(_) => {}
1480 HummockObjectId::VectorFile(_) => {}
1481 HummockObjectId::HnswGraphFile(_) => {}
1482 };
1483 version
1484 .levels
1485 .values()
1486 .flat_map(|cg| {
1487 cg.level0()
1488 .sub_levels
1489 .iter()
1490 .chain(cg.levels.iter())
1491 .flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
1492 })
1493 .chain(version.table_change_log.values().flat_map(|c| {
1494 c.iter().flat_map(|l| {
1495 l.old_value
1496 .iter()
1497 .chain(l.new_value.iter())
1498 .map(|t| (t.object_id, t.file_size))
1499 })
1500 }))
1501 .map(|(object_id, size)| (HummockObjectId::Sstable(object_id), size))
1502 .chain(
1503 version
1504 .vector_indexes
1505 .values()
1506 .flat_map(|index| index.get_objects()),
1507 )
1508 .collect()
1509}
1510
1511pub fn validate_version(version: &HummockVersion) -> Vec<String> {
1514 let mut res = Vec::new();
1515 for (group_id, levels) in &version.levels {
1517 if levels.group_id != *group_id {
1519 res.push(format!(
1520 "GROUP {}: inconsistent group id {} in Levels",
1521 group_id, levels.group_id
1522 ));
1523 }
1524
1525 let validate_level = |group: CompactionGroupId,
1526 expected_level_idx: u32,
1527 level: &Level,
1528 res: &mut Vec<String>| {
1529 let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
1530 if level.level_idx == 0 {
1531 level_identifier.push_str(format!("SUBLEVEL {}", level.sub_level_id).as_str());
1532 if level.table_infos.is_empty() {
1534 res.push(format!("{}: empty level", level_identifier));
1535 }
1536 } else if level.level_type != PbLevelType::Nonoverlapping {
1537 res.push(format!(
1539 "{}: level type {:?} is not non-overlapping",
1540 level_identifier, level.level_type
1541 ));
1542 }
1543
1544 if level.level_idx != expected_level_idx {
1546 res.push(format!(
1547 "{}: mismatched level idx {}",
1548 level_identifier, expected_level_idx
1549 ));
1550 }
1551
1552 let mut prev_table_info: Option<&SstableInfo> = None;
1553 for table_info in &level.table_infos {
1554 if !table_info.table_ids.is_sorted_by(|a, b| a < b) {
1556 res.push(format!(
1557 "{} SST {}: table_ids not sorted",
1558 level_identifier, table_info.object_id
1559 ));
1560 }
1561
1562 if level.level_type == PbLevelType::Nonoverlapping {
1564 if let Some(prev) = prev_table_info.take()
1565 && prev
1566 .key_range
1567 .compare_right_with(&table_info.key_range.left)
1568 != Ordering::Less
1569 {
1570 res.push(format!(
1571 "{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
1572 level_identifier, table_info.object_id, prev, table_info
1573 ));
1574 }
1575 let _ = prev_table_info.insert(table_info);
1576 }
1577 }
1578 };
1579
1580 let l0 = &levels.l0;
1581 let mut prev_sub_level_id = u64::MAX;
1582 for sub_level in &l0.sub_levels {
1583 if sub_level.sub_level_id >= prev_sub_level_id {
1585 res.push(format!(
1586 "GROUP {} LEVEL 0: sub_level_id {} >= prev_sub_level {}",
1587 group_id, sub_level.level_idx, prev_sub_level_id
1588 ));
1589 }
1590 prev_sub_level_id = sub_level.sub_level_id;
1591
1592 validate_level(*group_id, 0, sub_level, &mut res);
1593 }
1594
1595 for idx in 1..=levels.levels.len() {
1596 validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
1597 }
1598 }
1599 res
1600}
1601
1602#[cfg(test)]
1603mod tests {
1604 use std::collections::{HashMap, HashSet};
1605
1606 use bytes::Bytes;
1607 use risingwave_common::catalog::TableId;
1608 use risingwave_common::hash::VirtualNode;
1609 use risingwave_common::util::epoch::test_epoch;
1610 use risingwave_pb::hummock::{CompactionConfig, GroupConstruct, GroupDestroy, LevelType};
1611
1612 use super::group_split;
1613 use crate::HummockVersionId;
1614 use crate::compaction_group::group_split::*;
1615 use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
1616 use crate::key::{FullKey, gen_key_from_str};
1617 use crate::key_range::KeyRange;
1618 use crate::level::{Level, Levels, OverlappingLevel};
1619 use crate::sstable_info::{SstableInfo, SstableInfoInner};
1620 use crate::version::{
1621 GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, IntraLevelDelta,
1622 };
1623
1624 fn gen_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfo {
1625 gen_sstable_info_impl(sst_id, table_ids, epoch).into()
1626 }
1627
1628 fn gen_sstable_info_impl(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfoInner {
1629 let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1");
1630 let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1");
1631 let full_key_l = FullKey::for_test(
1632 TableId::new(*table_ids.first().unwrap()),
1633 table_key_l,
1634 epoch,
1635 )
1636 .encode();
1637 let full_key_r =
1638 FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch)
1639 .encode();
1640
1641 SstableInfoInner {
1642 sst_id: sst_id.into(),
1643 key_range: KeyRange {
1644 left: full_key_l.into(),
1645 right: full_key_r.into(),
1646 right_exclusive: false,
1647 },
1648 table_ids: table_ids.into_iter().map(Into::into).collect(),
1649 object_id: sst_id.into(),
1650 min_epoch: 20,
1651 max_epoch: 20,
1652 file_size: 100,
1653 sst_size: 100,
1654 ..Default::default()
1655 }
1656 }
1657
1658 #[test]
1659 fn test_get_sst_object_ids() {
1660 let mut version = HummockVersion {
1661 id: HummockVersionId::new(0),
1662 levels: HashMap::from_iter([(
1663 0.into(),
1664 Levels {
1665 levels: vec![],
1666 l0: OverlappingLevel {
1667 sub_levels: vec![],
1668 total_file_size: 0,
1669 uncompressed_file_size: 0,
1670 },
1671 ..Default::default()
1672 },
1673 )]),
1674 ..Default::default()
1675 };
1676 assert_eq!(version.get_object_ids(false).count(), 0);
1677
1678 version
1680 .levels
1681 .get_mut(&0)
1682 .unwrap()
1683 .l0
1684 .sub_levels
1685 .push(Level {
1686 table_infos: vec![
1687 SstableInfoInner {
1688 object_id: 11.into(),
1689 sst_id: 11.into(),
1690 ..Default::default()
1691 }
1692 .into(),
1693 ],
1694 ..Default::default()
1695 });
1696 assert_eq!(version.get_object_ids(false).count(), 1);
1697
1698 version.levels.get_mut(&0).unwrap().levels.push(Level {
1700 table_infos: vec![
1701 SstableInfoInner {
1702 object_id: 22.into(),
1703 sst_id: 22.into(),
1704 ..Default::default()
1705 }
1706 .into(),
1707 ],
1708 ..Default::default()
1709 });
1710 assert_eq!(version.get_object_ids(false).count(), 2);
1711 }
1712
1713 #[test]
1714 fn test_apply_version_delta() {
1715 let mut version = HummockVersion {
1716 id: HummockVersionId::new(0),
1717 levels: HashMap::from_iter([
1718 (
1719 0.into(),
1720 build_initial_compaction_group_levels(
1721 0,
1722 &CompactionConfig {
1723 max_level: 6,
1724 ..Default::default()
1725 },
1726 ),
1727 ),
1728 (
1729 1.into(),
1730 build_initial_compaction_group_levels(
1731 1,
1732 &CompactionConfig {
1733 max_level: 6,
1734 ..Default::default()
1735 },
1736 ),
1737 ),
1738 ]),
1739 ..Default::default()
1740 };
1741 let version_delta = HummockVersionDelta {
1742 id: HummockVersionId::new(1),
1743 group_deltas: HashMap::from_iter([
1744 (
1745 2.into(),
1746 GroupDeltas {
1747 group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
1748 group_config: Some(CompactionConfig {
1749 max_level: 6,
1750 ..Default::default()
1751 }),
1752 ..Default::default()
1753 }))],
1754 },
1755 ),
1756 (
1757 0.into(),
1758 GroupDeltas {
1759 group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})],
1760 },
1761 ),
1762 (
1763 1.into(),
1764 GroupDeltas {
1765 group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new(
1766 1,
1767 0,
1768 HashSet::new(),
1769 vec![
1770 SstableInfoInner {
1771 object_id: 1.into(),
1772 sst_id: 1.into(),
1773 ..Default::default()
1774 }
1775 .into(),
1776 ],
1777 0,
1778 version
1779 .levels
1780 .get(&1)
1781 .as_ref()
1782 .unwrap()
1783 .compaction_group_version_id,
1784 ))],
1785 },
1786 ),
1787 ]),
1788 ..Default::default()
1789 };
1790 let version_delta = version_delta;
1791
1792 version.apply_version_delta(&version_delta);
1793 let mut cg1 = build_initial_compaction_group_levels(
1794 1,
1795 &CompactionConfig {
1796 max_level: 6,
1797 ..Default::default()
1798 },
1799 );
1800 cg1.levels[0] = Level {
1801 level_idx: 1,
1802 level_type: LevelType::Nonoverlapping,
1803 table_infos: vec![
1804 SstableInfoInner {
1805 object_id: 1.into(),
1806 sst_id: 1.into(),
1807 ..Default::default()
1808 }
1809 .into(),
1810 ],
1811 ..Default::default()
1812 };
1813 assert_eq!(
1814 version,
1815 HummockVersion {
1816 id: HummockVersionId::new(1),
1817 levels: HashMap::from_iter([
1818 (
1819 2.into(),
1820 build_initial_compaction_group_levels(
1821 2,
1822 &CompactionConfig {
1823 max_level: 6,
1824 ..Default::default()
1825 },
1826 ),
1827 ),
1828 (1.into(), cg1),
1829 ]),
1830 ..Default::default()
1831 }
1832 );
1833 }
1834
1835 fn gen_sst_info(object_id: u64, table_ids: Vec<u32>, left: Bytes, right: Bytes) -> SstableInfo {
1836 gen_sst_info_impl(object_id, table_ids, left, right).into()
1837 }
1838
1839 fn gen_sst_info_impl(
1840 object_id: u64,
1841 table_ids: Vec<u32>,
1842 left: Bytes,
1843 right: Bytes,
1844 ) -> SstableInfoInner {
1845 SstableInfoInner {
1846 object_id: object_id.into(),
1847 sst_id: object_id.into(),
1848 key_range: KeyRange {
1849 left,
1850 right,
1851 right_exclusive: false,
1852 },
1853 table_ids: table_ids.into_iter().map(Into::into).collect(),
1854 file_size: 100,
1855 sst_size: 100,
1856 uncompressed_file_size: 100,
1857 ..Default::default()
1858 }
1859 }
1860
1861 #[test]
1862 fn test_merge_levels() {
1863 let mut left_levels = build_initial_compaction_group_levels(
1864 1,
1865 &CompactionConfig {
1866 max_level: 6,
1867 ..Default::default()
1868 },
1869 );
1870
1871 let mut right_levels = build_initial_compaction_group_levels(
1872 2,
1873 &CompactionConfig {
1874 max_level: 6,
1875 ..Default::default()
1876 },
1877 );
1878
1879 left_levels.levels[0] = Level {
1880 level_idx: 1,
1881 level_type: LevelType::Nonoverlapping,
1882 table_infos: vec![
1883 gen_sst_info(
1884 1,
1885 vec![3],
1886 FullKey::for_test(
1887 TableId::new(3),
1888 gen_key_from_str(VirtualNode::from_index(1), "1"),
1889 0,
1890 )
1891 .encode()
1892 .into(),
1893 FullKey::for_test(
1894 TableId::new(3),
1895 gen_key_from_str(VirtualNode::from_index(200), "1"),
1896 0,
1897 )
1898 .encode()
1899 .into(),
1900 ),
1901 gen_sst_info(
1902 10,
1903 vec![3, 4],
1904 FullKey::for_test(
1905 TableId::new(3),
1906 gen_key_from_str(VirtualNode::from_index(201), "1"),
1907 0,
1908 )
1909 .encode()
1910 .into(),
1911 FullKey::for_test(
1912 TableId::new(4),
1913 gen_key_from_str(VirtualNode::from_index(10), "1"),
1914 0,
1915 )
1916 .encode()
1917 .into(),
1918 ),
1919 gen_sst_info(
1920 11,
1921 vec![4],
1922 FullKey::for_test(
1923 TableId::new(4),
1924 gen_key_from_str(VirtualNode::from_index(11), "1"),
1925 0,
1926 )
1927 .encode()
1928 .into(),
1929 FullKey::for_test(
1930 TableId::new(4),
1931 gen_key_from_str(VirtualNode::from_index(200), "1"),
1932 0,
1933 )
1934 .encode()
1935 .into(),
1936 ),
1937 ],
1938 total_file_size: 300,
1939 ..Default::default()
1940 };
1941
1942 left_levels.l0.sub_levels.push(Level {
1943 level_idx: 0,
1944 table_infos: vec![gen_sst_info(
1945 3,
1946 vec![3],
1947 FullKey::for_test(
1948 TableId::new(3),
1949 gen_key_from_str(VirtualNode::from_index(1), "1"),
1950 0,
1951 )
1952 .encode()
1953 .into(),
1954 FullKey::for_test(
1955 TableId::new(3),
1956 gen_key_from_str(VirtualNode::from_index(200), "1"),
1957 0,
1958 )
1959 .encode()
1960 .into(),
1961 )],
1962 sub_level_id: 101,
1963 level_type: LevelType::Overlapping,
1964 total_file_size: 100,
1965 ..Default::default()
1966 });
1967
1968 left_levels.l0.sub_levels.push(Level {
1969 level_idx: 0,
1970 table_infos: vec![gen_sst_info(
1971 3,
1972 vec![3],
1973 FullKey::for_test(
1974 TableId::new(3),
1975 gen_key_from_str(VirtualNode::from_index(1), "1"),
1976 0,
1977 )
1978 .encode()
1979 .into(),
1980 FullKey::for_test(
1981 TableId::new(3),
1982 gen_key_from_str(VirtualNode::from_index(200), "1"),
1983 0,
1984 )
1985 .encode()
1986 .into(),
1987 )],
1988 sub_level_id: 103,
1989 level_type: LevelType::Overlapping,
1990 total_file_size: 100,
1991 ..Default::default()
1992 });
1993
1994 left_levels.l0.sub_levels.push(Level {
1995 level_idx: 0,
1996 table_infos: vec![gen_sst_info(
1997 3,
1998 vec![3],
1999 FullKey::for_test(
2000 TableId::new(3),
2001 gen_key_from_str(VirtualNode::from_index(1), "1"),
2002 0,
2003 )
2004 .encode()
2005 .into(),
2006 FullKey::for_test(
2007 TableId::new(3),
2008 gen_key_from_str(VirtualNode::from_index(200), "1"),
2009 0,
2010 )
2011 .encode()
2012 .into(),
2013 )],
2014 sub_level_id: 105,
2015 level_type: LevelType::Nonoverlapping,
2016 total_file_size: 100,
2017 ..Default::default()
2018 });
2019
2020 right_levels.levels[0] = Level {
2021 level_idx: 1,
2022 level_type: LevelType::Nonoverlapping,
2023 table_infos: vec![
2024 gen_sst_info(
2025 1,
2026 vec![5],
2027 FullKey::for_test(
2028 TableId::new(5),
2029 gen_key_from_str(VirtualNode::from_index(1), "1"),
2030 0,
2031 )
2032 .encode()
2033 .into(),
2034 FullKey::for_test(
2035 TableId::new(5),
2036 gen_key_from_str(VirtualNode::from_index(200), "1"),
2037 0,
2038 )
2039 .encode()
2040 .into(),
2041 ),
2042 gen_sst_info(
2043 10,
2044 vec![5, 6],
2045 FullKey::for_test(
2046 TableId::new(5),
2047 gen_key_from_str(VirtualNode::from_index(201), "1"),
2048 0,
2049 )
2050 .encode()
2051 .into(),
2052 FullKey::for_test(
2053 TableId::new(6),
2054 gen_key_from_str(VirtualNode::from_index(10), "1"),
2055 0,
2056 )
2057 .encode()
2058 .into(),
2059 ),
2060 gen_sst_info(
2061 11,
2062 vec![6],
2063 FullKey::for_test(
2064 TableId::new(6),
2065 gen_key_from_str(VirtualNode::from_index(11), "1"),
2066 0,
2067 )
2068 .encode()
2069 .into(),
2070 FullKey::for_test(
2071 TableId::new(6),
2072 gen_key_from_str(VirtualNode::from_index(200), "1"),
2073 0,
2074 )
2075 .encode()
2076 .into(),
2077 ),
2078 ],
2079 total_file_size: 300,
2080 ..Default::default()
2081 };
2082
2083 right_levels.l0.sub_levels.push(Level {
2084 level_idx: 0,
2085 table_infos: vec![gen_sst_info(
2086 3,
2087 vec![5],
2088 FullKey::for_test(
2089 TableId::new(5),
2090 gen_key_from_str(VirtualNode::from_index(1), "1"),
2091 0,
2092 )
2093 .encode()
2094 .into(),
2095 FullKey::for_test(
2096 TableId::new(5),
2097 gen_key_from_str(VirtualNode::from_index(200), "1"),
2098 0,
2099 )
2100 .encode()
2101 .into(),
2102 )],
2103 sub_level_id: 101,
2104 level_type: LevelType::Overlapping,
2105 total_file_size: 100,
2106 ..Default::default()
2107 });
2108
2109 right_levels.l0.sub_levels.push(Level {
2110 level_idx: 0,
2111 table_infos: vec![gen_sst_info(
2112 5,
2113 vec![5],
2114 FullKey::for_test(
2115 TableId::new(5),
2116 gen_key_from_str(VirtualNode::from_index(1), "1"),
2117 0,
2118 )
2119 .encode()
2120 .into(),
2121 FullKey::for_test(
2122 TableId::new(5),
2123 gen_key_from_str(VirtualNode::from_index(200), "1"),
2124 0,
2125 )
2126 .encode()
2127 .into(),
2128 )],
2129 sub_level_id: 102,
2130 level_type: LevelType::Overlapping,
2131 total_file_size: 100,
2132 ..Default::default()
2133 });
2134
2135 right_levels.l0.sub_levels.push(Level {
2136 level_idx: 0,
2137 table_infos: vec![gen_sst_info(
2138 3,
2139 vec![5],
2140 FullKey::for_test(
2141 TableId::new(5),
2142 gen_key_from_str(VirtualNode::from_index(1), "1"),
2143 0,
2144 )
2145 .encode()
2146 .into(),
2147 FullKey::for_test(
2148 TableId::new(5),
2149 gen_key_from_str(VirtualNode::from_index(200), "1"),
2150 0,
2151 )
2152 .encode()
2153 .into(),
2154 )],
2155 sub_level_id: 103,
2156 level_type: LevelType::Nonoverlapping,
2157 total_file_size: 100,
2158 ..Default::default()
2159 });
2160
2161 {
2162 let mut left_levels = Levels::default();
2164 let right_levels = Levels::default();
2165
2166 group_split::merge_levels(&mut left_levels, right_levels);
2167 }
2168
2169 {
2170 let mut left_levels = build_initial_compaction_group_levels(
2172 1,
2173 &CompactionConfig {
2174 max_level: 6,
2175 ..Default::default()
2176 },
2177 );
2178 let right_levels = right_levels.clone();
2179
2180 group_split::merge_levels(&mut left_levels, right_levels);
2181
2182 assert!(left_levels.l0.sub_levels.len() == 3);
2183 assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2184 assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2185 assert!(left_levels.l0.sub_levels[1].sub_level_id == 102);
2186 assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2187 assert!(left_levels.l0.sub_levels[2].sub_level_id == 103);
2188 assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2189
2190 assert!(left_levels.levels[0].level_idx == 1);
2191 assert_eq!(300, left_levels.levels[0].total_file_size);
2192 }
2193
2194 {
2195 let mut left_levels = left_levels.clone();
2197 let right_levels = build_initial_compaction_group_levels(
2198 2,
2199 &CompactionConfig {
2200 max_level: 6,
2201 ..Default::default()
2202 },
2203 );
2204
2205 group_split::merge_levels(&mut left_levels, right_levels);
2206
2207 assert!(left_levels.l0.sub_levels.len() == 3);
2208 assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2209 assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2210 assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2211 assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2212 assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2213 assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2214
2215 assert!(left_levels.levels[0].level_idx == 1);
2216 assert_eq!(300, left_levels.levels[0].total_file_size);
2217 }
2218
2219 {
2220 let mut left_levels = left_levels.clone();
2221 let right_levels = right_levels.clone();
2222
2223 group_split::merge_levels(&mut left_levels, right_levels);
2224
2225 assert!(left_levels.l0.sub_levels.len() == 6);
2226 assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2227 assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2228 assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2229 assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2230 assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2231 assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2232 assert!(left_levels.l0.sub_levels[3].sub_level_id == 106);
2233 assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size);
2234 assert!(left_levels.l0.sub_levels[4].sub_level_id == 107);
2235 assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size);
2236 assert!(left_levels.l0.sub_levels[5].sub_level_id == 108);
2237 assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size);
2238
2239 assert!(left_levels.levels[0].level_idx == 1);
2240 assert_eq!(600, left_levels.levels[0].total_file_size);
2241 }
2242 }
2243
2244 #[test]
2245 fn test_get_split_pos() {
2246 let epoch = test_epoch(1);
2247 let s1 = gen_sstable_info(1, vec![1, 2], epoch);
2248 let s2 = gen_sstable_info(2, vec![3, 4, 5], epoch);
2249 let s3 = gen_sstable_info(3, vec![6, 7], epoch);
2250
2251 let ssts = vec![s1, s2, s3];
2252 let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2253
2254 let pos = group_split::get_split_pos(&ssts, split_key.clone());
2255 assert_eq!(1, pos);
2256
2257 let pos = group_split::get_split_pos(&vec![], split_key);
2258 assert_eq!(0, pos);
2259 }
2260
2261 #[test]
2262 fn test_split_sst() {
2263 let epoch = test_epoch(1);
2264 let sst = gen_sstable_info(1, vec![1, 2, 3, 5], epoch);
2265
2266 {
2267 let split_key = group_split::build_split_key(3.into(), VirtualNode::ZERO);
2268 let origin_sst = sst.clone();
2269 let sst_size = origin_sst.sst_size;
2270 let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2271 assert_eq!(SstSplitType::Both, split_type);
2272
2273 let mut new_sst_id = 10.into();
2274 let (origin_sst, branched_sst) = group_split::split_sst(
2275 origin_sst,
2276 &mut new_sst_id,
2277 split_key,
2278 sst_size / 2,
2279 sst_size / 2,
2280 );
2281
2282 let origin_sst = origin_sst.unwrap();
2283 let branched_sst = branched_sst.unwrap();
2284
2285 assert!(origin_sst.key_range.right_exclusive);
2286 assert!(
2287 origin_sst
2288 .key_range
2289 .right
2290 .cmp(&branched_sst.key_range.left)
2291 .is_le()
2292 );
2293 assert!(origin_sst.table_ids.is_sorted());
2294 assert!(branched_sst.table_ids.is_sorted());
2295 assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2296 assert!(branched_sst.sst_size < origin_sst.file_size);
2297 assert_eq!(10, branched_sst.sst_id);
2298 assert_eq!(11, origin_sst.sst_id);
2299 assert_eq!(3, branched_sst.table_ids.first().unwrap().as_raw_id()); }
2301
2302 {
2303 let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2305 let origin_sst = sst.clone();
2306 let sst_size = origin_sst.sst_size;
2307 let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2308 assert_eq!(SstSplitType::Both, split_type);
2309
2310 let mut new_sst_id = 10.into();
2311 let (origin_sst, branched_sst) = group_split::split_sst(
2312 origin_sst,
2313 &mut new_sst_id,
2314 split_key,
2315 sst_size / 2,
2316 sst_size / 2,
2317 );
2318
2319 let origin_sst = origin_sst.unwrap();
2320 let branched_sst = branched_sst.unwrap();
2321
2322 assert!(origin_sst.key_range.right_exclusive);
2323 assert!(origin_sst.key_range.right.le(&branched_sst.key_range.left));
2324 assert!(origin_sst.table_ids.is_sorted());
2325 assert!(branched_sst.table_ids.is_sorted());
2326 assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2327 assert!(branched_sst.sst_size < origin_sst.file_size);
2328 assert_eq!(10, branched_sst.sst_id);
2329 assert_eq!(11, origin_sst.sst_id);
2330 assert_eq!(5, branched_sst.table_ids.first().unwrap().as_raw_id()); }
2332
2333 {
2334 let split_key = group_split::build_split_key(6.into(), VirtualNode::ZERO);
2335 let split_type = group_split::need_to_split(&sst, split_key);
2336 assert_eq!(SstSplitType::Left, split_type);
2337 }
2338
2339 {
2340 let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2341 let origin_sst = sst.clone();
2342 let split_type = group_split::need_to_split(&origin_sst, split_key);
2343 assert_eq!(SstSplitType::Both, split_type);
2344
2345 let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2346 let origin_sst = sst;
2347 let split_type = group_split::need_to_split(&origin_sst, split_key);
2348 assert_eq!(SstSplitType::Right, split_type);
2349 }
2350
2351 {
2352 let mut sst = gen_sstable_info_impl(1, vec![1], epoch);
2354 sst.key_range.right = sst.key_range.left.clone();
2355 let sst: SstableInfo = sst.into();
2356 let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2357 let origin_sst = sst;
2358 let sst_size = origin_sst.sst_size;
2359
2360 let mut new_sst_id = 10.into();
2361 let (origin_sst, branched_sst) = group_split::split_sst(
2362 origin_sst,
2363 &mut new_sst_id,
2364 split_key,
2365 sst_size / 2,
2366 sst_size / 2,
2367 );
2368
2369 assert!(origin_sst.is_none());
2370 assert!(branched_sst.is_some());
2371 }
2372 }
2373
/// Splits individual levels of a compaction group with
/// `group_split::split_sst_info_for_level_v2` and checks which SSTs are returned by the
/// split and which ones stay in the level.
#[test]
fn test_split_sst_info_for_level() {
    // Encoded `FullKey` for the given table id and vnode index. The other key components
    // (the "1" suffix and the trailing `0`) are identical for every SST in this test.
    let encoded_key = |table_id, vnode_idx| {
        FullKey::for_test(
            TableId::new(table_id),
            gen_key_from_str(VirtualNode::from_index(vnode_idx), "1"),
            0,
        )
        .encode()
    };

    let initial_levels = build_initial_compaction_group_levels(
        1,
        &CompactionConfig {
            max_level: 6,
            ..Default::default()
        },
    );
    let mut version = HummockVersion {
        id: HummockVersionId::new(0),
        levels: HashMap::from_iter([(1.into(), initial_levels)]),
        ..Default::default()
    };

    let cg1 = version.levels.get_mut(&1).unwrap();

    // Non-overlapping level holding three SSTs over tables 3 and 4; SST 10 contains both.
    cg1.levels[0] = Level {
        level_idx: 1,
        level_type: LevelType::Nonoverlapping,
        table_infos: vec![
            gen_sst_info(
                1,
                vec![3],
                encoded_key(3, 1).into(),
                encoded_key(3, 200).into(),
            ),
            gen_sst_info(
                10,
                vec![3, 4],
                encoded_key(3, 201).into(),
                encoded_key(4, 10).into(),
            ),
            gen_sst_info(
                11,
                vec![4],
                encoded_key(4, 11).into(),
                encoded_key(4, 200).into(),
            ),
        ],
        total_file_size: 300,
        ..Default::default()
    };

    // Overlapping L0 sub-level with five SSTs that all declare table 2.
    cg1.l0.sub_levels.push(Level {
        level_idx: 0,
        table_infos: vec![
            gen_sst_info(
                2,
                vec![2],
                encoded_key(0, 1).into(),
                encoded_key(2, 200).into(),
            ),
            gen_sst_info(
                22,
                vec![2],
                encoded_key(0, 1).into(),
                encoded_key(2, 200).into(),
            ),
            gen_sst_info(
                23,
                vec![2],
                encoded_key(0, 1).into(),
                encoded_key(2, 200).into(),
            ),
            gen_sst_info(
                24,
                vec![2],
                encoded_key(2, 1).into(),
                encoded_key(2, 200).into(),
            ),
            gen_sst_info(
                25,
                vec![2],
                encoded_key(0, 1).into(),
                encoded_key(0, 200).into(),
            ),
        ],
        sub_level_id: 101,
        level_type: LevelType::Overlapping,
        total_file_size: 300,
        ..Default::default()
    });

    // Split the L0 sub-level at table 1 and merge the returned SSTs back into `cg1`
    // as a right-hand group. Nothing is asserted here; the calls only have to succeed.
    {
        let boundary = group_split::build_split_key(1.into(), VirtualNode::ZERO);
        let mut next_sst_id = 100.into();
        let right_ssts = group_split::split_sst_info_for_level_v2(
            &mut cg1.l0.sub_levels[0],
            &mut next_sst_id,
            boundary,
        );

        let right_levels = Levels {
            levels: vec![],
            l0: OverlappingLevel {
                sub_levels: vec![Level {
                    level_idx: 0,
                    table_infos: right_ssts,
                    sub_level_id: 101,
                    total_file_size: 100,
                    level_type: LevelType::Overlapping,
                    ..Default::default()
                }],
                total_file_size: 0,
                uncompressed_file_size: 0,
            },
            ..Default::default()
        };

        merge_levels(cg1, right_levels);
    }

    // `levels[2]` was never populated by this test: the split returns nothing.
    {
        let mut next_sst_id = 100.into();
        let boundary = group_split::build_split_key(1.into(), VirtualNode::ZERO);
        let right_ssts = group_split::split_sst_info_for_level_v2(
            &mut cg1.levels[2],
            &mut next_sst_id,
            boundary,
        );

        assert!(right_ssts.is_empty());
    }

    // Split key at table 1: all three SSTs are returned untouched and the level is drained.
    {
        let mut group = cg1.clone();
        let boundary = group_split::build_split_key(1.into(), VirtualNode::ZERO);

        let mut next_sst_id = 100.into();
        let right_ssts = group_split::split_sst_info_for_level_v2(
            &mut group.levels[0],
            &mut next_sst_id,
            boundary,
        );

        assert_eq!(3, right_ssts.len());
        assert_eq!(1, right_ssts[0].sst_id);
        assert_eq!(100, right_ssts[0].sst_size);
        assert_eq!(10, right_ssts[1].sst_id);
        assert_eq!(100, right_ssts[1].sst_size);
        assert_eq!(11, right_ssts[2].sst_id);
        assert_eq!(100, right_ssts[2].sst_size);

        assert_eq!(0, group.levels[0].table_infos.len());
    }

    // Split key at table 5: nothing is returned and the level keeps its three SSTs.
    {
        let mut group = cg1.clone();
        let boundary = group_split::build_split_key(5.into(), VirtualNode::ZERO);

        let mut next_sst_id = 100.into();
        let right_ssts = group_split::split_sst_info_for_level_v2(
            &mut group.levels[0],
            &mut next_sst_id,
            boundary,
        );

        assert_eq!(0, right_ssts.len());
        assert_eq!(3, group.levels[0].table_infos.len());
    }

    // Split key at table 4: SST 10 (tables 3 and 4) is cut in two halves with fresh ids
    // 100 and 101, while SST 11 is returned as a whole.
    {
        let mut group = cg1.clone();
        let boundary = group_split::build_split_key(4.into(), VirtualNode::ZERO);

        let mut next_sst_id = 100.into();
        let right_ssts = group_split::split_sst_info_for_level_v2(
            &mut group.levels[0],
            &mut next_sst_id,
            boundary,
        );

        assert_eq!(2, right_ssts.len());
        assert_eq!(100, right_ssts[0].sst_id);
        assert_eq!(100 / 2, right_ssts[0].sst_size);
        assert_eq!(11, right_ssts[1].sst_id);
        assert_eq!(100, right_ssts[1].sst_size);
        assert_eq!(vec![TableId::new(4)], right_ssts[1].table_ids);

        let remaining = &group.levels[0].table_infos;
        assert_eq!(2, remaining.len());
        assert_eq!(101, remaining[1].sst_id);
        assert_eq!(100 / 2, remaining[1].sst_size);
        assert_eq!(vec![TableId::new(3)], remaining[1].table_ids);
    }
}
2703
2704 fn make_sst(sst_id: u64, table_ids: Vec<u32>, sst_size: u64) -> SstableInfo {
2705 SstableInfoInner {
2706 sst_id: sst_id.into(),
2707 object_id: sst_id.into(),
2708 table_ids: table_ids.into_iter().map(TableId::new).collect(),
2709 file_size: sst_size,
2710 sst_size,
2711 uncompressed_file_size: sst_size * 2,
2712 ..Default::default()
2713 }
2714 .into()
2715 }
2716
/// `Level::normalize` is expected to drop SSTs without table ids and to recompute the
/// level's size counters from the SSTs that remain.
#[test]
fn test_level_normalize() {
    // SSTs 2 and 4 carry no table ids; the size counters start out deliberately wrong.
    let ssts = vec![
        make_sst(1, vec![1, 2], 100),
        make_sst(2, vec![], 200),
        make_sst(3, vec![3], 300),
        make_sst(4, vec![], 400),
    ];
    let mut lvl = Level {
        level_idx: 1,
        level_type: LevelType::Nonoverlapping,
        table_infos: ssts,
        total_file_size: 9999,
        uncompressed_file_size: 9999,
        ..Default::default()
    };

    lvl.normalize();

    assert_eq!(lvl.table_infos.len(), 2);
    assert_eq!(1, lvl.table_infos[0].sst_id);
    assert_eq!(3, lvl.table_infos[1].sst_id);
    assert_eq!(lvl.total_file_size, 400);
    assert_eq!(lvl.uncompressed_file_size, 800);

    // Zeroed counters are rebuilt by a second call even though no SST is removed.
    lvl.uncompressed_file_size = 0;
    lvl.total_file_size = 0;
    lvl.normalize();
    assert_eq!(lvl.table_infos.len(), 2);
    assert_eq!(lvl.total_file_size, 400);
    assert_eq!(lvl.uncompressed_file_size, 800);

    // A level made only of SSTs without table ids ends up empty with zero sizes.
    lvl.table_infos = vec![make_sst(10, vec![], 100), make_sst(11, vec![], 200)];
    lvl.normalize();
    assert!(lvl.table_infos.is_empty());
    assert_eq!(lvl.total_file_size, 0);
    assert_eq!(lvl.uncompressed_file_size, 0);
}
2757
/// `Level::delete_ssts` is expected to remove the listed SSTs, update the size counters
/// and report whether anything was removed.
#[test]
fn test_level_delete_ssts() {
    let ssts = vec![
        make_sst(1, vec![1], 100),
        make_sst(2, vec![2], 200),
        make_sst(3, vec![3], 300),
    ];
    let mut lvl = Level {
        level_idx: 1,
        level_type: LevelType::Nonoverlapping,
        table_infos: ssts,
        total_file_size: 600,
        uncompressed_file_size: 1200,
        ..Default::default()
    };

    // Deleting an SST that is present shrinks the level and its counters.
    {
        let present: HashSet<crate::HummockSstableId> = HashSet::from([2.into()]);
        let changed = lvl.delete_ssts(&present);

        assert!(changed);
        assert_eq!(lvl.table_infos.len(), 2);
        assert_eq!(1, lvl.table_infos[0].sst_id);
        assert_eq!(3, lvl.table_infos[1].sst_id);
        assert_eq!(lvl.total_file_size, 400);
        assert_eq!(lvl.uncompressed_file_size, 800);
    }

    // Deleting an id that is not in the level reports no change.
    {
        let absent: HashSet<crate::HummockSstableId> = HashSet::from([999.into()]);
        let changed = lvl.delete_ssts(&absent);

        assert!(!changed);
        assert_eq!(lvl.table_infos.len(), 2);
    }
}
2789
/// `Level::truncate_tables` is expected to strip the given table ids from every SST,
/// drop SSTs left without any table id and keep the size counters in sync.
#[test]
fn test_level_truncate_tables() {
    let ssts = vec![
        make_sst(1, vec![1, 2], 100),
        make_sst(2, vec![2, 3], 200),
        make_sst(3, vec![2], 300),
    ];
    let mut lvl = Level {
        level_idx: 1,
        level_type: LevelType::Nonoverlapping,
        table_infos: ssts,
        total_file_size: 600,
        uncompressed_file_size: 1200,
        ..Default::default()
    };

    // Removing table 2 empties SST 3 entirely and trims SSTs 1 and 2.
    lvl.truncate_tables(&HashSet::from([TableId::new(2)]));

    assert_eq!(lvl.table_infos.len(), 2);
    assert_eq!(lvl.table_infos[0].table_ids, vec![TableId::new(1)]);
    assert_eq!(lvl.table_infos[1].table_ids, vec![TableId::new(3)]);
    assert_eq!(lvl.total_file_size, 100 + 200);
    assert_eq!(lvl.uncompressed_file_size, 200 + 400);

    // Removing the two remaining tables leaves an empty level with zero sizes.
    lvl.truncate_tables(&HashSet::from([TableId::new(1), TableId::new(3)]));

    assert!(lvl.table_infos.is_empty());
    assert_eq!(lvl.total_file_size, 0);
    assert_eq!(lvl.uncompressed_file_size, 0);
}
2822
/// `OverlappingLevel::normalize` is expected to drop empty sub-levels and to recompute
/// the aggregated L0 size counters.
#[test]
fn test_overlapping_level_normalize() {
    // Builds an L0 sub-level with explicit size counters.
    let sub_level = |sub_level_id, table_infos, total_file_size, uncompressed_file_size| Level {
        level_idx: 0,
        table_infos,
        total_file_size,
        uncompressed_file_size,
        sub_level_id,
        ..Default::default()
    };

    // Sub-level 2 is empty; the aggregated counters start out deliberately wrong.
    let mut l0 = OverlappingLevel {
        sub_levels: vec![
            sub_level(1, vec![make_sst(1, vec![1], 100)], 100, 200),
            sub_level(2, vec![], 0, 0),
            sub_level(3, vec![make_sst(3, vec![3], 300)], 300, 600),
        ],
        total_file_size: 9999,
        uncompressed_file_size: 9999,
    };

    l0.normalize();

    assert_eq!(l0.sub_levels.len(), 2);
    assert_eq!(l0.sub_levels[0].sub_level_id, 1);
    assert_eq!(l0.sub_levels[1].sub_level_id, 3);
    assert_eq!(l0.total_file_size, 100 + 300);
    assert_eq!(l0.uncompressed_file_size, 200 + 600);

    // With only an empty sub-level left, normalization clears L0 completely.
    let empty_sub_level = Level {
        level_idx: 0,
        table_infos: vec![],
        ..Default::default()
    };
    l0.sub_levels = vec![empty_sub_level];
    l0.normalize();

    assert!(l0.sub_levels.is_empty());
    assert_eq!(l0.total_file_size, 0);
    assert_eq!(l0.uncompressed_file_size, 0);
}
2875
/// `Levels::truncate_tables` is expected to remove table 10 from L0 and L1, drop the
/// sub-level that becomes empty, refresh all size counters and bump
/// `compaction_group_version_id`.
#[test]
fn test_levels_truncate_tables() {
    // L0: sub-level 1 mixes tables 10 and 20, sub-level 2 only holds table 10.
    let l0 = OverlappingLevel {
        sub_levels: vec![
            Level {
                level_idx: 0,
                table_infos: vec![make_sst(1, vec![10], 100), make_sst(2, vec![20], 200)],
                total_file_size: 300,
                uncompressed_file_size: 600,
                sub_level_id: 1,
                ..Default::default()
            },
            Level {
                level_idx: 0,
                table_infos: vec![make_sst(3, vec![10], 150)],
                total_file_size: 150,
                uncompressed_file_size: 300,
                sub_level_id: 2,
                ..Default::default()
            },
        ],
        total_file_size: 450,
        uncompressed_file_size: 900,
    };

    // L1: SST 4 spans both tables, SST 5 only table 10.
    let l1 = Level {
        level_idx: 1,
        level_type: LevelType::Nonoverlapping,
        table_infos: vec![make_sst(4, vec![10, 20], 400), make_sst(5, vec![10], 500)],
        total_file_size: 900,
        uncompressed_file_size: 1800,
        ..Default::default()
    };

    #[expect(deprecated)]
    let mut levels = Levels {
        l0,
        levels: vec![l1],
        group_id: 1.into(),
        parent_group_id: 0.into(),
        member_table_ids: vec![],
        compaction_group_version_id: 0,
    };

    levels.truncate_tables(&HashSet::from([TableId::new(10)]));

    // Only sub-level 1 survives, holding just SST 2.
    assert_eq!(levels.l0.sub_levels.len(), 1);
    assert_eq!(levels.l0.sub_levels[0].sub_level_id, 1);
    assert_eq!(levels.l0.sub_levels[0].table_infos.len(), 1);
    assert_eq!(2, levels.l0.sub_levels[0].table_infos[0].sst_id);
    assert_eq!(levels.l0.sub_levels[0].total_file_size, 200);
    assert_eq!(levels.l0.sub_levels[0].uncompressed_file_size, 400);

    assert_eq!(levels.l0.total_file_size, 200);
    assert_eq!(levels.l0.uncompressed_file_size, 400);

    // L1 keeps SST 4, now restricted to table 20.
    assert_eq!(levels.levels[0].table_infos.len(), 1);
    assert_eq!(4, levels.levels[0].table_infos[0].sst_id);
    assert_eq!(
        levels.levels[0].table_infos[0].table_ids,
        vec![TableId::new(20)]
    );
    assert_eq!(levels.levels[0].total_file_size, 400);
    assert_eq!(levels.levels[0].uncompressed_file_size, 800);

    assert_eq!(levels.compaction_group_version_id, 1);
}
2948
/// Applies a version delta containing `GroupDelta::TruncateTables` for table 100 and
/// checks the resulting compaction group: emptied sub-levels are gone, no SST is left
/// without table ids, sizes are refreshed and `compaction_group_version_id` is bumped.
#[test]
fn test_apply_version_delta_truncate_tables() {
    // L0: sub-level 1 mixes tables 100 and 200, sub-level 2 only holds table 100.
    let l0 = OverlappingLevel {
        sub_levels: vec![
            Level {
                level_idx: 0,
                level_type: LevelType::Overlapping,
                table_infos: vec![make_sst(1, vec![100], 50), make_sst(2, vec![200], 60)],
                total_file_size: 110,
                uncompressed_file_size: 220,
                sub_level_id: 1,
                ..Default::default()
            },
            Level {
                level_idx: 0,
                level_type: LevelType::Overlapping,
                table_infos: vec![make_sst(3, vec![100], 70)],
                total_file_size: 70,
                uncompressed_file_size: 140,
                sub_level_id: 2,
                ..Default::default()
            },
        ],
        total_file_size: 180,
        uncompressed_file_size: 360,
    };

    // L1: SST 4 spans both tables, SST 5 only table 100.
    let l1 = Level {
        level_idx: 1,
        level_type: LevelType::Nonoverlapping,
        table_infos: vec![make_sst(4, vec![100, 200], 80), make_sst(5, vec![100], 90)],
        total_file_size: 170,
        uncompressed_file_size: 340,
        ..Default::default()
    };

    #[expect(deprecated)]
    let group_levels = Levels {
        l0,
        levels: vec![l1],
        group_id: 1.into(),
        parent_group_id: 0.into(),
        member_table_ids: vec![],
        compaction_group_version_id: 0,
    };

    let mut version = HummockVersion {
        id: HummockVersionId::new(0),
        levels: HashMap::from_iter([(1.into(), group_levels)]),
        ..Default::default()
    };

    let truncate_table_100 = GroupDelta::TruncateTables(HashSet::from([TableId::new(100)]));
    let version_delta = HummockVersionDelta {
        id: HummockVersionId::new(1),
        group_deltas: HashMap::from_iter([(
            1.into(),
            GroupDeltas {
                group_deltas: vec![truncate_table_100],
            },
        )]),
        ..Default::default()
    };

    version.apply_version_delta(&version_delta);

    let cg = version.get_compaction_group_levels(1.into());

    assert_eq!(
        cg.l0.sub_levels.len(),
        1,
        "empty sub-level should be removed"
    );
    assert_eq!(cg.l0.sub_levels[0].sub_level_id, 1);
    assert_eq!(cg.l0.sub_levels[0].table_infos.len(), 1);
    assert_eq!(2, cg.l0.sub_levels[0].table_infos[0].sst_id);

    // Every SST that is still referenced from L0 must keep at least one table id.
    let l0_ssts = cg
        .l0
        .sub_levels
        .iter()
        .flat_map(|sub_level| sub_level.table_infos.iter());
    for sst in l0_ssts {
        assert!(
            !sst.table_ids.is_empty(),
            "SST {} should not have empty table_ids",
            sst.sst_id
        );
    }

    assert_eq!(cg.l0.total_file_size, 60);
    assert_eq!(cg.l0.uncompressed_file_size, 120);

    // L1 keeps SST 4, now restricted to table 200.
    assert_eq!(cg.levels[0].table_infos.len(), 1);
    assert_eq!(4, cg.levels[0].table_infos[0].sst_id);
    assert_eq!(
        cg.levels[0].table_infos[0].table_ids,
        vec![TableId::new(200)]
    );
    assert_eq!(cg.levels[0].total_file_size, 80);
    assert_eq!(cg.levels[0].uncompressed_file_size, 160);

    assert_eq!(cg.compaction_group_version_id, 1);
}
3055
/// Applies a version delta made of two `GroupDelta::IntraLevel` entries: one removing
/// SSTs 1, 2 and 3 from level 0 and one inserting SST 10 into level 1. Afterwards L0
/// must be empty with zero sizes and level 1 must contain only the new SST.
#[test]
fn test_apply_version_delta_compact_l0() {
    // Two populated L0 sub-levels holding SSTs 1-3.
    let l0 = OverlappingLevel {
        sub_levels: vec![
            Level {
                level_idx: 0,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![make_sst(1, vec![1], 100), make_sst(2, vec![2], 200)],
                total_file_size: 300,
                uncompressed_file_size: 600,
                sub_level_id: 1,
                ..Default::default()
            },
            Level {
                level_idx: 0,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![make_sst(3, vec![3], 300)],
                total_file_size: 300,
                uncompressed_file_size: 600,
                sub_level_id: 2,
                ..Default::default()
            },
        ],
        total_file_size: 600,
        uncompressed_file_size: 1200,
    };

    // Level 1 starts out empty.
    let l1 = Level {
        level_idx: 1,
        level_type: LevelType::Nonoverlapping,
        table_infos: vec![],
        total_file_size: 0,
        uncompressed_file_size: 0,
        ..Default::default()
    };

    #[expect(deprecated)]
    let group_levels = Levels {
        l0,
        levels: vec![l1],
        group_id: 1.into(),
        parent_group_id: 0.into(),
        member_table_ids: vec![],
        compaction_group_version_id: 0,
    };

    let mut version = HummockVersion {
        id: HummockVersionId::new(0),
        levels: HashMap::from_iter([(1.into(), group_levels)]),
        ..Default::default()
    };

    // First delta: remove SSTs 1, 2 and 3 from level 0, inserting nothing.
    let remove_from_l0 = GroupDelta::IntraLevel(IntraLevelDelta::new(
        0,
        0,
        HashSet::from([1.into(), 2.into(), 3.into()]),
        vec![],
        0,
        0,
    ));
    // Second delta: insert SST 10 into level 1, removing nothing.
    let insert_into_l1 = GroupDelta::IntraLevel(IntraLevelDelta::new(
        1,
        0,
        HashSet::new(),
        vec![make_sst(10, vec![1, 2, 3], 500)],
        0,
        0,
    ));

    let version_delta = HummockVersionDelta {
        id: HummockVersionId::new(1),
        group_deltas: HashMap::from_iter([(
            1.into(),
            GroupDeltas {
                group_deltas: vec![remove_from_l0, insert_into_l1],
            },
        )]),
        ..Default::default()
    };

    version.apply_version_delta(&version_delta);

    let cg = version.get_compaction_group_levels(1.into());

    assert!(cg.l0.sub_levels.is_empty());
    assert_eq!(cg.l0.total_file_size, 0);
    assert_eq!(cg.l0.uncompressed_file_size, 0);

    assert_eq!(cg.levels[0].table_infos.len(), 1);
    assert_eq!(10, cg.levels[0].table_infos[0].sst_id);
    assert_eq!(cg.levels[0].total_file_size, 500);
}
3148}