1use std::borrow::Borrow;
16use std::cmp::Ordering;
17use std::collections::hash_map::Entry;
18use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
19use std::iter::once;
20use std::sync::{Arc, LazyLock};
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::catalog::TableId;
25use risingwave_common::hash::VnodeBitmapExt;
26use risingwave_common::log::LogSuppressor;
27use risingwave_pb::hummock::{
28 CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta,
29};
30use tracing::warn;
31
32use super::group_split::split_sst_with_table_ids;
33use super::{StateTableId, group_split};
34use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
35use crate::compact_task::is_compaction_task_expired;
36use crate::compaction_group::StaticCompactionGroupId;
37use crate::key_range::KeyRangeCommon;
38use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
39use crate::sstable_info::SstableInfo;
40use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
41use crate::vector_index::apply_vector_index_delta;
42use crate::version::{
43 GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
44 IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader,
45};
46use crate::{
47 CompactionGroupId, HummockObjectId, HummockSstableId, HummockSstableObjectId, can_concat,
48};
49
50#[derive(Debug, Clone, Default)]
51pub struct SstDeltaInfo {
52 pub insert_sst_level: u32,
53 pub insert_sst_infos: Vec<SstableInfo>,
54 pub delete_sst_object_ids: Vec<HummockSstableObjectId>,
55}
56
57pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;
58
59impl<L> HummockVersionCommon<SstableInfo, L> {
60 pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
61 self.levels
62 .get(&compaction_group_id)
63 .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
64 }
65
66 pub fn get_compaction_group_levels_mut(
67 &mut self,
68 compaction_group_id: CompactionGroupId,
69 ) -> &mut Levels {
70 self.levels
71 .get_mut(&compaction_group_id)
72 .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
73 }
74
75 pub fn get_sst_ids_by_group_id(
77 &self,
78 compaction_group_id: CompactionGroupId,
79 ) -> impl Iterator<Item = HummockSstableId> + '_ {
80 self.levels
81 .iter()
82 .filter_map(move |(cg_id, level)| {
83 if *cg_id == compaction_group_id {
84 Some(level)
85 } else {
86 None
87 }
88 })
89 .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
90 .flat_map(|level| level.table_infos.iter())
91 .map(|s| s.sst_id)
92 }
93
94 pub fn level_iter<F: FnMut(&Level) -> bool>(
95 &self,
96 compaction_group_id: CompactionGroupId,
97 mut f: F,
98 ) {
99 if let Some(levels) = self.levels.get(&compaction_group_id) {
100 for sub_level in &levels.l0.sub_levels {
101 if !f(sub_level) {
102 return;
103 }
104 }
105 for level in &levels.levels {
106 if !f(level) {
107 return;
108 }
109 }
110 }
111 }
112
113 pub fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize {
114 self.levels
116 .get(&compaction_group_id)
117 .map(|group| group.levels.len() + 1)
118 .unwrap_or(0)
119 }
120
121 pub fn safe_epoch_table_watermarks(
122 &self,
123 existing_table_ids: &[TableId],
124 ) -> BTreeMap<TableId, TableWatermarks> {
125 safe_epoch_table_watermarks_impl(&self.table_watermarks, existing_table_ids)
126 }
127}
128
129pub fn safe_epoch_table_watermarks_impl(
130 table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
131 existing_table_ids: &[TableId],
132) -> BTreeMap<TableId, TableWatermarks> {
133 fn extract_single_table_watermark(
134 table_watermarks: &TableWatermarks,
135 ) -> Option<TableWatermarks> {
136 if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
137 Some(TableWatermarks {
138 watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
139 direction: table_watermarks.direction,
140 watermark_type: table_watermarks.watermark_type,
141 })
142 } else {
143 None
144 }
145 }
146 table_watermarks
147 .iter()
148 .filter_map(|(table_id, table_watermarks)| {
149 if !existing_table_ids.contains(table_id) {
150 None
151 } else {
152 extract_single_table_watermark(table_watermarks)
153 .map(|table_watermarks| (*table_id, table_watermarks))
154 }
155 })
156 .collect()
157}
158
159pub fn safe_epoch_read_table_watermarks_impl(
160 safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
161) -> BTreeMap<TableId, ReadTableWatermark> {
162 safe_epoch_watermarks
163 .into_iter()
164 .map(|(table_id, watermarks)| {
165 assert_eq!(watermarks.watermarks.len(), 1);
166 let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
167 let mut vnode_watermark_map = BTreeMap::new();
168 for vnode_watermark in vnode_watermarks.iter() {
169 let watermark = Bytes::copy_from_slice(vnode_watermark.watermark());
170 for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
171 assert!(
172 vnode_watermark_map
173 .insert(vnode, watermark.clone())
174 .is_none(),
175 "duplicate table watermark on vnode {}",
176 vnode.to_index()
177 );
178 }
179 }
180 (
181 table_id,
182 ReadTableWatermark {
183 direction: watermarks.direction,
184 vnode_watermarks: vnode_watermark_map,
185 },
186 )
187 })
188 .collect()
189}
190
191impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
192 pub fn count_new_ssts_in_group_split(
193 &self,
194 parent_group_id: CompactionGroupId,
195 split_key: Bytes,
196 ) -> u64 {
197 self.levels
198 .get(&parent_group_id)
199 .map_or(0, |parent_levels| {
200 let l0 = &parent_levels.l0;
201 let mut split_count = 0;
202 for sub_level in &l0.sub_levels {
203 assert!(!sub_level.table_infos.is_empty());
204
205 if sub_level.level_type == PbLevelType::Overlapping {
206 split_count += sub_level
208 .table_infos
209 .iter()
210 .map(|sst| {
211 if let group_split::SstSplitType::Both =
212 group_split::need_to_split(sst, split_key.clone())
213 {
214 2
215 } else {
216 0
217 }
218 })
219 .sum::<u64>();
220 continue;
221 }
222
223 let pos = group_split::get_split_pos(&sub_level.table_infos, split_key.clone());
224 let sst = sub_level.table_infos.get(pos).unwrap();
225
226 if let group_split::SstSplitType::Both =
227 group_split::need_to_split(sst, split_key.clone())
228 {
229 split_count += 2;
230 }
231 }
232
233 for level in &parent_levels.levels {
234 if level.table_infos.is_empty() {
235 continue;
236 }
237 let pos = group_split::get_split_pos(&level.table_infos, split_key.clone());
238 let sst = level.table_infos.get(pos).unwrap();
239 if let group_split::SstSplitType::Both =
240 group_split::need_to_split(sst, split_key.clone())
241 {
242 split_count += 2;
243 }
244 }
245
246 split_count
247 })
248 }
249
250 pub fn init_with_parent_group(
251 &mut self,
252 parent_group_id: CompactionGroupId,
253 group_id: CompactionGroupId,
254 member_table_ids: BTreeSet<StateTableId>,
255 new_sst_start_id: HummockSstableId,
256 ) {
257 let mut new_sst_id = new_sst_start_id;
258 if parent_group_id == StaticCompactionGroupId::NewCompactionGroup {
259 if new_sst_start_id != 0 {
260 if cfg!(debug_assertions) {
261 panic!(
262 "non-zero sst start id {} for NewCompactionGroup",
263 new_sst_start_id
264 );
265 } else {
266 warn!(
267 %new_sst_start_id,
268 "non-zero sst start id for NewCompactionGroup"
269 );
270 }
271 }
272 return;
273 } else if !self.levels.contains_key(&parent_group_id) {
274 unreachable!(
275 "non-existing parent group id {} to init from",
276 parent_group_id
277 );
278 }
279 let [parent_levels, cur_levels] = self
280 .levels
281 .get_disjoint_mut([&parent_group_id, &group_id])
282 .map(|res| res.unwrap());
283 parent_levels.compaction_group_version_id += 1;
286 cur_levels.compaction_group_version_id += 1;
287 let l0 = &mut parent_levels.l0;
288 {
289 for sub_level in &mut l0.sub_levels {
290 let target_l0 = &mut cur_levels.l0;
291 let insert_table_infos =
294 split_sst_info_for_level(&member_table_ids, sub_level, &mut new_sst_id);
295 sub_level.normalize();
296 if insert_table_infos.is_empty() {
297 continue;
298 }
299 match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
300 Ok(idx) => {
301 add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
302 }
303 Err(idx) => {
304 insert_new_sub_level(
305 target_l0,
306 sub_level.sub_level_id,
307 sub_level.level_type,
308 insert_table_infos,
309 Some(idx),
310 );
311 }
312 }
313 }
314 l0.normalize();
315 }
316 for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
317 let insert_table_infos =
318 split_sst_info_for_level(&member_table_ids, level, &mut new_sst_id);
319 cur_levels.levels[idx].total_file_size += insert_table_infos
320 .iter()
321 .map(|sst| sst.sst_size)
322 .sum::<u64>();
323 cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
324 .iter()
325 .map(|sst| sst.uncompressed_file_size)
326 .sum::<u64>();
327 cur_levels.levels[idx]
328 .table_infos
329 .extend(insert_table_infos);
330 cur_levels.levels[idx]
331 .table_infos
332 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
333 assert!(can_concat(&cur_levels.levels[idx].table_infos));
334 level.normalize();
335 }
336
337 assert!(
338 parent_levels
339 .l0
340 .sub_levels
341 .iter()
342 .all(|level| !level.table_infos.is_empty())
343 );
344 assert!(
345 cur_levels
346 .l0
347 .sub_levels
348 .iter()
349 .all(|level| !level.table_infos.is_empty())
350 );
351 }
352
353 pub fn build_sst_delta_infos(
354 &self,
355 version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
356 ) -> Vec<SstDeltaInfo> {
357 let mut infos = vec![];
358
359 if version_delta.trivial_move {
362 return infos;
363 }
364
365 for (group_id, group_deltas) in &version_delta.group_deltas {
366 let mut info = SstDeltaInfo::default();
367
368 let mut removed_l0_ssts: BTreeSet<HummockSstableId> = BTreeSet::new();
369 let mut removed_ssts: BTreeMap<u32, BTreeSet<HummockSstableId>> = BTreeMap::new();
370
371 if !group_deltas.group_deltas.iter().all(|delta| {
373 matches!(
374 delta,
375 GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_)
376 )
377 }) {
378 continue;
379 }
380
381 for group_delta in &group_deltas.group_deltas {
385 match group_delta {
386 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
387 if !inserted_table_infos.is_empty() {
388 info.insert_sst_level = 0;
389 info.insert_sst_infos
390 .extend(inserted_table_infos.iter().cloned());
391 }
392 }
393 GroupDeltaCommon::IntraLevel(intra_level) => {
394 if !intra_level.inserted_table_infos.is_empty() {
395 info.insert_sst_level = intra_level.level_idx;
396 info.insert_sst_infos
397 .extend(intra_level.inserted_table_infos.iter().cloned());
398 }
399 if !intra_level.removed_table_ids.is_empty() {
400 for id in &intra_level.removed_table_ids {
401 if intra_level.level_idx == 0 {
402 removed_l0_ssts.insert(*id);
403 } else {
404 removed_ssts
405 .entry(intra_level.level_idx)
406 .or_default()
407 .insert(*id);
408 }
409 }
410 }
411 }
412 GroupDeltaCommon::GroupConstruct(_)
413 | GroupDeltaCommon::GroupDestroy(_)
414 | GroupDeltaCommon::GroupMerge(_)
415 | GroupDeltaCommon::TruncateTables(_) => {}
416 }
417 }
418
419 let group = self.levels.get(group_id).unwrap();
420 for l0_sub_level in &group.level0().sub_levels {
421 for sst_info in &l0_sub_level.table_infos {
422 if removed_l0_ssts.remove(&sst_info.sst_id) {
423 info.delete_sst_object_ids.push(sst_info.object_id);
424 }
425 }
426 }
427 for level in &group.levels {
428 if let Some(mut removed_level_ssts) = removed_ssts.remove(&level.level_idx) {
429 for sst_info in &level.table_infos {
430 if removed_level_ssts.remove(&sst_info.sst_id) {
431 info.delete_sst_object_ids.push(sst_info.object_id);
432 }
433 }
434 if !removed_level_ssts.is_empty() {
435 tracing::error!(
436 "removed_level_ssts is not empty: {:?}",
437 removed_level_ssts,
438 );
439 }
440 debug_assert!(removed_level_ssts.is_empty());
441 }
442 }
443
444 if !removed_l0_ssts.is_empty() || !removed_ssts.is_empty() {
445 tracing::error!(
446 "not empty removed_l0_ssts: {:?}, removed_ssts: {:?}",
447 removed_l0_ssts,
448 removed_ssts
449 );
450 }
451 debug_assert!(removed_l0_ssts.is_empty());
452 debug_assert!(removed_ssts.is_empty());
453
454 infos.push(info);
455 }
456
457 infos
458 }
459
460 pub fn apply_version_delta(
461 &mut self,
462 version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
463 ) -> HashMap<TableId, Option<StateTableInfo>> {
464 assert_eq!(self.id, version_delta.prev_id);
465
466 let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
467 &version_delta.state_table_info_delta,
468 &version_delta.removed_table_ids,
469 );
470
471 #[expect(deprecated)]
472 {
473 if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch {
474 is_commit_epoch = true;
475 tracing::trace!(
476 "max committed epoch bumped but no table committed epoch is changed"
477 );
478 }
479 }
480
481 for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
483 let mut is_applied_l0_compact = false;
484 for group_delta in &group_deltas.group_deltas {
485 match group_delta {
486 GroupDeltaCommon::GroupConstruct(group_construct) => {
487 let mut new_levels = build_initial_compaction_group_levels(
488 *compaction_group_id,
489 group_construct.get_group_config().unwrap(),
490 );
491 let parent_group_id = group_construct.parent_group_id;
492 new_levels.parent_group_id = parent_group_id;
493 #[expect(deprecated)]
494 new_levels
496 .member_table_ids
497 .clone_from(&group_construct.table_ids);
498 self.levels.insert(*compaction_group_id, new_levels);
499 let member_table_ids = if group_construct.version()
500 >= CompatibilityVersion::NoMemberTableIds
501 {
502 self.state_table_info
503 .compaction_group_member_table_ids(*compaction_group_id)
504 .iter()
505 .copied()
506 .collect()
507 } else {
508 #[expect(deprecated)]
509 BTreeSet::from_iter(
511 group_construct.table_ids.iter().copied().map(Into::into),
512 )
513 };
514
515 if group_construct.version() >= CompatibilityVersion::SplitGroupByTableId {
516 let split_key = if group_construct.split_key.is_some() {
517 Some(Bytes::from(group_construct.split_key.clone().unwrap()))
518 } else {
519 None
520 };
521 self.init_with_parent_group_v2(
522 parent_group_id,
523 *compaction_group_id,
524 group_construct.new_sst_start_id,
525 split_key.clone(),
526 );
527 } else {
528 self.init_with_parent_group(
530 parent_group_id,
531 *compaction_group_id,
532 member_table_ids,
533 group_construct.new_sst_start_id,
534 );
535 }
536 }
537 GroupDeltaCommon::GroupMerge(group_merge) => {
538 tracing::info!(
539 "group_merge left {:?} right {:?}",
540 group_merge.left_group_id,
541 group_merge.right_group_id
542 );
543 self.merge_compaction_group(
544 group_merge.left_group_id,
545 group_merge.right_group_id,
546 )
547 }
548 GroupDeltaCommon::IntraLevel(level_delta) => {
549 let levels =
550 self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
551 panic!("compaction group {} does not exist", compaction_group_id)
552 });
553 if is_commit_epoch {
554 assert!(
555 level_delta.removed_table_ids.is_empty(),
556 "no sst should be deleted when committing an epoch"
557 );
558
559 let IntraLevelDelta {
560 level_idx,
561 l0_sub_level_id,
562 inserted_table_infos,
563 ..
564 } = level_delta;
565 {
566 assert_eq!(
567 *level_idx, 0,
568 "we should only add to L0 when we commit an epoch."
569 );
570 if !inserted_table_infos.is_empty() {
571 insert_new_sub_level(
572 &mut levels.l0,
573 *l0_sub_level_id,
574 PbLevelType::Overlapping,
575 inserted_table_infos.clone(),
576 None,
577 );
578 }
579 }
580 } else {
581 levels.apply_compact_ssts(
583 level_delta,
584 self.state_table_info
585 .compaction_group_member_table_ids(*compaction_group_id),
586 );
587 if level_delta.level_idx == 0 {
588 is_applied_l0_compact = true;
589 }
590 }
591 }
592 GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
593 let levels =
594 self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
595 panic!("compaction group {} does not exist", compaction_group_id)
596 });
597 assert!(is_commit_epoch);
598
599 if !inserted_table_infos.is_empty() {
600 let next_l0_sub_level_id = levels
601 .l0
602 .sub_levels
603 .last()
604 .map(|level| level.sub_level_id + 1)
605 .unwrap_or(1);
606
607 insert_new_sub_level(
608 &mut levels.l0,
609 next_l0_sub_level_id,
610 PbLevelType::Overlapping,
611 inserted_table_infos.clone(),
612 None,
613 );
614 }
615 }
616 GroupDeltaCommon::GroupDestroy(_) => {
617 self.levels.remove(compaction_group_id);
618 }
619
620 GroupDeltaCommon::TruncateTables(table_ids) => {
621 self.levels
622 .get_mut(compaction_group_id)
623 .unwrap_or_else(|| {
624 panic!("compaction group {} does not exist", compaction_group_id)
625 })
626 .truncate_tables(table_ids);
627 }
628 }
629 }
630
631 if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id)
632 {
633 levels.l0.normalize();
634 }
635 }
636 self.id = version_delta.id;
637 #[expect(deprecated)]
638 {
639 self.max_committed_epoch = version_delta.max_committed_epoch;
640 }
641
642 let mut modified_table_watermarks: HashMap<TableId, Option<TableWatermarks>> =
646 HashMap::new();
647
648 for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
650 if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
651 if version_delta.removed_table_ids.contains(table_id) {
652 modified_table_watermarks.insert(*table_id, None);
653 } else {
654 let mut current_table_watermarks = (**current_table_watermarks).clone();
655 current_table_watermarks.apply_new_table_watermarks(table_watermarks);
656 modified_table_watermarks.insert(*table_id, Some(current_table_watermarks));
657 }
658 } else {
659 modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone()));
660 }
661 }
662 for (table_id, table_watermarks) in &self.table_watermarks {
663 let safe_epoch = if let Some(state_table_info) =
664 self.state_table_info.info().get(table_id)
665 && let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
666 && state_table_info.committed_epoch > *oldest_epoch
667 {
668 state_table_info.committed_epoch
670 } else {
671 continue;
673 };
674 let table_watermarks = modified_table_watermarks
675 .entry(*table_id)
676 .or_insert_with(|| Some((**table_watermarks).clone()));
677 if let Some(table_watermarks) = table_watermarks {
678 table_watermarks.clear_stale_epoch_watermark(safe_epoch);
679 }
680 }
681 for (table_id, table_watermarks) in modified_table_watermarks {
683 if let Some(table_watermarks) = table_watermarks {
684 self.table_watermarks
685 .insert(table_id, Arc::new(table_watermarks));
686 } else {
687 self.table_watermarks.remove(&table_id);
688 }
689 }
690 apply_vector_index_delta(
692 &mut self.vector_indexes,
693 &version_delta.vector_index_delta,
694 &version_delta.removed_table_ids,
695 );
696
697 changed_table_info
698 }
699
700 pub fn apply_change_log_delta<T: Clone>(
701 table_change_log: &mut HashMap<TableId, TableChangeLogCommon<T>>,
702 change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
703 ) {
704 for (table_id, change_log_delta) in change_log_delta {
705 let new_change_log = &change_log_delta.new_log;
706 match table_change_log.entry(*table_id) {
707 Entry::Occupied(entry) => {
708 let change_log = entry.into_mut();
709 change_log.add_change_log(new_change_log.clone());
710 }
711 Entry::Vacant(entry) => {
712 entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
713 }
714 };
715 }
716
717 for (table_id, change_log_delta) in change_log_delta {
719 if let Some(change_log) = table_change_log.get_mut(table_id) {
720 change_log.truncate(change_log_delta.truncate_epoch);
721 }
722 }
723 }
724
725 pub fn collect_gc_change_log_delta<'a, T: Clone>(
727 current_change_log_table_ids: impl Iterator<Item = &'a TableId>,
728 change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
729 removed_table_ids: &HashSet<TableId>,
730 state_table_info_delta: &HashMap<TableId, StateTableInfoDelta>,
731 changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
732 ) -> HashSet<TableId> {
733 let mut gc_change_log_delta = HashSet::new();
734 for table_id in current_change_log_table_ids {
738 if removed_table_ids.contains(table_id) {
739 gc_change_log_delta.insert(*table_id);
740 continue;
741 }
742 if let Some(table_info_delta) = state_table_info_delta.get(table_id)
743 && let Some(Some(prev_table_info)) = changed_table_info.get(table_id)
744 && table_info_delta.committed_epoch > prev_table_info.committed_epoch
745 {
746 } else {
748 continue;
750 }
751 let contains = change_log_delta.contains_key(table_id);
752 if !contains {
753 gc_change_log_delta.insert(*table_id);
754 static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
755 LazyLock::new(|| LogSuppressor::per_second(1));
756 if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
757 warn!(
758 suppressed_count,
759 %table_id,
760 "table change log dropped due to no further change log at newly committed epoch"
761 );
762 }
763 }
764 }
765 gc_change_log_delta
766 }
767
768 pub fn build_branched_sst_info(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
769 let mut ret: BTreeMap<_, _> = BTreeMap::new();
770 for (compaction_group_id, group) in &self.levels {
771 let mut levels = vec![];
772 levels.extend(group.l0.sub_levels.iter());
773 levels.extend(group.levels.iter());
774 for level in levels {
775 for table_info in &level.table_infos {
776 if table_info.sst_id.as_raw_id() == table_info.object_id.as_raw_id() {
777 continue;
778 }
779 let object_id = table_info.object_id;
780 let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default();
781 entry
782 .entry(*compaction_group_id)
783 .or_default()
784 .push(table_info.sst_id)
785 }
786 }
787 }
788 ret
789 }
790
791 pub fn merge_compaction_group(
792 &mut self,
793 left_group_id: CompactionGroupId,
794 right_group_id: CompactionGroupId,
795 ) {
796 let left_group_id_table_ids = self
798 .state_table_info
799 .compaction_group_member_table_ids(left_group_id)
800 .iter();
801 let right_group_id_table_ids = self
802 .state_table_info
803 .compaction_group_member_table_ids(right_group_id)
804 .iter();
805
806 assert!(
807 left_group_id_table_ids
808 .chain(right_group_id_table_ids)
809 .is_sorted()
810 );
811
812 let total_cg = self.levels.keys().cloned().collect::<Vec<_>>();
813 let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| {
814 panic!(
815 "compaction group should exist right {} all {:?}",
816 right_group_id, total_cg
817 )
818 });
819
820 let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| {
821 panic!(
822 "compaction group should exist left {} all {:?}",
823 left_group_id, total_cg
824 )
825 });
826
827 group_split::merge_levels(left_levels, right_levels);
828 }
829
830 pub fn init_with_parent_group_v2(
831 &mut self,
832 parent_group_id: CompactionGroupId,
833 group_id: CompactionGroupId,
834 new_sst_start_id: HummockSstableId,
835 split_key: Option<Bytes>,
836 ) {
837 let mut new_sst_id = new_sst_start_id;
838 if parent_group_id == StaticCompactionGroupId::NewCompactionGroup {
839 if new_sst_start_id != 0 {
840 if cfg!(debug_assertions) {
841 panic!(
842 "non-zero sst start id {} for NewCompactionGroup",
843 new_sst_start_id
844 );
845 } else {
846 warn!(
847 %new_sst_start_id,
848 "non-zero sst start id for NewCompactionGroup"
849 );
850 }
851 }
852 return;
853 } else if !self.levels.contains_key(&parent_group_id) {
854 unreachable!(
855 "non-existing parent group id {} to init from (V2)",
856 parent_group_id
857 );
858 }
859
860 let [parent_levels, cur_levels] = self
861 .levels
862 .get_disjoint_mut([&parent_group_id, &group_id])
863 .map(|res| res.unwrap());
864 parent_levels.compaction_group_version_id += 1;
867 cur_levels.compaction_group_version_id += 1;
868
869 let l0 = &mut parent_levels.l0;
870 {
871 for sub_level in &mut l0.sub_levels {
872 let target_l0 = &mut cur_levels.l0;
873 let insert_table_infos = if let Some(split_key) = &split_key {
876 group_split::split_sst_info_for_level_v2(
877 sub_level,
878 &mut new_sst_id,
879 split_key.clone(),
880 )
881 } else {
882 vec![]
883 };
884
885 if insert_table_infos.is_empty() {
886 continue;
887 }
888
889 sub_level.normalize();
890 match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
891 Ok(idx) => {
892 add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
893 }
894 Err(idx) => {
895 insert_new_sub_level(
896 target_l0,
897 sub_level.sub_level_id,
898 sub_level.level_type,
899 insert_table_infos,
900 Some(idx),
901 );
902 }
903 }
904 }
905 l0.normalize();
906 }
907
908 for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
909 let insert_table_infos = if let Some(split_key) = &split_key {
910 group_split::split_sst_info_for_level_v2(level, &mut new_sst_id, split_key.clone())
911 } else {
912 vec![]
913 };
914
915 if insert_table_infos.is_empty() {
916 continue;
917 }
918
919 cur_levels.levels[idx].total_file_size += insert_table_infos
920 .iter()
921 .map(|sst| sst.sst_size)
922 .sum::<u64>();
923 cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
924 .iter()
925 .map(|sst| sst.uncompressed_file_size)
926 .sum::<u64>();
927 cur_levels.levels[idx]
928 .table_infos
929 .extend(insert_table_infos);
930 cur_levels.levels[idx]
931 .table_infos
932 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
933 assert!(can_concat(&cur_levels.levels[idx].table_infos));
934 level.normalize();
935 }
936
937 assert!(
938 parent_levels
939 .l0
940 .sub_levels
941 .iter()
942 .all(|level| !level.table_infos.is_empty())
943 );
944 assert!(
945 cur_levels
946 .l0
947 .sub_levels
948 .iter()
949 .all(|level| !level.table_infos.is_empty())
950 );
951 }
952}
953
954impl<T> HummockVersionCommon<T>
955where
956 T: SstableIdReader + ObjectIdReader,
957{
958 pub fn get_object_ids(&self) -> impl Iterator<Item = HummockObjectId> + '_ {
959 match HummockObjectId::Sstable(0.into()) {
963 HummockObjectId::Sstable(_) => {}
964 HummockObjectId::VectorFile(_) => {}
965 HummockObjectId::HnswGraphFile(_) => {}
966 };
967 self.get_sst_infos()
968 .map(|s| HummockObjectId::Sstable(s.object_id()))
969 .chain(
970 self.vector_indexes
971 .values()
972 .flat_map(|index| index.get_objects().map(|(object_id, _)| object_id)),
973 )
974 }
975
976 pub fn get_sst_ids(&self) -> HashSet<HummockSstableId> {
977 self.get_sst_infos().map(|s| s.sst_id()).collect()
978 }
979
980 pub fn get_sst_infos(&self) -> impl Iterator<Item = &T> {
981 self.get_combined_levels()
982 .flat_map(|level| level.table_infos.iter())
983 }
984}
985
986impl Levels {
987 pub(crate) fn apply_compact_ssts(
988 &mut self,
989 level_delta: &IntraLevelDeltaCommon<SstableInfo>,
990 member_table_ids: &BTreeSet<TableId>,
991 ) {
992 let IntraLevelDeltaCommon {
993 level_idx,
994 l0_sub_level_id,
995 inserted_table_infos: insert_table_infos,
996 vnode_partition_count,
997 removed_table_ids: delete_sst_ids_set,
998 compaction_group_version_id,
999 } = level_delta;
1000 let new_vnode_partition_count = *vnode_partition_count;
1001
1002 if is_compaction_task_expired(
1003 self.compaction_group_version_id,
1004 *compaction_group_version_id,
1005 ) {
1006 warn!(
1007 current_compaction_group_version_id = self.compaction_group_version_id,
1008 delta_compaction_group_version_id = compaction_group_version_id,
1009 level_idx,
1010 l0_sub_level_id,
1011 insert_table_infos = ?insert_table_infos
1012 .iter()
1013 .map(|sst| (sst.sst_id, sst.object_id))
1014 .collect_vec(),
1015 ?delete_sst_ids_set,
1016 "This VersionDelta may be committed by an expired compact task. Please check it."
1017 );
1018 return;
1019 }
1020 if !delete_sst_ids_set.is_empty() {
1021 if *level_idx == 0 {
1022 for level in &mut self.l0.sub_levels {
1023 level.delete_ssts(delete_sst_ids_set);
1024 }
1025 } else {
1026 let idx = *level_idx as usize - 1;
1027 self.levels[idx].delete_ssts(delete_sst_ids_set);
1028 }
1029 }
1030
1031 if !insert_table_infos.is_empty() {
1032 let insert_sst_level_id = *level_idx;
1033 let insert_sub_level_id = *l0_sub_level_id;
1034 if insert_sst_level_id == 0 {
1035 let l0 = &mut self.l0;
1036 let index = l0
1037 .sub_levels
1038 .partition_point(|level| level.sub_level_id < insert_sub_level_id);
1039 assert!(
1040 index < l0.sub_levels.len()
1041 && l0.sub_levels[index].sub_level_id == insert_sub_level_id,
1042 "should find the level to insert into when applying compaction generated delta. sub level idx: {}, removed sst ids: {:?}, sub levels: {:?},",
1043 insert_sub_level_id,
1044 delete_sst_ids_set,
1045 l0.sub_levels
1046 .iter()
1047 .map(|level| level.sub_level_id)
1048 .collect_vec()
1049 );
1050 if l0.sub_levels[index].table_infos.is_empty()
1051 && member_table_ids.len() == 1
1052 && insert_table_infos.iter().all(|sst| {
1053 sst.table_ids.len() == 1
1054 && sst.table_ids[0]
1055 == *member_table_ids.iter().next().expect("non-empty")
1056 })
1057 {
1058 l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
1061 }
1062 level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
1063 } else {
1064 let idx = insert_sst_level_id as usize - 1;
1065 if self.levels[idx].table_infos.is_empty()
1066 && insert_table_infos
1067 .iter()
1068 .all(|sst| sst.table_ids.len() == 1)
1069 {
1070 self.levels[idx].vnode_partition_count = new_vnode_partition_count;
1071 } else if self.levels[idx].vnode_partition_count != 0
1072 && new_vnode_partition_count == 0
1073 && member_table_ids.len() > 1
1074 {
1075 self.levels[idx].vnode_partition_count = 0;
1076 }
1077 level_insert_ssts(&mut self.levels[idx], insert_table_infos);
1078 }
1079 }
1080 }
1081
1082 pub(crate) fn truncate_tables(&mut self, table_ids: &HashSet<TableId>) {
1085 for level in self.l0.sub_levels.iter_mut().chain(self.levels.iter_mut()) {
1086 level.truncate_tables(table_ids);
1087 }
1088 self.l0.normalize();
1089 self.compaction_group_version_id += 1;
1090 }
1091}
1092
1093impl<T, L> HummockVersionCommon<T, L> {
1094 pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
1095 self.levels
1096 .values()
1097 .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
1098 }
1099}
1100
1101pub fn build_initial_compaction_group_levels(
1102 group_id: impl Into<CompactionGroupId>,
1103 compaction_config: &CompactionConfig,
1104) -> Levels {
1105 let mut levels = vec![];
1106 for l in 0..compaction_config.get_max_level() {
1107 levels.push(Level {
1108 level_idx: (l + 1) as u32,
1109 level_type: PbLevelType::Nonoverlapping,
1110 table_infos: vec![],
1111 total_file_size: 0,
1112 sub_level_id: 0,
1113 uncompressed_file_size: 0,
1114 vnode_partition_count: 0,
1115 });
1116 }
1117 #[expect(deprecated)] Levels {
1119 levels,
1120 l0: OverlappingLevel {
1121 sub_levels: vec![],
1122 total_file_size: 0,
1123 uncompressed_file_size: 0,
1124 },
1125 group_id: group_id.into(),
1126 parent_group_id: 0.into(),
1127 member_table_ids: vec![],
1128 compaction_group_version_id: 0,
1129 }
1130}
1131
1132fn split_sst_info_for_level(
1133 member_table_ids: &BTreeSet<TableId>,
1134 level: &mut Level,
1135 new_sst_id: &mut HummockSstableId,
1136) -> Vec<SstableInfo> {
1137 let mut insert_table_infos = vec![];
1140 for sst_info in &mut level.table_infos {
1141 let removed_table_ids = sst_info
1142 .table_ids
1143 .iter()
1144 .filter(|table_id| member_table_ids.contains(*table_id))
1145 .cloned()
1146 .collect_vec();
1147 let sst_size = sst_info.sst_size;
1148 if sst_size / 2 == 0 {
1149 tracing::warn!(
1150 id = %sst_info.sst_id,
1151 object_id = %sst_info.object_id,
1152 sst_size = sst_info.sst_size,
1153 file_size = sst_info.file_size,
1154 "Sstable sst_size is under expected",
1155 );
1156 };
1157 if !removed_table_ids.is_empty() {
1158 let (modified_sst, branch_sst) = split_sst_with_table_ids(
1159 sst_info,
1160 new_sst_id,
1161 sst_size / 2,
1162 sst_size / 2,
1163 member_table_ids.iter().cloned().collect_vec(),
1164 );
1165 *sst_info = modified_sst;
1166 insert_table_infos.push(branch_sst);
1167 }
1168 }
1169 insert_table_infos
1170}
1171
1172pub fn get_compaction_group_ids(
1174 version: &HummockVersion,
1175) -> impl Iterator<Item = CompactionGroupId> + '_ {
1176 version.levels.keys().cloned()
1177}
1178
1179pub fn get_table_compaction_group_id_mapping(
1180 version: &HummockVersion,
1181) -> HashMap<StateTableId, CompactionGroupId> {
1182 version
1183 .state_table_info
1184 .info()
1185 .iter()
1186 .map(|(table_id, info)| (*table_id, info.compaction_group_id))
1187 .collect()
1188}
1189
1190pub fn get_compaction_group_ssts(
1192 version: &HummockVersion,
1193 group_id: CompactionGroupId,
1194) -> impl Iterator<Item = (HummockSstableObjectId, HummockSstableId)> + '_ {
1195 let group_levels = version.get_compaction_group_levels(group_id);
1196 group_levels
1197 .l0
1198 .sub_levels
1199 .iter()
1200 .rev()
1201 .chain(group_levels.levels.iter())
1202 .flat_map(|level| {
1203 level
1204 .table_infos
1205 .iter()
1206 .map(|table_info| (table_info.object_id, table_info.sst_id))
1207 })
1208}
1209
1210pub fn new_sub_level(
1211 sub_level_id: u64,
1212 level_type: PbLevelType,
1213 table_infos: Vec<SstableInfo>,
1214) -> Level {
1215 if level_type == PbLevelType::Nonoverlapping {
1216 debug_assert!(
1217 can_concat(&table_infos),
1218 "sst of non-overlapping level is not concat-able: {:?}",
1219 table_infos
1220 );
1221 }
1222 let total_file_size = table_infos.iter().map(|table| table.sst_size).sum();
1223 let uncompressed_file_size = table_infos
1224 .iter()
1225 .map(|table| table.uncompressed_file_size)
1226 .sum();
1227 Level {
1228 level_idx: 0,
1229 level_type,
1230 table_infos,
1231 total_file_size,
1232 sub_level_id,
1233 uncompressed_file_size,
1234 vnode_partition_count: 0,
1235 }
1236}
1237
1238pub fn add_ssts_to_sub_level(
1239 l0: &mut OverlappingLevel,
1240 sub_level_idx: usize,
1241 insert_table_infos: Vec<SstableInfo>,
1242) {
1243 insert_table_infos.iter().for_each(|sst| {
1244 l0.sub_levels[sub_level_idx].total_file_size += sst.sst_size;
1245 l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size;
1246 l0.total_file_size += sst.sst_size;
1247 l0.uncompressed_file_size += sst.uncompressed_file_size;
1248 });
1249 l0.sub_levels[sub_level_idx]
1250 .table_infos
1251 .extend(insert_table_infos);
1252 if l0.sub_levels[sub_level_idx].level_type == PbLevelType::Nonoverlapping {
1253 l0.sub_levels[sub_level_idx]
1254 .table_infos
1255 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1256 assert!(
1257 can_concat(&l0.sub_levels[sub_level_idx].table_infos),
1258 "sstable ids: {:?}",
1259 l0.sub_levels[sub_level_idx]
1260 .table_infos
1261 .iter()
1262 .map(|sst| sst.sst_id)
1263 .collect_vec()
1264 );
1265 }
1266}
1267
1268pub fn insert_new_sub_level(
1270 l0: &mut OverlappingLevel,
1271 insert_sub_level_id: u64,
1272 level_type: PbLevelType,
1273 insert_table_infos: Vec<SstableInfo>,
1274 sub_level_insert_hint: Option<usize>,
1275) {
1276 if insert_sub_level_id == u64::MAX {
1277 return;
1278 }
1279 let insert_pos = if let Some(insert_pos) = sub_level_insert_hint {
1280 insert_pos
1281 } else {
1282 if let Some(newest_level) = l0.sub_levels.last() {
1283 assert!(
1284 newest_level.sub_level_id < insert_sub_level_id,
1285 "inserted new level is not the newest: prev newest: {}, insert: {}. L0: {:?}",
1286 newest_level.sub_level_id,
1287 insert_sub_level_id,
1288 l0,
1289 );
1290 }
1291 l0.sub_levels.len()
1292 };
1293 #[cfg(debug_assertions)]
1294 {
1295 if insert_pos > 0
1296 && let Some(smaller_level) = l0.sub_levels.get(insert_pos - 1)
1297 {
1298 debug_assert!(smaller_level.sub_level_id < insert_sub_level_id);
1299 }
1300 if let Some(larger_level) = l0.sub_levels.get(insert_pos) {
1301 debug_assert!(larger_level.sub_level_id > insert_sub_level_id);
1302 }
1303 }
1304 let level = new_sub_level(insert_sub_level_id, level_type, insert_table_infos);
1307 l0.total_file_size += level.total_file_size;
1308 l0.uncompressed_file_size += level.uncompressed_file_size;
1309 l0.sub_levels.insert(insert_pos, level);
1310}
1311
1312impl Level {
1313 fn recompute_size(&mut self) {
1314 self.total_file_size = self
1315 .table_infos
1316 .iter()
1317 .map(|table| table.sst_size)
1318 .sum::<u64>();
1319 self.uncompressed_file_size = self
1320 .table_infos
1321 .iter()
1322 .map(|table| table.uncompressed_file_size)
1323 .sum::<u64>();
1324 }
1325
1326 fn normalize(&mut self) {
1328 self.table_infos
1329 .retain(|sst_info| !sst_info.table_ids.is_empty());
1330 self.recompute_size();
1331 }
1332
1333 fn delete_ssts(&mut self, ids: &HashSet<HummockSstableId>) -> bool {
1335 let original_len = self.table_infos.len();
1336 self.table_infos
1337 .retain(|table| !ids.contains(&table.sst_id));
1338 self.recompute_size();
1339 original_len != self.table_infos.len()
1340 }
1341
1342 fn truncate_tables(&mut self, table_ids: &HashSet<TableId>) {
1345 for sstable_info in &mut self.table_infos {
1346 let mut inner = sstable_info.get_inner();
1347 inner.table_ids.retain(|id| !table_ids.contains(id));
1348 sstable_info.set_inner(inner);
1349 }
1350 self.normalize();
1351 }
1352}
1353
1354impl OverlappingLevel {
1355 fn normalize(&mut self) {
1357 self.sub_levels
1358 .retain(|level| !level.table_infos.is_empty());
1359 self.total_file_size = self
1360 .sub_levels
1361 .iter()
1362 .map(|level| level.total_file_size)
1363 .sum::<u64>();
1364 self.uncompressed_file_size = self
1365 .sub_levels
1366 .iter()
1367 .map(|level| level.uncompressed_file_size)
1368 .sum::<u64>();
1369 }
1370}
1371
1372fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec<SstableInfo>) {
1373 fn display_sstable_infos(ssts: &[impl Borrow<SstableInfo>]) -> String {
1374 format!(
1375 "sstable ids: {:?}",
1376 ssts.iter().map(|s| s.borrow().sst_id).collect_vec()
1377 )
1378 }
1379 operand.total_file_size += insert_table_infos
1380 .iter()
1381 .map(|sst| sst.sst_size)
1382 .sum::<u64>();
1383 operand.uncompressed_file_size += insert_table_infos
1384 .iter()
1385 .map(|sst| sst.uncompressed_file_size)
1386 .sum::<u64>();
1387 if operand.level_type == PbLevelType::Overlapping {
1388 operand.level_type = PbLevelType::Nonoverlapping;
1389 operand
1390 .table_infos
1391 .extend(insert_table_infos.iter().cloned());
1392 operand
1393 .table_infos
1394 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1395 assert!(
1396 can_concat(&operand.table_infos),
1397 "{}",
1398 display_sstable_infos(&operand.table_infos)
1399 );
1400 } else if !insert_table_infos.is_empty() {
1401 let sorted_insert: Vec<_> = insert_table_infos
1402 .iter()
1403 .sorted_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range))
1404 .cloned()
1405 .collect();
1406 let first = &sorted_insert[0];
1407 let last = &sorted_insert[sorted_insert.len() - 1];
1408 let pos = operand
1409 .table_infos
1410 .partition_point(|b| b.key_range.cmp(&first.key_range) == Ordering::Less);
1411 if pos >= operand.table_infos.len()
1412 || last.key_range.cmp(&operand.table_infos[pos].key_range) == Ordering::Less
1413 {
1414 operand.table_infos.splice(pos..pos, sorted_insert);
1415 let validate_range = operand
1417 .table_infos
1418 .iter()
1419 .skip(pos.saturating_sub(1))
1420 .take(insert_table_infos.len() + 2)
1421 .collect_vec();
1422 assert!(
1423 can_concat(&validate_range),
1424 "{}",
1425 display_sstable_infos(&validate_range),
1426 );
1427 } else {
1428 warn!(insert = ?insert_table_infos, level = ?operand.table_infos, "unexpected overlap");
1431 for i in insert_table_infos {
1432 let pos = operand
1433 .table_infos
1434 .partition_point(|b| b.key_range.cmp(&i.key_range) == Ordering::Less);
1435 operand.table_infos.insert(pos, i.clone());
1436 }
1437 assert!(
1438 can_concat(&operand.table_infos),
1439 "{}",
1440 display_sstable_infos(&operand.table_infos)
1441 );
1442 }
1443 }
1444}
1445
1446pub fn version_object_size_map(version: &HummockVersion) -> HashMap<HummockObjectId, u64> {
1447 match HummockObjectId::Sstable(0.into()) {
1451 HummockObjectId::Sstable(_) => {}
1452 HummockObjectId::VectorFile(_) => {}
1453 HummockObjectId::HnswGraphFile(_) => {}
1454 };
1455 version
1456 .levels
1457 .values()
1458 .flat_map(|cg| {
1459 cg.level0()
1460 .sub_levels
1461 .iter()
1462 .chain(cg.levels.iter())
1463 .flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
1464 })
1465 .map(|(object_id, size)| (HummockObjectId::Sstable(object_id), size))
1466 .chain(
1467 version
1468 .vector_indexes
1469 .values()
1470 .flat_map(|index| index.get_objects()),
1471 )
1472 .collect()
1473}
1474
1475pub fn validate_version(version: &HummockVersion) -> Vec<String> {
1478 let mut res = Vec::new();
1479 for (group_id, levels) in &version.levels {
1481 if levels.group_id != *group_id {
1483 res.push(format!(
1484 "GROUP {}: inconsistent group id {} in Levels",
1485 group_id, levels.group_id
1486 ));
1487 }
1488
1489 let validate_level = |group: CompactionGroupId,
1490 expected_level_idx: u32,
1491 level: &Level,
1492 res: &mut Vec<String>| {
1493 let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
1494 if level.level_idx == 0 {
1495 level_identifier.push_str(format!("SUBLEVEL {}", level.sub_level_id).as_str());
1496 if level.table_infos.is_empty() {
1498 res.push(format!("{}: empty level", level_identifier));
1499 }
1500 } else if level.level_type != PbLevelType::Nonoverlapping {
1501 res.push(format!(
1503 "{}: level type {:?} is not non-overlapping",
1504 level_identifier, level.level_type
1505 ));
1506 }
1507
1508 if level.level_idx != expected_level_idx {
1510 res.push(format!(
1511 "{}: mismatched level idx {}",
1512 level_identifier, expected_level_idx
1513 ));
1514 }
1515
1516 let mut prev_table_info: Option<&SstableInfo> = None;
1517 for table_info in &level.table_infos {
1518 if !table_info.table_ids.is_sorted_by(|a, b| a < b) {
1520 res.push(format!(
1521 "{} SST {}: table_ids not sorted",
1522 level_identifier, table_info.object_id
1523 ));
1524 }
1525
1526 if level.level_type == PbLevelType::Nonoverlapping {
1528 if let Some(prev) = prev_table_info.take()
1529 && prev
1530 .key_range
1531 .compare_right_with(&table_info.key_range.left)
1532 != Ordering::Less
1533 {
1534 res.push(format!(
1535 "{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
1536 level_identifier, table_info.object_id, prev, table_info
1537 ));
1538 }
1539 let _ = prev_table_info.insert(table_info);
1540 }
1541 }
1542 };
1543
1544 let l0 = &levels.l0;
1545 let mut prev_sub_level_id = u64::MAX;
1546 for sub_level in &l0.sub_levels {
1547 if sub_level.sub_level_id >= prev_sub_level_id {
1549 res.push(format!(
1550 "GROUP {} LEVEL 0: sub_level_id {} >= prev_sub_level {}",
1551 group_id, sub_level.level_idx, prev_sub_level_id
1552 ));
1553 }
1554 prev_sub_level_id = sub_level.sub_level_id;
1555
1556 validate_level(*group_id, 0, sub_level, &mut res);
1557 }
1558
1559 for idx in 1..=levels.levels.len() {
1560 validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
1561 }
1562 }
1563 res
1564}
1565
1566#[cfg(test)]
1567mod tests {
1568 use std::collections::{HashMap, HashSet};
1569
1570 use bytes::Bytes;
1571 use risingwave_common::catalog::TableId;
1572 use risingwave_common::hash::VirtualNode;
1573 use risingwave_common::util::epoch::test_epoch;
1574 use risingwave_pb::hummock::{CompactionConfig, GroupConstruct, GroupDestroy, LevelType};
1575
1576 use super::group_split;
1577 use crate::HummockVersionId;
1578 use crate::compaction_group::group_split::*;
1579 use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
1580 use crate::key::{FullKey, gen_key_from_str};
1581 use crate::key_range::KeyRange;
1582 use crate::level::{Level, Levels, OverlappingLevel};
1583 use crate::sstable_info::{SstableInfo, SstableInfoInner};
1584 use crate::version::{
1585 GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, IntraLevelDelta,
1586 };
1587
1588 fn gen_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfo {
1589 gen_sstable_info_impl(sst_id, table_ids, epoch).into()
1590 }
1591
1592 fn gen_sstable_info_impl(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfoInner {
1593 let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1");
1594 let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1");
1595 let full_key_l = FullKey::for_test(
1596 TableId::new(*table_ids.first().unwrap()),
1597 table_key_l,
1598 epoch,
1599 )
1600 .encode();
1601 let full_key_r =
1602 FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch)
1603 .encode();
1604
1605 SstableInfoInner {
1606 sst_id: sst_id.into(),
1607 key_range: KeyRange {
1608 left: full_key_l.into(),
1609 right: full_key_r.into(),
1610 right_exclusive: false,
1611 },
1612 table_ids: table_ids.into_iter().map(Into::into).collect(),
1613 object_id: sst_id.into(),
1614 min_epoch: 20,
1615 max_epoch: 20,
1616 file_size: 100,
1617 sst_size: 100,
1618 ..Default::default()
1619 }
1620 }
1621
1622 #[test]
1623 fn test_get_sst_object_ids() {
1624 let mut version = HummockVersion {
1625 id: HummockVersionId::new(0),
1626 levels: HashMap::from_iter([(
1627 0.into(),
1628 Levels {
1629 levels: vec![],
1630 l0: OverlappingLevel {
1631 sub_levels: vec![],
1632 total_file_size: 0,
1633 uncompressed_file_size: 0,
1634 },
1635 ..Default::default()
1636 },
1637 )]),
1638 ..Default::default()
1639 };
1640 assert_eq!(version.get_object_ids().count(), 0);
1641
1642 version
1644 .levels
1645 .get_mut(&0)
1646 .unwrap()
1647 .l0
1648 .sub_levels
1649 .push(Level {
1650 table_infos: vec![
1651 SstableInfoInner {
1652 object_id: 11.into(),
1653 sst_id: 11.into(),
1654 ..Default::default()
1655 }
1656 .into(),
1657 ],
1658 ..Default::default()
1659 });
1660 assert_eq!(version.get_object_ids().count(), 1);
1661
1662 version.levels.get_mut(&0).unwrap().levels.push(Level {
1664 table_infos: vec![
1665 SstableInfoInner {
1666 object_id: 22.into(),
1667 sst_id: 22.into(),
1668 ..Default::default()
1669 }
1670 .into(),
1671 ],
1672 ..Default::default()
1673 });
1674 assert_eq!(version.get_object_ids().count(), 2);
1675 }
1676
1677 #[test]
1678 fn test_apply_version_delta() {
1679 let mut version = HummockVersion {
1680 id: HummockVersionId::new(0),
1681 levels: HashMap::from_iter([
1682 (
1683 0.into(),
1684 build_initial_compaction_group_levels(
1685 0,
1686 &CompactionConfig {
1687 max_level: 6,
1688 ..Default::default()
1689 },
1690 ),
1691 ),
1692 (
1693 1.into(),
1694 build_initial_compaction_group_levels(
1695 1,
1696 &CompactionConfig {
1697 max_level: 6,
1698 ..Default::default()
1699 },
1700 ),
1701 ),
1702 ]),
1703 ..Default::default()
1704 };
1705 let version_delta = HummockVersionDelta {
1706 id: HummockVersionId::new(1),
1707 group_deltas: HashMap::from_iter([
1708 (
1709 2.into(),
1710 GroupDeltas {
1711 group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
1712 group_config: Some(CompactionConfig {
1713 max_level: 6,
1714 ..Default::default()
1715 }),
1716 ..Default::default()
1717 }))],
1718 },
1719 ),
1720 (
1721 0.into(),
1722 GroupDeltas {
1723 group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})],
1724 },
1725 ),
1726 (
1727 1.into(),
1728 GroupDeltas {
1729 group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new(
1730 1,
1731 0,
1732 HashSet::new(),
1733 vec![
1734 SstableInfoInner {
1735 object_id: 1.into(),
1736 sst_id: 1.into(),
1737 ..Default::default()
1738 }
1739 .into(),
1740 ],
1741 0,
1742 version
1743 .levels
1744 .get(&1)
1745 .as_ref()
1746 .unwrap()
1747 .compaction_group_version_id,
1748 ))],
1749 },
1750 ),
1751 ]),
1752 ..Default::default()
1753 };
1754 let version_delta = version_delta;
1755
1756 version.apply_version_delta(&version_delta);
1757 let mut cg1 = build_initial_compaction_group_levels(
1758 1,
1759 &CompactionConfig {
1760 max_level: 6,
1761 ..Default::default()
1762 },
1763 );
1764 cg1.levels[0] = Level {
1765 level_idx: 1,
1766 level_type: LevelType::Nonoverlapping,
1767 table_infos: vec![
1768 SstableInfoInner {
1769 object_id: 1.into(),
1770 sst_id: 1.into(),
1771 ..Default::default()
1772 }
1773 .into(),
1774 ],
1775 ..Default::default()
1776 };
1777 assert_eq!(
1778 version,
1779 HummockVersion {
1780 id: HummockVersionId::new(1),
1781 levels: HashMap::from_iter([
1782 (
1783 2.into(),
1784 build_initial_compaction_group_levels(
1785 2,
1786 &CompactionConfig {
1787 max_level: 6,
1788 ..Default::default()
1789 },
1790 ),
1791 ),
1792 (1.into(), cg1),
1793 ]),
1794 ..Default::default()
1795 }
1796 );
1797 }
1798
1799 fn gen_sst_info(object_id: u64, table_ids: Vec<u32>, left: Bytes, right: Bytes) -> SstableInfo {
1800 gen_sst_info_impl(object_id, table_ids, left, right).into()
1801 }
1802
1803 fn gen_sst_info_impl(
1804 object_id: u64,
1805 table_ids: Vec<u32>,
1806 left: Bytes,
1807 right: Bytes,
1808 ) -> SstableInfoInner {
1809 SstableInfoInner {
1810 object_id: object_id.into(),
1811 sst_id: object_id.into(),
1812 key_range: KeyRange {
1813 left,
1814 right,
1815 right_exclusive: false,
1816 },
1817 table_ids: table_ids.into_iter().map(Into::into).collect(),
1818 file_size: 100,
1819 sst_size: 100,
1820 uncompressed_file_size: 100,
1821 ..Default::default()
1822 }
1823 }
1824
1825 #[test]
1826 fn test_merge_levels() {
1827 let mut left_levels = build_initial_compaction_group_levels(
1828 1,
1829 &CompactionConfig {
1830 max_level: 6,
1831 ..Default::default()
1832 },
1833 );
1834
1835 let mut right_levels = build_initial_compaction_group_levels(
1836 2,
1837 &CompactionConfig {
1838 max_level: 6,
1839 ..Default::default()
1840 },
1841 );
1842
1843 left_levels.levels[0] = Level {
1844 level_idx: 1,
1845 level_type: LevelType::Nonoverlapping,
1846 table_infos: vec![
1847 gen_sst_info(
1848 1,
1849 vec![3],
1850 FullKey::for_test(
1851 TableId::new(3),
1852 gen_key_from_str(VirtualNode::from_index(1), "1"),
1853 0,
1854 )
1855 .encode()
1856 .into(),
1857 FullKey::for_test(
1858 TableId::new(3),
1859 gen_key_from_str(VirtualNode::from_index(200), "1"),
1860 0,
1861 )
1862 .encode()
1863 .into(),
1864 ),
1865 gen_sst_info(
1866 10,
1867 vec![3, 4],
1868 FullKey::for_test(
1869 TableId::new(3),
1870 gen_key_from_str(VirtualNode::from_index(201), "1"),
1871 0,
1872 )
1873 .encode()
1874 .into(),
1875 FullKey::for_test(
1876 TableId::new(4),
1877 gen_key_from_str(VirtualNode::from_index(10), "1"),
1878 0,
1879 )
1880 .encode()
1881 .into(),
1882 ),
1883 gen_sst_info(
1884 11,
1885 vec![4],
1886 FullKey::for_test(
1887 TableId::new(4),
1888 gen_key_from_str(VirtualNode::from_index(11), "1"),
1889 0,
1890 )
1891 .encode()
1892 .into(),
1893 FullKey::for_test(
1894 TableId::new(4),
1895 gen_key_from_str(VirtualNode::from_index(200), "1"),
1896 0,
1897 )
1898 .encode()
1899 .into(),
1900 ),
1901 ],
1902 total_file_size: 300,
1903 ..Default::default()
1904 };
1905
1906 left_levels.l0.sub_levels.push(Level {
1907 level_idx: 0,
1908 table_infos: vec![gen_sst_info(
1909 3,
1910 vec![3],
1911 FullKey::for_test(
1912 TableId::new(3),
1913 gen_key_from_str(VirtualNode::from_index(1), "1"),
1914 0,
1915 )
1916 .encode()
1917 .into(),
1918 FullKey::for_test(
1919 TableId::new(3),
1920 gen_key_from_str(VirtualNode::from_index(200), "1"),
1921 0,
1922 )
1923 .encode()
1924 .into(),
1925 )],
1926 sub_level_id: 101,
1927 level_type: LevelType::Overlapping,
1928 total_file_size: 100,
1929 ..Default::default()
1930 });
1931
1932 left_levels.l0.sub_levels.push(Level {
1933 level_idx: 0,
1934 table_infos: vec![gen_sst_info(
1935 3,
1936 vec![3],
1937 FullKey::for_test(
1938 TableId::new(3),
1939 gen_key_from_str(VirtualNode::from_index(1), "1"),
1940 0,
1941 )
1942 .encode()
1943 .into(),
1944 FullKey::for_test(
1945 TableId::new(3),
1946 gen_key_from_str(VirtualNode::from_index(200), "1"),
1947 0,
1948 )
1949 .encode()
1950 .into(),
1951 )],
1952 sub_level_id: 103,
1953 level_type: LevelType::Overlapping,
1954 total_file_size: 100,
1955 ..Default::default()
1956 });
1957
1958 left_levels.l0.sub_levels.push(Level {
1959 level_idx: 0,
1960 table_infos: vec![gen_sst_info(
1961 3,
1962 vec![3],
1963 FullKey::for_test(
1964 TableId::new(3),
1965 gen_key_from_str(VirtualNode::from_index(1), "1"),
1966 0,
1967 )
1968 .encode()
1969 .into(),
1970 FullKey::for_test(
1971 TableId::new(3),
1972 gen_key_from_str(VirtualNode::from_index(200), "1"),
1973 0,
1974 )
1975 .encode()
1976 .into(),
1977 )],
1978 sub_level_id: 105,
1979 level_type: LevelType::Nonoverlapping,
1980 total_file_size: 100,
1981 ..Default::default()
1982 });
1983
1984 right_levels.levels[0] = Level {
1985 level_idx: 1,
1986 level_type: LevelType::Nonoverlapping,
1987 table_infos: vec![
1988 gen_sst_info(
1989 1,
1990 vec![5],
1991 FullKey::for_test(
1992 TableId::new(5),
1993 gen_key_from_str(VirtualNode::from_index(1), "1"),
1994 0,
1995 )
1996 .encode()
1997 .into(),
1998 FullKey::for_test(
1999 TableId::new(5),
2000 gen_key_from_str(VirtualNode::from_index(200), "1"),
2001 0,
2002 )
2003 .encode()
2004 .into(),
2005 ),
2006 gen_sst_info(
2007 10,
2008 vec![5, 6],
2009 FullKey::for_test(
2010 TableId::new(5),
2011 gen_key_from_str(VirtualNode::from_index(201), "1"),
2012 0,
2013 )
2014 .encode()
2015 .into(),
2016 FullKey::for_test(
2017 TableId::new(6),
2018 gen_key_from_str(VirtualNode::from_index(10), "1"),
2019 0,
2020 )
2021 .encode()
2022 .into(),
2023 ),
2024 gen_sst_info(
2025 11,
2026 vec![6],
2027 FullKey::for_test(
2028 TableId::new(6),
2029 gen_key_from_str(VirtualNode::from_index(11), "1"),
2030 0,
2031 )
2032 .encode()
2033 .into(),
2034 FullKey::for_test(
2035 TableId::new(6),
2036 gen_key_from_str(VirtualNode::from_index(200), "1"),
2037 0,
2038 )
2039 .encode()
2040 .into(),
2041 ),
2042 ],
2043 total_file_size: 300,
2044 ..Default::default()
2045 };
2046
2047 right_levels.l0.sub_levels.push(Level {
2048 level_idx: 0,
2049 table_infos: vec![gen_sst_info(
2050 3,
2051 vec![5],
2052 FullKey::for_test(
2053 TableId::new(5),
2054 gen_key_from_str(VirtualNode::from_index(1), "1"),
2055 0,
2056 )
2057 .encode()
2058 .into(),
2059 FullKey::for_test(
2060 TableId::new(5),
2061 gen_key_from_str(VirtualNode::from_index(200), "1"),
2062 0,
2063 )
2064 .encode()
2065 .into(),
2066 )],
2067 sub_level_id: 101,
2068 level_type: LevelType::Overlapping,
2069 total_file_size: 100,
2070 ..Default::default()
2071 });
2072
2073 right_levels.l0.sub_levels.push(Level {
2074 level_idx: 0,
2075 table_infos: vec![gen_sst_info(
2076 5,
2077 vec![5],
2078 FullKey::for_test(
2079 TableId::new(5),
2080 gen_key_from_str(VirtualNode::from_index(1), "1"),
2081 0,
2082 )
2083 .encode()
2084 .into(),
2085 FullKey::for_test(
2086 TableId::new(5),
2087 gen_key_from_str(VirtualNode::from_index(200), "1"),
2088 0,
2089 )
2090 .encode()
2091 .into(),
2092 )],
2093 sub_level_id: 102,
2094 level_type: LevelType::Overlapping,
2095 total_file_size: 100,
2096 ..Default::default()
2097 });
2098
2099 right_levels.l0.sub_levels.push(Level {
2100 level_idx: 0,
2101 table_infos: vec![gen_sst_info(
2102 3,
2103 vec![5],
2104 FullKey::for_test(
2105 TableId::new(5),
2106 gen_key_from_str(VirtualNode::from_index(1), "1"),
2107 0,
2108 )
2109 .encode()
2110 .into(),
2111 FullKey::for_test(
2112 TableId::new(5),
2113 gen_key_from_str(VirtualNode::from_index(200), "1"),
2114 0,
2115 )
2116 .encode()
2117 .into(),
2118 )],
2119 sub_level_id: 103,
2120 level_type: LevelType::Nonoverlapping,
2121 total_file_size: 100,
2122 ..Default::default()
2123 });
2124
2125 {
2126 let mut left_levels = Levels::default();
2128 let right_levels = Levels::default();
2129
2130 group_split::merge_levels(&mut left_levels, right_levels);
2131 }
2132
2133 {
2134 let mut left_levels = build_initial_compaction_group_levels(
2136 1,
2137 &CompactionConfig {
2138 max_level: 6,
2139 ..Default::default()
2140 },
2141 );
2142 let right_levels = right_levels.clone();
2143
2144 group_split::merge_levels(&mut left_levels, right_levels);
2145
2146 assert!(left_levels.l0.sub_levels.len() == 3);
2147 assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2148 assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2149 assert!(left_levels.l0.sub_levels[1].sub_level_id == 102);
2150 assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2151 assert!(left_levels.l0.sub_levels[2].sub_level_id == 103);
2152 assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2153
2154 assert!(left_levels.levels[0].level_idx == 1);
2155 assert_eq!(300, left_levels.levels[0].total_file_size);
2156 }
2157
2158 {
2159 let mut left_levels = left_levels.clone();
2161 let right_levels = build_initial_compaction_group_levels(
2162 2,
2163 &CompactionConfig {
2164 max_level: 6,
2165 ..Default::default()
2166 },
2167 );
2168
2169 group_split::merge_levels(&mut left_levels, right_levels);
2170
2171 assert!(left_levels.l0.sub_levels.len() == 3);
2172 assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2173 assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2174 assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2175 assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2176 assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2177 assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2178
2179 assert!(left_levels.levels[0].level_idx == 1);
2180 assert_eq!(300, left_levels.levels[0].total_file_size);
2181 }
2182
2183 {
2184 let mut left_levels = left_levels.clone();
2185 let right_levels = right_levels.clone();
2186
2187 group_split::merge_levels(&mut left_levels, right_levels);
2188
2189 assert!(left_levels.l0.sub_levels.len() == 6);
2190 assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2191 assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2192 assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2193 assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2194 assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2195 assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2196 assert!(left_levels.l0.sub_levels[3].sub_level_id == 106);
2197 assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size);
2198 assert!(left_levels.l0.sub_levels[4].sub_level_id == 107);
2199 assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size);
2200 assert!(left_levels.l0.sub_levels[5].sub_level_id == 108);
2201 assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size);
2202
2203 assert!(left_levels.levels[0].level_idx == 1);
2204 assert_eq!(600, left_levels.levels[0].total_file_size);
2205 }
2206 }
2207
2208 #[test]
2209 fn test_get_split_pos() {
2210 let epoch = test_epoch(1);
2211 let s1 = gen_sstable_info(1, vec![1, 2], epoch);
2212 let s2 = gen_sstable_info(2, vec![3, 4, 5], epoch);
2213 let s3 = gen_sstable_info(3, vec![6, 7], epoch);
2214
2215 let ssts = vec![s1, s2, s3];
2216 let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2217
2218 let pos = group_split::get_split_pos(&ssts, split_key.clone());
2219 assert_eq!(1, pos);
2220
2221 let pos = group_split::get_split_pos(&vec![], split_key);
2222 assert_eq!(0, pos);
2223 }
2224
2225 #[test]
2226 fn test_split_sst() {
2227 let epoch = test_epoch(1);
2228 let sst = gen_sstable_info(1, vec![1, 2, 3, 5], epoch);
2229
2230 {
2231 let split_key = group_split::build_split_key(3.into(), VirtualNode::ZERO);
2232 let origin_sst = sst.clone();
2233 let sst_size = origin_sst.sst_size;
2234 let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2235 assert_eq!(SstSplitType::Both, split_type);
2236
2237 let mut new_sst_id = 10.into();
2238 let (origin_sst, branched_sst) = group_split::split_sst(
2239 origin_sst,
2240 &mut new_sst_id,
2241 split_key,
2242 sst_size / 2,
2243 sst_size / 2,
2244 );
2245
2246 let origin_sst = origin_sst.unwrap();
2247 let branched_sst = branched_sst.unwrap();
2248
2249 assert!(origin_sst.key_range.right_exclusive);
2250 assert!(
2251 origin_sst
2252 .key_range
2253 .right
2254 .cmp(&branched_sst.key_range.left)
2255 .is_le()
2256 );
2257 assert!(origin_sst.table_ids.is_sorted());
2258 assert!(branched_sst.table_ids.is_sorted());
2259 assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2260 assert!(branched_sst.sst_size < origin_sst.file_size);
2261 assert_eq!(10, branched_sst.sst_id);
2262 assert_eq!(11, origin_sst.sst_id);
2263 assert_eq!(3, branched_sst.table_ids.first().unwrap().as_raw_id()); }
2265
2266 {
2267 let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2269 let origin_sst = sst.clone();
2270 let sst_size = origin_sst.sst_size;
2271 let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2272 assert_eq!(SstSplitType::Both, split_type);
2273
2274 let mut new_sst_id = 10.into();
2275 let (origin_sst, branched_sst) = group_split::split_sst(
2276 origin_sst,
2277 &mut new_sst_id,
2278 split_key,
2279 sst_size / 2,
2280 sst_size / 2,
2281 );
2282
2283 let origin_sst = origin_sst.unwrap();
2284 let branched_sst = branched_sst.unwrap();
2285
2286 assert!(origin_sst.key_range.right_exclusive);
2287 assert!(origin_sst.key_range.right.le(&branched_sst.key_range.left));
2288 assert!(origin_sst.table_ids.is_sorted());
2289 assert!(branched_sst.table_ids.is_sorted());
2290 assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2291 assert!(branched_sst.sst_size < origin_sst.file_size);
2292 assert_eq!(10, branched_sst.sst_id);
2293 assert_eq!(11, origin_sst.sst_id);
2294 assert_eq!(5, branched_sst.table_ids.first().unwrap().as_raw_id()); }
2296
2297 {
2298 let split_key = group_split::build_split_key(6.into(), VirtualNode::ZERO);
2299 let split_type = group_split::need_to_split(&sst, split_key);
2300 assert_eq!(SstSplitType::Left, split_type);
2301 }
2302
2303 {
2304 let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2305 let origin_sst = sst.clone();
2306 let split_type = group_split::need_to_split(&origin_sst, split_key);
2307 assert_eq!(SstSplitType::Both, split_type);
2308
2309 let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2310 let origin_sst = sst;
2311 let split_type = group_split::need_to_split(&origin_sst, split_key);
2312 assert_eq!(SstSplitType::Right, split_type);
2313 }
2314
2315 {
2316 let mut sst = gen_sstable_info_impl(1, vec![1], epoch);
2318 sst.key_range.right = sst.key_range.left.clone();
2319 let sst: SstableInfo = sst.into();
2320 let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2321 let origin_sst = sst;
2322 let sst_size = origin_sst.sst_size;
2323
2324 let mut new_sst_id = 10.into();
2325 let (origin_sst, branched_sst) = group_split::split_sst(
2326 origin_sst,
2327 &mut new_sst_id,
2328 split_key,
2329 sst_size / 2,
2330 sst_size / 2,
2331 );
2332
2333 assert!(origin_sst.is_none());
2334 assert!(branched_sst.is_some());
2335 }
2336 }
2337
2338 #[test]
2339 fn test_split_sst_info_for_level() {
2340 let mut version = HummockVersion {
2341 id: HummockVersionId::new(0),
2342 levels: HashMap::from_iter([(
2343 1.into(),
2344 build_initial_compaction_group_levels(
2345 1,
2346 &CompactionConfig {
2347 max_level: 6,
2348 ..Default::default()
2349 },
2350 ),
2351 )]),
2352 ..Default::default()
2353 };
2354
2355 let cg1 = version.levels.get_mut(&1).unwrap();
2356
2357 cg1.levels[0] = Level {
2358 level_idx: 1,
2359 level_type: LevelType::Nonoverlapping,
2360 table_infos: vec![
2361 gen_sst_info(
2362 1,
2363 vec![3],
2364 FullKey::for_test(
2365 TableId::new(3),
2366 gen_key_from_str(VirtualNode::from_index(1), "1"),
2367 0,
2368 )
2369 .encode()
2370 .into(),
2371 FullKey::for_test(
2372 TableId::new(3),
2373 gen_key_from_str(VirtualNode::from_index(200), "1"),
2374 0,
2375 )
2376 .encode()
2377 .into(),
2378 ),
2379 gen_sst_info(
2380 10,
2381 vec![3, 4],
2382 FullKey::for_test(
2383 TableId::new(3),
2384 gen_key_from_str(VirtualNode::from_index(201), "1"),
2385 0,
2386 )
2387 .encode()
2388 .into(),
2389 FullKey::for_test(
2390 TableId::new(4),
2391 gen_key_from_str(VirtualNode::from_index(10), "1"),
2392 0,
2393 )
2394 .encode()
2395 .into(),
2396 ),
2397 gen_sst_info(
2398 11,
2399 vec![4],
2400 FullKey::for_test(
2401 TableId::new(4),
2402 gen_key_from_str(VirtualNode::from_index(11), "1"),
2403 0,
2404 )
2405 .encode()
2406 .into(),
2407 FullKey::for_test(
2408 TableId::new(4),
2409 gen_key_from_str(VirtualNode::from_index(200), "1"),
2410 0,
2411 )
2412 .encode()
2413 .into(),
2414 ),
2415 ],
2416 total_file_size: 300,
2417 ..Default::default()
2418 };
2419
2420 cg1.l0.sub_levels.push(Level {
2421 level_idx: 0,
2422 table_infos: vec![
2423 gen_sst_info(
2424 2,
2425 vec![2],
2426 FullKey::for_test(
2427 TableId::new(0),
2428 gen_key_from_str(VirtualNode::from_index(1), "1"),
2429 0,
2430 )
2431 .encode()
2432 .into(),
2433 FullKey::for_test(
2434 TableId::new(2),
2435 gen_key_from_str(VirtualNode::from_index(200), "1"),
2436 0,
2437 )
2438 .encode()
2439 .into(),
2440 ),
2441 gen_sst_info(
2442 22,
2443 vec![2],
2444 FullKey::for_test(
2445 TableId::new(0),
2446 gen_key_from_str(VirtualNode::from_index(1), "1"),
2447 0,
2448 )
2449 .encode()
2450 .into(),
2451 FullKey::for_test(
2452 TableId::new(2),
2453 gen_key_from_str(VirtualNode::from_index(200), "1"),
2454 0,
2455 )
2456 .encode()
2457 .into(),
2458 ),
2459 gen_sst_info(
2460 23,
2461 vec![2],
2462 FullKey::for_test(
2463 TableId::new(0),
2464 gen_key_from_str(VirtualNode::from_index(1), "1"),
2465 0,
2466 )
2467 .encode()
2468 .into(),
2469 FullKey::for_test(
2470 TableId::new(2),
2471 gen_key_from_str(VirtualNode::from_index(200), "1"),
2472 0,
2473 )
2474 .encode()
2475 .into(),
2476 ),
2477 gen_sst_info(
2478 24,
2479 vec![2],
2480 FullKey::for_test(
2481 TableId::new(2),
2482 gen_key_from_str(VirtualNode::from_index(1), "1"),
2483 0,
2484 )
2485 .encode()
2486 .into(),
2487 FullKey::for_test(
2488 TableId::new(2),
2489 gen_key_from_str(VirtualNode::from_index(200), "1"),
2490 0,
2491 )
2492 .encode()
2493 .into(),
2494 ),
2495 gen_sst_info(
2496 25,
2497 vec![2],
2498 FullKey::for_test(
2499 TableId::new(0),
2500 gen_key_from_str(VirtualNode::from_index(1), "1"),
2501 0,
2502 )
2503 .encode()
2504 .into(),
2505 FullKey::for_test(
2506 TableId::new(0),
2507 gen_key_from_str(VirtualNode::from_index(200), "1"),
2508 0,
2509 )
2510 .encode()
2511 .into(),
2512 ),
2513 ],
2514 sub_level_id: 101,
2515 level_type: LevelType::Overlapping,
2516 total_file_size: 300,
2517 ..Default::default()
2518 });
2519
2520 {
2521 let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2523
2524 let mut new_sst_id = 100.into();
2525 let x = group_split::split_sst_info_for_level_v2(
2526 &mut cg1.l0.sub_levels[0],
2527 &mut new_sst_id,
2528 split_key,
2529 );
2530 let mut right_l0 = OverlappingLevel {
2539 sub_levels: vec![],
2540 total_file_size: 0,
2541 uncompressed_file_size: 0,
2542 };
2543
2544 right_l0.sub_levels.push(Level {
2545 level_idx: 0,
2546 table_infos: x,
2547 sub_level_id: 101,
2548 total_file_size: 100,
2549 level_type: LevelType::Overlapping,
2550 ..Default::default()
2551 });
2552
2553 let right_levels = Levels {
2554 levels: vec![],
2555 l0: right_l0,
2556 ..Default::default()
2557 };
2558
2559 merge_levels(cg1, right_levels);
2560 }
2561
2562 {
2563 let mut new_sst_id = 100.into();
2565 let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2566 let x = group_split::split_sst_info_for_level_v2(
2567 &mut cg1.levels[2],
2568 &mut new_sst_id,
2569 split_key,
2570 );
2571
2572 assert!(x.is_empty());
2573 }
2574
2575 {
2576 let mut cg1 = cg1.clone();
2578 let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2579
2580 let mut new_sst_id = 100.into();
2581 let x = group_split::split_sst_info_for_level_v2(
2582 &mut cg1.levels[0],
2583 &mut new_sst_id,
2584 split_key,
2585 );
2586
2587 assert_eq!(3, x.len());
2588 assert_eq!(1, x[0].sst_id);
2589 assert_eq!(100, x[0].sst_size);
2590 assert_eq!(10, x[1].sst_id);
2591 assert_eq!(100, x[1].sst_size);
2592 assert_eq!(11, x[2].sst_id);
2593 assert_eq!(100, x[2].sst_size);
2594
2595 assert_eq!(0, cg1.levels[0].table_infos.len());
2596 }
2597
2598 {
2599 let mut cg1 = cg1.clone();
2601 let split_key = group_split::build_split_key(5.into(), VirtualNode::ZERO);
2602
2603 let mut new_sst_id = 100.into();
2604 let x = group_split::split_sst_info_for_level_v2(
2605 &mut cg1.levels[0],
2606 &mut new_sst_id,
2607 split_key,
2608 );
2609
2610 assert_eq!(0, x.len());
2611 assert_eq!(3, cg1.levels[0].table_infos.len());
2612 }
2613
2614 {
2640 let mut cg1 = cg1.clone();
2642 let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2643
2644 let mut new_sst_id = 100.into();
2645 let x = group_split::split_sst_info_for_level_v2(
2646 &mut cg1.levels[0],
2647 &mut new_sst_id,
2648 split_key,
2649 );
2650
2651 assert_eq!(2, x.len());
2652 assert_eq!(100, x[0].sst_id);
2653 assert_eq!(100 / 2, x[0].sst_size);
2654 assert_eq!(11, x[1].sst_id);
2655 assert_eq!(100, x[1].sst_size);
2656 assert_eq!(vec![TableId::new(4)], x[1].table_ids);
2657
2658 assert_eq!(2, cg1.levels[0].table_infos.len());
2659 assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
2660 assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
2661 assert_eq!(
2662 vec![TableId::new(3)],
2663 cg1.levels[0].table_infos[1].table_ids
2664 );
2665 }
2666 }
2667
2668 fn make_sst(sst_id: u64, table_ids: Vec<u32>, sst_size: u64) -> SstableInfo {
2669 SstableInfoInner {
2670 sst_id: sst_id.into(),
2671 object_id: sst_id.into(),
2672 table_ids: table_ids.into_iter().map(TableId::new).collect(),
2673 file_size: sst_size,
2674 sst_size,
2675 uncompressed_file_size: sst_size * 2,
2676 ..Default::default()
2677 }
2678 .into()
2679 }
2680
2681 #[test]
2682 fn test_level_normalize() {
2683 let mut level = Level {
2685 level_idx: 1,
2686 level_type: LevelType::Nonoverlapping,
2687 table_infos: vec![
2688 make_sst(1, vec![1, 2], 100),
2689 make_sst(2, vec![], 200), make_sst(3, vec![3], 300),
2691 make_sst(4, vec![], 400), ],
2693 total_file_size: 9999, uncompressed_file_size: 9999,
2695 ..Default::default()
2696 };
2697
2698 level.normalize();
2699
2700 assert_eq!(level.table_infos.len(), 2);
2701 assert_eq!(1, level.table_infos[0].sst_id);
2702 assert_eq!(3, level.table_infos[1].sst_id);
2703 assert_eq!(level.total_file_size, 400);
2704 assert_eq!(level.uncompressed_file_size, 800);
2705
2706 level.total_file_size = 0;
2708 level.uncompressed_file_size = 0;
2709 level.normalize();
2710 assert_eq!(level.table_infos.len(), 2);
2711 assert_eq!(level.total_file_size, 400);
2712 assert_eq!(level.uncompressed_file_size, 800);
2713
2714 level.table_infos = vec![make_sst(10, vec![], 100), make_sst(11, vec![], 200)];
2716 level.normalize();
2717 assert!(level.table_infos.is_empty());
2718 assert_eq!(level.total_file_size, 0);
2719 assert_eq!(level.uncompressed_file_size, 0);
2720 }
2721
2722 #[test]
2723 fn test_level_delete_ssts() {
2724 let mut level = Level {
2725 level_idx: 1,
2726 level_type: LevelType::Nonoverlapping,
2727 table_infos: vec![
2728 make_sst(1, vec![1], 100),
2729 make_sst(2, vec![2], 200),
2730 make_sst(3, vec![3], 300),
2731 ],
2732 total_file_size: 600,
2733 uncompressed_file_size: 1200,
2734 ..Default::default()
2735 };
2736
2737 let delete_ids: HashSet<crate::HummockSstableId> = HashSet::from([2.into()]);
2738 let changed = level.delete_ssts(&delete_ids);
2739
2740 assert!(changed);
2741 assert_eq!(level.table_infos.len(), 2);
2742 assert_eq!(1, level.table_infos[0].sst_id);
2743 assert_eq!(3, level.table_infos[1].sst_id);
2744 assert_eq!(level.total_file_size, 400);
2745 assert_eq!(level.uncompressed_file_size, 800);
2746
2747 let delete_ids: HashSet<crate::HummockSstableId> = HashSet::from([999.into()]);
2749 let changed = level.delete_ssts(&delete_ids);
2750 assert!(!changed);
2751 assert_eq!(level.table_infos.len(), 2);
2752 }
2753
2754 #[test]
2755 fn test_level_truncate_tables() {
2756 let mut level = Level {
2757 level_idx: 1,
2758 level_type: LevelType::Nonoverlapping,
2759 table_infos: vec![
2760 make_sst(1, vec![1, 2], 100),
2761 make_sst(2, vec![2, 3], 200),
2762 make_sst(3, vec![2], 300),
2763 ],
2764 total_file_size: 600,
2765 uncompressed_file_size: 1200,
2766 ..Default::default()
2767 };
2768
2769 let truncate_ids = HashSet::from([TableId::new(2)]);
2770 level.truncate_tables(&truncate_ids);
2771
2772 assert_eq!(level.table_infos.len(), 2);
2774 assert_eq!(level.table_infos[0].table_ids, vec![TableId::new(1)]);
2775 assert_eq!(level.table_infos[1].table_ids, vec![TableId::new(3)]);
2776 assert_eq!(level.total_file_size, 100 + 200);
2777 assert_eq!(level.uncompressed_file_size, 200 + 400);
2778
2779 let truncate_ids = HashSet::from([TableId::new(1), TableId::new(3)]);
2781 level.truncate_tables(&truncate_ids);
2782 assert!(level.table_infos.is_empty());
2783 assert_eq!(level.total_file_size, 0);
2784 assert_eq!(level.uncompressed_file_size, 0);
2785 }
2786
2787 #[test]
2788 fn test_overlapping_level_normalize() {
2789 let mut l0 = OverlappingLevel {
2790 sub_levels: vec![
2791 Level {
2792 level_idx: 0,
2793 table_infos: vec![make_sst(1, vec![1], 100)],
2794 total_file_size: 100,
2795 uncompressed_file_size: 200,
2796 sub_level_id: 1,
2797 ..Default::default()
2798 },
2799 Level {
2800 level_idx: 0,
2801 table_infos: vec![], total_file_size: 0,
2803 uncompressed_file_size: 0,
2804 sub_level_id: 2,
2805 ..Default::default()
2806 },
2807 Level {
2808 level_idx: 0,
2809 table_infos: vec![make_sst(3, vec![3], 300)],
2810 total_file_size: 300,
2811 uncompressed_file_size: 600,
2812 sub_level_id: 3,
2813 ..Default::default()
2814 },
2815 ],
2816 total_file_size: 9999, uncompressed_file_size: 9999,
2818 };
2819
2820 l0.normalize();
2821
2822 assert_eq!(l0.sub_levels.len(), 2);
2823 assert_eq!(l0.sub_levels[0].sub_level_id, 1);
2824 assert_eq!(l0.sub_levels[1].sub_level_id, 3);
2825 assert_eq!(l0.total_file_size, 100 + 300);
2826 assert_eq!(l0.uncompressed_file_size, 200 + 600);
2827
2828 l0.sub_levels = vec![Level {
2830 level_idx: 0,
2831 table_infos: vec![],
2832 ..Default::default()
2833 }];
2834 l0.normalize();
2835 assert!(l0.sub_levels.is_empty());
2836 assert_eq!(l0.total_file_size, 0);
2837 assert_eq!(l0.uncompressed_file_size, 0);
2838 }
2839
2840 #[test]
2841 fn test_levels_truncate_tables() {
2842 #[expect(deprecated)]
2843 let mut levels = Levels {
2844 l0: OverlappingLevel {
2845 sub_levels: vec![
2846 Level {
2847 level_idx: 0,
2848 table_infos: vec![
2849 make_sst(1, vec![10], 100), make_sst(2, vec![20], 200), ],
2852 total_file_size: 300,
2853 uncompressed_file_size: 600,
2854 sub_level_id: 1,
2855 ..Default::default()
2856 },
2857 Level {
2858 level_idx: 0,
2859 table_infos: vec![
2860 make_sst(3, vec![10], 150), ],
2862 total_file_size: 150,
2863 uncompressed_file_size: 300,
2864 sub_level_id: 2,
2865 ..Default::default()
2866 },
2867 ],
2868 total_file_size: 450,
2869 uncompressed_file_size: 900,
2870 },
2871 levels: vec![Level {
2872 level_idx: 1,
2873 level_type: LevelType::Nonoverlapping,
2874 table_infos: vec![
2875 make_sst(4, vec![10, 20], 400), make_sst(5, vec![10], 500), ],
2878 total_file_size: 900,
2879 uncompressed_file_size: 1800,
2880 ..Default::default()
2881 }],
2882 group_id: 1.into(),
2883 parent_group_id: 0.into(),
2884 member_table_ids: vec![],
2885 compaction_group_version_id: 0,
2886 };
2887
2888 levels.truncate_tables(&HashSet::from([TableId::new(10)]));
2890
2891 assert_eq!(levels.l0.sub_levels.len(), 1);
2892 assert_eq!(levels.l0.sub_levels[0].sub_level_id, 1);
2893 assert_eq!(levels.l0.sub_levels[0].table_infos.len(), 1);
2894 assert_eq!(2, levels.l0.sub_levels[0].table_infos[0].sst_id);
2895 assert_eq!(levels.l0.sub_levels[0].total_file_size, 200);
2896 assert_eq!(levels.l0.sub_levels[0].uncompressed_file_size, 400);
2897
2898 assert_eq!(levels.l0.total_file_size, 200);
2899 assert_eq!(levels.l0.uncompressed_file_size, 400);
2900
2901 assert_eq!(levels.levels[0].table_infos.len(), 1);
2902 assert_eq!(4, levels.levels[0].table_infos[0].sst_id);
2903 assert_eq!(
2904 levels.levels[0].table_infos[0].table_ids,
2905 vec![TableId::new(20)]
2906 );
2907 assert_eq!(levels.levels[0].total_file_size, 400);
2908 assert_eq!(levels.levels[0].uncompressed_file_size, 800);
2909
2910 assert_eq!(levels.compaction_group_version_id, 1);
2911 }
2912
2913 #[test]
2914 fn test_apply_version_delta_truncate_tables() {
2915 let mut version = HummockVersion {
2916 id: HummockVersionId::new(0),
2917 levels: HashMap::from_iter([(1.into(), {
2918 #[expect(deprecated)]
2919 let levels = Levels {
2920 l0: OverlappingLevel {
2921 sub_levels: vec![
2922 Level {
2923 level_idx: 0,
2924 level_type: LevelType::Overlapping,
2925 table_infos: vec![
2926 make_sst(1, vec![100], 50), make_sst(2, vec![200], 60), ],
2929 total_file_size: 110,
2930 uncompressed_file_size: 220,
2931 sub_level_id: 1,
2932 ..Default::default()
2933 },
2934 Level {
2935 level_idx: 0,
2936 level_type: LevelType::Overlapping,
2937 table_infos: vec![
2938 make_sst(3, vec![100], 70), ],
2940 total_file_size: 70,
2941 uncompressed_file_size: 140,
2942 sub_level_id: 2,
2943 ..Default::default()
2944 },
2945 ],
2946 total_file_size: 180,
2947 uncompressed_file_size: 360,
2948 },
2949 levels: vec![Level {
2950 level_idx: 1,
2951 level_type: LevelType::Nonoverlapping,
2952 table_infos: vec![
2953 make_sst(4, vec![100, 200], 80), make_sst(5, vec![100], 90), ],
2956 total_file_size: 170,
2957 uncompressed_file_size: 340,
2958 ..Default::default()
2959 }],
2960 group_id: 1.into(),
2961 parent_group_id: 0.into(),
2962 member_table_ids: vec![],
2963 compaction_group_version_id: 0,
2964 };
2965 levels
2966 })]),
2967 ..Default::default()
2968 };
2969
2970 let version_delta = HummockVersionDelta {
2971 id: HummockVersionId::new(1),
2972 group_deltas: HashMap::from_iter([(
2973 1.into(),
2974 GroupDeltas {
2975 group_deltas: vec![GroupDelta::TruncateTables(HashSet::from([TableId::new(
2976 100,
2977 )]))],
2978 },
2979 )]),
2980 ..Default::default()
2981 };
2982
2983 version.apply_version_delta(&version_delta);
2984
2985 let cg = version.get_compaction_group_levels(1.into());
2986
2987 assert_eq!(
2988 cg.l0.sub_levels.len(),
2989 1,
2990 "empty sub-level should be removed"
2991 );
2992 assert_eq!(cg.l0.sub_levels[0].sub_level_id, 1);
2993 assert_eq!(cg.l0.sub_levels[0].table_infos.len(), 1);
2994 assert_eq!(2, cg.l0.sub_levels[0].table_infos[0].sst_id);
2995 for sub_level in &cg.l0.sub_levels {
2996 for sst in &sub_level.table_infos {
2997 assert!(
2998 !sst.table_ids.is_empty(),
2999 "SST {} should not have empty table_ids",
3000 sst.sst_id
3001 );
3002 }
3003 }
3004
3005 assert_eq!(cg.l0.total_file_size, 60);
3006 assert_eq!(cg.l0.uncompressed_file_size, 120);
3007
3008 assert_eq!(cg.levels[0].table_infos.len(), 1);
3009 assert_eq!(4, cg.levels[0].table_infos[0].sst_id);
3010 assert_eq!(
3011 cg.levels[0].table_infos[0].table_ids,
3012 vec![TableId::new(200)]
3013 );
3014 assert_eq!(cg.levels[0].total_file_size, 80);
3015 assert_eq!(cg.levels[0].uncompressed_file_size, 160);
3016
3017 assert_eq!(cg.compaction_group_version_id, 1);
3018 }
3019
3020 #[test]
3021 fn test_apply_version_delta_compact_l0() {
3022 let mut version = HummockVersion {
3023 id: HummockVersionId::new(0),
3024 levels: HashMap::from_iter([(1.into(), {
3025 #[expect(deprecated)]
3026 let levels = Levels {
3027 l0: OverlappingLevel {
3028 sub_levels: vec![
3029 Level {
3030 level_idx: 0,
3031 level_type: LevelType::Nonoverlapping,
3032 table_infos: vec![
3033 make_sst(1, vec![1], 100),
3034 make_sst(2, vec![2], 200),
3035 ],
3036 total_file_size: 300,
3037 uncompressed_file_size: 600,
3038 sub_level_id: 1,
3039 ..Default::default()
3040 },
3041 Level {
3042 level_idx: 0,
3043 level_type: LevelType::Nonoverlapping,
3044 table_infos: vec![make_sst(3, vec![3], 300)],
3045 total_file_size: 300,
3046 uncompressed_file_size: 600,
3047 sub_level_id: 2,
3048 ..Default::default()
3049 },
3050 ],
3051 total_file_size: 600,
3052 uncompressed_file_size: 1200,
3053 },
3054 levels: vec![Level {
3055 level_idx: 1,
3056 level_type: LevelType::Nonoverlapping,
3057 table_infos: vec![],
3058 total_file_size: 0,
3059 uncompressed_file_size: 0,
3060 ..Default::default()
3061 }],
3062 group_id: 1.into(),
3063 parent_group_id: 0.into(),
3064 member_table_ids: vec![],
3065 compaction_group_version_id: 0,
3066 };
3067 levels
3068 })]),
3069 ..Default::default()
3070 };
3071
3072 let version_delta = HummockVersionDelta {
3073 id: HummockVersionId::new(1),
3074 group_deltas: HashMap::from_iter([(
3075 1.into(),
3076 GroupDeltas {
3077 group_deltas: vec![
3078 GroupDelta::IntraLevel(IntraLevelDelta::new(
3079 0, 0,
3081 HashSet::from([1.into(), 2.into(), 3.into()]),
3082 vec![],
3083 0,
3084 0,
3085 )),
3086 GroupDelta::IntraLevel(IntraLevelDelta::new(
3087 1, 0,
3089 HashSet::new(),
3090 vec![make_sst(10, vec![1, 2, 3], 500)],
3091 0,
3092 0,
3093 )),
3094 ],
3095 },
3096 )]),
3097 ..Default::default()
3098 };
3099
3100 version.apply_version_delta(&version_delta);
3101
3102 let cg = version.get_compaction_group_levels(1.into());
3103
3104 assert!(cg.l0.sub_levels.is_empty());
3105 assert_eq!(cg.l0.total_file_size, 0);
3106 assert_eq!(cg.l0.uncompressed_file_size, 0);
3107
3108 assert_eq!(cg.levels[0].table_infos.len(), 1);
3109 assert_eq!(10, cg.levels[0].table_infos[0].sst_id);
3110 assert_eq!(cg.levels[0].total_file_size, 500);
3111 }
3112}