1use std::borrow::Borrow;
16use std::cmp::Ordering;
17use std::collections::hash_map::Entry;
18use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
19use std::iter::once;
20use std::sync::{Arc, LazyLock};
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::catalog::TableId;
25use risingwave_common::hash::VnodeBitmapExt;
26use risingwave_common::log::LogSuppressor;
27use risingwave_pb::hummock::{
28 CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta,
29};
30use tracing::warn;
31
32use super::group_split::split_sst_with_table_ids;
33use super::{StateTableId, group_split};
34use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
35use crate::compact_task::is_compaction_task_expired;
36use crate::compaction_group::StaticCompactionGroupId;
37use crate::key_range::KeyRangeCommon;
38use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
39use crate::sstable_info::SstableInfo;
40use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
41use crate::vector_index::apply_vector_index_delta;
42use crate::version::{
43 GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
44 IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader,
45};
46use crate::{
47 CompactionGroupId, HummockObjectId, HummockSstableId, HummockSstableObjectId, can_concat,
48};
49
/// Per-compaction-group summary of the SST changes carried by a version delta, as produced by
/// `build_sst_delta_infos`.
#[derive(Debug, Clone, Default)]
pub struct SstDeltaInfo {
    /// Level index that the SSTs in `insert_sst_infos` go to (0 is L0). If several deltas of
    /// the same group insert SSTs, this holds the level of the last one processed.
    pub insert_sst_level: u32,
    /// SSTs inserted by the group's deltas.
    pub insert_sst_infos: Vec<SstableInfo>,
    /// Object ids of the SSTs removed by the group's deltas that were found in the version.
    pub delete_sst_object_ids: Vec<HummockSstableObjectId>,
}

/// For one SST object whose SSTs have an `sst_id` different from the `object_id` (see
/// `build_branched_sst_info`): the ids of those SSTs, grouped by compaction group.
pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;
58
59impl<L> HummockVersionCommon<SstableInfo, L> {
60 pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
61 self.levels
62 .get(&compaction_group_id)
63 .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
64 }
65
66 pub fn get_compaction_group_levels_mut(
67 &mut self,
68 compaction_group_id: CompactionGroupId,
69 ) -> &mut Levels {
70 self.levels
71 .get_mut(&compaction_group_id)
72 .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
73 }
74
75 pub fn get_sst_ids_by_group_id(
77 &self,
78 compaction_group_id: CompactionGroupId,
79 ) -> impl Iterator<Item = HummockSstableId> + '_ {
80 self.levels
81 .iter()
82 .filter_map(move |(cg_id, level)| {
83 if *cg_id == compaction_group_id {
84 Some(level)
85 } else {
86 None
87 }
88 })
89 .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
90 .flat_map(|level| level.table_infos.iter())
91 .map(|s| s.sst_id)
92 }
93
94 pub fn prune_stale_table_ids_from_ssts(&mut self) -> usize {
99 let live_table_ids: HashSet<_> = self.state_table_info.info().keys().copied().collect();
100 if live_table_ids.is_empty()
103 && self.levels.values().any(|levels| {
104 #[expect(deprecated)]
105 {
106 !levels.member_table_ids.is_empty()
107 }
108 })
109 {
110 return 0;
111 }
112
113 let mut pruned_table_ids = HashSet::new();
114
115 for levels in self.levels.values_mut() {
116 let stale_table_ids = levels
117 .l0
118 .sub_levels
119 .iter()
120 .chain(levels.levels.iter())
121 .flat_map(|level| level.table_infos.iter())
122 .flat_map(|sst| sst.table_ids.iter().copied())
123 .filter(|table_id| !live_table_ids.contains(table_id))
124 .collect::<HashSet<_>>();
125
126 if stale_table_ids.is_empty() {
127 continue;
128 }
129
130 pruned_table_ids.extend(stale_table_ids.iter().copied());
131 levels.prune_table_ids_from_ssts(&stale_table_ids);
132 }
133
134 pruned_table_ids.len()
135 }
136
137 pub fn level_iter<F: FnMut(&Level) -> bool>(
138 &self,
139 compaction_group_id: CompactionGroupId,
140 mut f: F,
141 ) {
142 if let Some(levels) = self.levels.get(&compaction_group_id) {
143 for sub_level in &levels.l0.sub_levels {
144 if !f(sub_level) {
145 return;
146 }
147 }
148 for level in &levels.levels {
149 if !f(level) {
150 return;
151 }
152 }
153 }
154 }
155
156 pub fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize {
157 self.levels
159 .get(&compaction_group_id)
160 .map(|group| group.levels.len() + 1)
161 .unwrap_or(0)
162 }
163
164 pub fn safe_epoch_table_watermarks(
165 &self,
166 existing_table_ids: &[TableId],
167 ) -> BTreeMap<TableId, TableWatermarks> {
168 safe_epoch_table_watermarks_impl(&self.table_watermarks, existing_table_ids)
169 }
170}
171
172pub fn safe_epoch_table_watermarks_impl(
173 table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
174 existing_table_ids: &[TableId],
175) -> BTreeMap<TableId, TableWatermarks> {
176 fn extract_single_table_watermark(
177 table_watermarks: &TableWatermarks,
178 ) -> Option<TableWatermarks> {
179 if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
180 Some(TableWatermarks {
181 watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
182 direction: table_watermarks.direction,
183 watermark_type: table_watermarks.watermark_type,
184 })
185 } else {
186 None
187 }
188 }
189 table_watermarks
190 .iter()
191 .filter_map(|(table_id, table_watermarks)| {
192 if !existing_table_ids.contains(table_id) {
193 None
194 } else {
195 extract_single_table_watermark(table_watermarks)
196 .map(|table_watermarks| (*table_id, table_watermarks))
197 }
198 })
199 .collect()
200}
201
202pub fn safe_epoch_read_table_watermarks_impl(
203 safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
204) -> BTreeMap<TableId, ReadTableWatermark> {
205 safe_epoch_watermarks
206 .into_iter()
207 .map(|(table_id, watermarks)| {
208 assert_eq!(watermarks.watermarks.len(), 1);
209 let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
210 let mut vnode_watermark_map = BTreeMap::new();
211 for vnode_watermark in vnode_watermarks.iter() {
212 let watermark = Bytes::copy_from_slice(vnode_watermark.watermark());
213 for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
214 assert!(
215 vnode_watermark_map
216 .insert(vnode, watermark.clone())
217 .is_none(),
218 "duplicate table watermark on vnode {}",
219 vnode.to_index()
220 );
221 }
222 }
223 (
224 table_id,
225 ReadTableWatermark {
226 direction: watermarks.direction,
227 vnode_watermarks: vnode_watermark_map,
228 },
229 )
230 })
231 .collect()
232}
233
234impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
235 pub fn count_new_ssts_in_group_split(
236 &self,
237 parent_group_id: CompactionGroupId,
238 split_key: Bytes,
239 ) -> u64 {
240 self.levels
241 .get(&parent_group_id)
242 .map_or(0, |parent_levels| {
243 let l0 = &parent_levels.l0;
244 let mut split_count = 0;
245 for sub_level in &l0.sub_levels {
246 assert!(!sub_level.table_infos.is_empty());
247
248 if sub_level.level_type == PbLevelType::Overlapping {
249 split_count += sub_level
251 .table_infos
252 .iter()
253 .map(|sst| {
254 if let group_split::SstSplitType::Both =
255 group_split::need_to_split(sst, split_key.clone())
256 {
257 2
258 } else {
259 0
260 }
261 })
262 .sum::<u64>();
263 continue;
264 }
265
266 let pos = group_split::get_split_pos(&sub_level.table_infos, split_key.clone());
267 let sst = sub_level.table_infos.get(pos).unwrap();
268
269 if let group_split::SstSplitType::Both =
270 group_split::need_to_split(sst, split_key.clone())
271 {
272 split_count += 2;
273 }
274 }
275
276 for level in &parent_levels.levels {
277 if level.table_infos.is_empty() {
278 continue;
279 }
280 let pos = group_split::get_split_pos(&level.table_infos, split_key.clone());
281 let sst = level.table_infos.get(pos).unwrap();
282 if let group_split::SstSplitType::Both =
283 group_split::need_to_split(sst, split_key.clone())
284 {
285 split_count += 2;
286 }
287 }
288
289 split_count
290 })
291 }
292
/// Populates the newly constructed group `group_id` from `parent_group_id` by splitting out of
/// the parent's SSTs the data of `member_table_ids`. This is the legacy path, used by
/// `apply_version_delta` for group-construct deltas older than
/// `CompatibilityVersion::SplitGroupByTableId`.
///
/// Every parent SST that contains one of `member_table_ids` is split by
/// `split_sst_info_for_level`. The modified SST replaces the original in the parent, and the
/// branched SST is placed in the child's matching L0 sub-level or level. Branched SSTs take
/// ids starting at `new_sst_start_id`. Both groups get their `compaction_group_version_id`
/// bumped.
///
/// If `parent_group_id` is `StaticCompactionGroupId::NewCompactionGroup`, nothing is split.
/// A non-zero `new_sst_start_id` then panics in debug builds and only logs a warning in
/// release builds.
///
/// # Panics
/// Panics if the parent group (other than `NewCompactionGroup`) or `group_id` is missing, or
/// if the two ids are equal (`get_disjoint_mut`). Also panics if a child non-L0 level cannot
/// be concatenated after insertion, or if either group is left with an empty L0 sub-level.
pub fn init_with_parent_group(
    &mut self,
    parent_group_id: CompactionGroupId,
    group_id: CompactionGroupId,
    member_table_ids: BTreeSet<StateTableId>,
    new_sst_start_id: HummockSstableId,
) {
    // Id source for branched SSTs. It is handed mutably to the split helper (presumably
    // advanced for each new SST — TODO confirm).
    let mut new_sst_id = new_sst_start_id;
    if parent_group_id == StaticCompactionGroupId::NewCompactionGroup {
        // A group created from scratch inherits no SSTs, so no SST ids should be reserved.
        if new_sst_start_id != 0 {
            if cfg!(debug_assertions) {
                panic!(
                    "non-zero sst start id {} for NewCompactionGroup",
                    new_sst_start_id
                );
            } else {
                warn!(
                    %new_sst_start_id,
                    "non-zero sst start id for NewCompactionGroup"
                );
            }
        }
        return;
    } else if !self.levels.contains_key(&parent_group_id) {
        unreachable!(
            "non-existing parent group id {} to init from",
            parent_group_id
        );
    }
    // Borrow parent and child mutably at the same time. Panics if either group is missing.
    let [parent_levels, cur_levels] = self
        .levels
        .get_disjoint_mut([&parent_group_id, &group_id])
        .map(|res| res.unwrap());
    // Both groups change shape. `apply_compact_ssts` discards compaction deltas whose version
    // id `is_compaction_task_expired` considers expired.
    parent_levels.compaction_group_version_id += 1;
    cur_levels.compaction_group_version_id += 1;
    let l0 = &mut parent_levels.l0;
    {
        for sub_level in &mut l0.sub_levels {
            let target_l0 = &mut cur_levels.l0;
            // Split this parent sub-level. The returned branched SSTs belong to the child.
            let insert_table_infos =
                split_sst_info_for_level(&member_table_ids, sub_level, &mut new_sst_id);
            sub_level.normalize();
            if insert_table_infos.is_empty() {
                continue;
            }
            // `Ok(idx)`: append to the existing child sub-level at `idx`.
            // `Err(idx)`: create a child sub-level at `idx`, with the same id and type as the
            // parent sub-level.
            match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                Ok(idx) => {
                    add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                }
                Err(idx) => {
                    insert_new_sub_level(
                        target_l0,
                        sub_level.sub_level_id,
                        sub_level.level_type,
                        insert_table_infos,
                        Some(idx),
                    );
                }
            }
        }
        l0.normalize();
    }
    // Non-L0 levels: the child's level at the same index receives the branched SSTs. Its size
    // counters are updated, and it is re-sorted by key range and must remain concatenable.
    for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
        let insert_table_infos =
            split_sst_info_for_level(&member_table_ids, level, &mut new_sst_id);
        cur_levels.levels[idx].total_file_size += insert_table_infos
            .iter()
            .map(|sst| sst.sst_size)
            .sum::<u64>();
        cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
            .iter()
            .map(|sst| sst.uncompressed_file_size)
            .sum::<u64>();
        cur_levels.levels[idx]
            .table_infos
            .extend(insert_table_infos);
        cur_levels.levels[idx]
            .table_infos
            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
        assert!(can_concat(&cur_levels.levels[idx].table_infos));
        level.normalize();
    }

    // Invariant: neither group may keep an empty L0 sub-level.
    assert!(
        parent_levels
            .l0
            .sub_levels
            .iter()
            .all(|level| !level.table_infos.is_empty())
    );
    assert!(
        cur_levels
            .l0
            .sub_levels
            .iter()
            .all(|level| !level.table_infos.is_empty())
    );
}
395
/// Computes, for each compaction group touched by `version_delta`, the SSTs it inserts and the
/// object ids of the SSTs it removes.
///
/// Removed SST ids are resolved against the levels currently in `self`. This is presumably the
/// version before `version_delta` is applied — TODO confirm with callers.
///
/// Returns an empty list for trivial-move deltas. A group is skipped entirely if any of its
/// deltas is not `IntraLevel` or `NewL0SubLevel`.
///
/// # Panics
/// Panics if a reported group is missing from `self.levels`. In debug builds, also panics if a
/// removed SST id is not found in the level it was removed from. Release builds only log an
/// error in that case.
pub fn build_sst_delta_infos(
    &self,
    version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
) -> Vec<SstDeltaInfo> {
    let mut infos = vec![];

    // Trivial-move deltas yield no SST delta info.
    if version_delta.trivial_move {
        return infos;
    }

    for (group_id, group_deltas) in &version_delta.group_deltas {
        let mut info = SstDeltaInfo::default();

        // SST ids removed from L0 (any sub-level) and from each non-L0 level, keyed by level
        // index. Entries are consumed as they are matched against the current version.
        let mut removed_l0_ssts: BTreeSet<HummockSstableId> = BTreeSet::new();
        let mut removed_ssts: BTreeMap<u32, BTreeSet<HummockSstableId>> = BTreeMap::new();

        // Only groups whose deltas are all plain SST insertions/removals are reported.
        if !group_deltas.group_deltas.iter().all(|delta| {
            matches!(
                delta,
                GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_)
            )
        }) {
            continue;
        }

        for group_delta in &group_deltas.group_deltas {
            match group_delta {
                GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
                    if !inserted_table_infos.is_empty() {
                        info.insert_sst_level = 0;
                        info.insert_sst_infos
                            .extend(inserted_table_infos.iter().cloned());
                    }
                }
                GroupDeltaCommon::IntraLevel(intra_level) => {
                    // NOTE(review): SSTs inserted at different levels are merged into one list
                    // and only the last level index is kept — confirm callers expect this.
                    if !intra_level.inserted_table_infos.is_empty() {
                        info.insert_sst_level = intra_level.level_idx;
                        info.insert_sst_infos
                            .extend(intra_level.inserted_table_infos.iter().cloned());
                    }
                    // `removed_table_ids` holds SST ids here (they are collected as
                    // `HummockSstableId`).
                    if !intra_level.removed_table_ids.is_empty() {
                        for id in &intra_level.removed_table_ids {
                            if intra_level.level_idx == 0 {
                                removed_l0_ssts.insert(*id);
                            } else {
                                removed_ssts
                                    .entry(intra_level.level_idx)
                                    .or_default()
                                    .insert(*id);
                            }
                        }
                    }
                }
                GroupDeltaCommon::GroupConstruct(_)
                | GroupDeltaCommon::GroupDestroy(_)
                | GroupDeltaCommon::GroupMerge(_)
                | GroupDeltaCommon::PruneTableIdsFromSsts(_) => {}
            }
        }

        // Resolve removed SST ids to object ids using the SSTs currently in the group.
        let group = self.levels.get(group_id).unwrap();
        for l0_sub_level in &group.level0().sub_levels {
            for sst_info in &l0_sub_level.table_infos {
                if removed_l0_ssts.remove(&sst_info.sst_id) {
                    info.delete_sst_object_ids.push(sst_info.object_id);
                }
            }
        }
        for level in &group.levels {
            if let Some(mut removed_level_ssts) = removed_ssts.remove(&level.level_idx) {
                for sst_info in &level.table_infos {
                    if removed_level_ssts.remove(&sst_info.sst_id) {
                        info.delete_sst_object_ids.push(sst_info.object_id);
                    }
                }
                if !removed_level_ssts.is_empty() {
                    tracing::error!(
                        "removed_level_ssts is not empty: {:?}",
                        removed_level_ssts,
                    );
                }
                debug_assert!(removed_level_ssts.is_empty());
            }
        }

        // Anything left refers to an SST, or a level, that the current version does not have.
        if !removed_l0_ssts.is_empty() || !removed_ssts.is_empty() {
            tracing::error!(
                "not empty removed_l0_ssts: {:?}, removed_ssts: {:?}",
                removed_l0_ssts,
                removed_ssts
            );
        }
        debug_assert!(removed_l0_ssts.is_empty());
        debug_assert!(removed_ssts.is_empty());

        infos.push(info);
    }

    infos
}
502
503 pub fn apply_table_change_log_delta_backward_compatibility(
506 &mut self,
507 version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
508 ) {
509 #[expect(deprecated)]
510 for (table_id, change_log_delta) in &version_delta.change_log_delta {
511 let new_change_log = &change_log_delta.new_log;
512 match self.table_change_log.entry(*table_id) {
513 Entry::Occupied(entry) => {
514 let change_log = entry.into_mut();
515 change_log.add_change_log(new_change_log.clone());
516 }
517 Entry::Vacant(entry) => {
518 entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
519 }
520 };
521 }
522 }
523
/// Applies `version_delta` to this version in place and returns the table-info changes
/// reported by `state_table_info.apply_delta`. Elsewhere in this file, those values are read
/// as the tables' previous info.
///
/// The steps are, in order:
/// 1. apply the state-table-info delta, which decides whether this delta commits an epoch;
/// 2. apply each compaction group's deltas (group construct/merge/destroy, commit-epoch L0
///    inserts, compaction results, table-id pruning);
/// 3. advance `id` and the deprecated `max_committed_epoch`;
/// 4. merge new table watermarks and clear watermark epochs below each table's committed epoch;
/// 5. apply the vector index delta.
///
/// # Panics
/// Panics if `version_delta.prev_id` is not this version's id, or if a delta targets a missing
/// compaction group. Also panics if a commit-epoch delta removes SSTs or inserts outside L0.
pub fn apply_version_delta(
    &mut self,
    version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
) -> HashMap<TableId, Option<StateTableInfo>> {
    // Deltas must be applied in order on top of the exact version they were built for.
    assert_eq!(self.id, version_delta.prev_id);

    let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
        &version_delta.state_table_info_delta,
        &version_delta.removed_table_ids,
    );

    // Backward compatibility: a bump of the deprecated `max_committed_epoch` also counts as a
    // commit, even when no table's committed epoch changed.
    #[expect(deprecated)]
    {
        if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch {
            is_commit_epoch = true;
            tracing::trace!(
                "max committed epoch bumped but no table committed epoch is changed"
            );
        }
    }

    for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
        // Set when a compaction delta touched L0, so L0 is normalized once at the end.
        let mut is_applied_l0_compact = false;
        for group_delta in &group_deltas.group_deltas {
            match group_delta {
                GroupDeltaCommon::GroupConstruct(group_construct) => {
                    let mut new_levels = build_initial_compaction_group_levels(
                        *compaction_group_id,
                        group_construct.get_group_config().unwrap(),
                    );
                    let parent_group_id = group_construct.parent_group_id;
                    new_levels.parent_group_id = parent_group_id;
                    #[expect(deprecated)]
                    new_levels
                        .member_table_ids
                        .clone_from(&group_construct.table_ids);
                    self.levels.insert(*compaction_group_id, new_levels);
                    // Newer deltas take membership from `state_table_info`; older ones carry
                    // it in the deprecated `table_ids`. Only the legacy split below uses it.
                    let member_table_ids = if group_construct.version()
                        >= CompatibilityVersion::NoMemberTableIds
                    {
                        self.state_table_info
                            .compaction_group_member_table_ids(*compaction_group_id)
                            .iter()
                            .copied()
                            .collect()
                    } else {
                        #[expect(deprecated)]
                        BTreeSet::from_iter(
                            group_construct.table_ids.iter().copied().map(Into::into),
                        )
                    };

                    // Newer deltas split the parent by key; older ones split by table id.
                    if group_construct.version() >= CompatibilityVersion::SplitGroupByTableId {
                        let split_key = if group_construct.split_key.is_some() {
                            Some(Bytes::from(group_construct.split_key.clone().unwrap()))
                        } else {
                            None
                        };
                        self.init_with_parent_group_v2(
                            parent_group_id,
                            *compaction_group_id,
                            group_construct.new_sst_start_id,
                            split_key.clone(),
                        );
                    } else {
                        self.init_with_parent_group(
                            parent_group_id,
                            *compaction_group_id,
                            member_table_ids,
                            group_construct.new_sst_start_id,
                        );
                    }
                }
                GroupDeltaCommon::GroupMerge(group_merge) => {
                    tracing::info!(
                        "group_merge left {:?} right {:?}",
                        group_merge.left_group_id,
                        group_merge.right_group_id
                    );
                    self.merge_compaction_group(
                        group_merge.left_group_id,
                        group_merge.right_group_id,
                    )
                }
                GroupDeltaCommon::IntraLevel(level_delta) => {
                    let levels =
                        self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
                            panic!("compaction group {} does not exist", compaction_group_id)
                        });
                    if is_commit_epoch {
                        // Committing an epoch only adds a new L0 sub-level with the given id.
                        assert!(
                            level_delta.removed_table_ids.is_empty(),
                            "no sst should be deleted when committing an epoch"
                        );

                        let IntraLevelDelta {
                            level_idx,
                            l0_sub_level_id,
                            inserted_table_infos,
                            ..
                        } = level_delta;
                        {
                            assert_eq!(
                                *level_idx, 0,
                                "we should only add to L0 when we commit an epoch."
                            );
                            if !inserted_table_infos.is_empty() {
                                insert_new_sub_level(
                                    &mut levels.l0,
                                    *l0_sub_level_id,
                                    PbLevelType::Overlapping,
                                    inserted_table_infos.clone(),
                                    None,
                                );
                            }
                        }
                    } else {
                        // Otherwise the delta is a compaction result.
                        levels.apply_compact_ssts(
                            level_delta,
                            self.state_table_info
                                .compaction_group_member_table_ids(*compaction_group_id),
                        );
                        if level_delta.level_idx == 0 {
                            is_applied_l0_compact = true;
                        }
                    }
                }
                GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
                    let levels =
                        self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
                            panic!("compaction group {} does not exist", compaction_group_id)
                        });
                    assert!(is_commit_epoch);

                    if !inserted_table_infos.is_empty() {
                        // The new sub-level id is one past the last existing sub-level's id,
                        // or 1 if L0 is empty.
                        let next_l0_sub_level_id = levels
                            .l0
                            .sub_levels
                            .last()
                            .map(|level| level.sub_level_id + 1)
                            .unwrap_or(1);

                        insert_new_sub_level(
                            &mut levels.l0,
                            next_l0_sub_level_id,
                            PbLevelType::Overlapping,
                            inserted_table_infos.clone(),
                            None,
                        );
                    }
                }
                GroupDeltaCommon::GroupDestroy(_) => {
                    self.levels.remove(compaction_group_id);
                }

                GroupDeltaCommon::PruneTableIdsFromSsts(table_ids) => {
                    self.levels
                        .get_mut(compaction_group_id)
                        .unwrap_or_else(|| {
                            panic!("compaction group {} does not exist", compaction_group_id)
                        })
                        .prune_table_ids_from_ssts(table_ids);
                }
            }
        }

        // The group may have been destroyed by one of its deltas, hence the `if let`.
        if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id)
        {
            levels.l0.normalize();
        }
    }
    self.id = version_delta.id;
    #[expect(deprecated)]
    {
        self.max_committed_epoch = version_delta.max_committed_epoch;
    }

    // Pending watermark updates per table: `Some` replaces the table's watermarks and `None`
    // removes them.
    let mut modified_table_watermarks: HashMap<TableId, Option<TableWatermarks>> =
        HashMap::new();

    // NOTE(review): a table that is in `removed_table_ids`, has new watermarks, but has no
    // current watermarks gets the new watermarks inserted rather than dropped. Removed tables
    // without new watermarks keep their entries in this function. Confirm this is intended or
    // handled elsewhere.
    for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
        if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
            if version_delta.removed_table_ids.contains(table_id) {
                modified_table_watermarks.insert(*table_id, None);
            } else {
                let mut current_table_watermarks = (**current_table_watermarks).clone();
                current_table_watermarks.apply_new_table_watermarks(table_watermarks);
                modified_table_watermarks.insert(*table_id, Some(current_table_watermarks));
            }
        } else {
            modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone()));
        }
    }
    // Clear watermark epochs below the table's committed epoch. Only tables still present in
    // `state_table_info` whose oldest watermark epoch is below their committed epoch are
    // affected. If the table already has a pending update, it is cleared in place.
    for (table_id, table_watermarks) in &self.table_watermarks {
        let safe_epoch = if let Some(state_table_info) =
            self.state_table_info.info().get(table_id)
            && let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
            && state_table_info.committed_epoch > *oldest_epoch
        {
            state_table_info.committed_epoch
        } else {
            continue;
        };
        let table_watermarks = modified_table_watermarks
            .entry(*table_id)
            .or_insert_with(|| Some((**table_watermarks).clone()));
        if let Some(table_watermarks) = table_watermarks {
            table_watermarks.clear_stale_epoch_watermark(safe_epoch);
        }
    }
    for (table_id, table_watermarks) in modified_table_watermarks {
        if let Some(table_watermarks) = table_watermarks {
            self.table_watermarks
                .insert(table_id, Arc::new(table_watermarks));
        } else {
            self.table_watermarks.remove(&table_id);
        }
    }
    apply_vector_index_delta(
        &mut self.vector_indexes,
        &version_delta.vector_index_delta,
        &version_delta.removed_table_ids,
    );

    changed_table_info
}
763
764 pub fn apply_change_log_delta<T: Clone>(
765 table_change_log: &mut HashMap<TableId, TableChangeLogCommon<T>>,
766 change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
767 ) {
768 for (table_id, change_log_delta) in change_log_delta {
769 let new_change_log = &change_log_delta.new_log;
770 match table_change_log.entry(*table_id) {
771 Entry::Occupied(entry) => {
772 let change_log = entry.into_mut();
773 change_log.add_change_log(new_change_log.clone());
774 }
775 Entry::Vacant(entry) => {
776 entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
777 }
778 };
779 }
780
781 for (table_id, change_log_delta) in change_log_delta {
783 if let Some(change_log) = table_change_log.get_mut(table_id) {
784 change_log.truncate(change_log_delta.truncate_epoch);
785 }
786 }
787 }
788
789 pub fn collect_gc_change_log_delta<'a, T: Clone>(
791 current_change_log_table_ids: impl Iterator<Item = &'a TableId>,
792 change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
793 removed_table_ids: &HashSet<TableId>,
794 state_table_info_delta: &HashMap<TableId, StateTableInfoDelta>,
795 changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
796 ) -> HashSet<TableId> {
797 let mut gc_change_log_delta = HashSet::new();
798 for table_id in current_change_log_table_ids {
802 if removed_table_ids.contains(table_id) {
803 gc_change_log_delta.insert(*table_id);
804 continue;
805 }
806 if let Some(table_info_delta) = state_table_info_delta.get(table_id)
807 && let Some(Some(prev_table_info)) = changed_table_info.get(table_id)
808 && table_info_delta.committed_epoch > prev_table_info.committed_epoch
809 {
810 } else {
812 continue;
814 }
815 let contains = change_log_delta.contains_key(table_id);
816 if !contains {
817 gc_change_log_delta.insert(*table_id);
818 static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
819 LazyLock::new(|| LogSuppressor::per_second(1));
820 if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
821 warn!(
822 suppressed_count,
823 %table_id,
824 "table change log dropped due to no further change log at newly committed epoch"
825 );
826 }
827 }
828 }
829 gc_change_log_delta
830 }
831
832 pub fn build_branched_sst_info(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
833 let mut ret: BTreeMap<_, _> = BTreeMap::new();
834 for (compaction_group_id, group) in &self.levels {
835 let mut levels = vec![];
836 levels.extend(group.l0.sub_levels.iter());
837 levels.extend(group.levels.iter());
838 for level in levels {
839 for table_info in &level.table_infos {
840 if table_info.sst_id.as_raw_id() == table_info.object_id.as_raw_id() {
841 continue;
842 }
843 let object_id = table_info.object_id;
844 let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default();
845 entry
846 .entry(*compaction_group_id)
847 .or_default()
848 .push(table_info.sst_id)
849 }
850 }
851 }
852 ret
853 }
854
855 pub fn merge_compaction_group(
856 &mut self,
857 left_group_id: CompactionGroupId,
858 right_group_id: CompactionGroupId,
859 ) {
860 let left_group_id_table_ids = self
862 .state_table_info
863 .compaction_group_member_table_ids(left_group_id)
864 .iter();
865 let right_group_id_table_ids = self
866 .state_table_info
867 .compaction_group_member_table_ids(right_group_id)
868 .iter();
869
870 assert!(
871 left_group_id_table_ids
872 .chain(right_group_id_table_ids)
873 .is_sorted()
874 );
875
876 let total_cg = self.levels.keys().cloned().collect::<Vec<_>>();
877 let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| {
878 panic!(
879 "compaction group should exist right {} all {:?}",
880 right_group_id, total_cg
881 )
882 });
883
884 let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| {
885 panic!(
886 "compaction group should exist left {} all {:?}",
887 left_group_id, total_cg
888 )
889 });
890
891 group_split::merge_levels(left_levels, right_levels);
892 }
893
/// Populates the newly constructed group `group_id` by splitting the parent's SSTs at
/// `split_key`. `apply_version_delta` uses this for group-construct deltas at or after
/// `CompatibilityVersion::SplitGroupByTableId`.
///
/// For each L0 sub-level and non-L0 level of the parent, `group_split::split_sst_info_for_level_v2`
/// returns the SSTs that move to the child. They are placed in the child's matching sub-level
/// or level, with ids starting at `new_sst_start_id`. When `split_key` is `None`, no SST moves,
/// but both groups still get their `compaction_group_version_id` bumped.
///
/// If `parent_group_id` is `StaticCompactionGroupId::NewCompactionGroup`, nothing is split.
/// A non-zero `new_sst_start_id` then panics in debug builds and only logs a warning in
/// release builds.
///
/// # Panics
/// Panics if the parent group (other than `NewCompactionGroup`) or `group_id` is missing, or
/// if the two ids are equal (`get_disjoint_mut`). Also panics if a child non-L0 level cannot
/// be concatenated after insertion, or if either group is left with an empty L0 sub-level.
pub fn init_with_parent_group_v2(
    &mut self,
    parent_group_id: CompactionGroupId,
    group_id: CompactionGroupId,
    new_sst_start_id: HummockSstableId,
    split_key: Option<Bytes>,
) {
    // Id source for the SSTs created by splitting. It is handed mutably to the split helper
    // (presumably advanced for each new SST — TODO confirm).
    let mut new_sst_id = new_sst_start_id;
    if parent_group_id == StaticCompactionGroupId::NewCompactionGroup {
        // A group created from scratch inherits no SSTs, so no SST ids should be reserved.
        if new_sst_start_id != 0 {
            if cfg!(debug_assertions) {
                panic!(
                    "non-zero sst start id {} for NewCompactionGroup",
                    new_sst_start_id
                );
            } else {
                warn!(
                    %new_sst_start_id,
                    "non-zero sst start id for NewCompactionGroup"
                );
            }
        }
        return;
    } else if !self.levels.contains_key(&parent_group_id) {
        unreachable!(
            "non-existing parent group id {} to init from (V2)",
            parent_group_id
        );
    }

    // Borrow parent and child mutably at the same time. Panics if either group is missing.
    let [parent_levels, cur_levels] = self
        .levels
        .get_disjoint_mut([&parent_group_id, &group_id])
        .map(|res| res.unwrap());
    // Both groups change shape. `apply_compact_ssts` discards compaction deltas whose version
    // id `is_compaction_task_expired` considers expired.
    parent_levels.compaction_group_version_id += 1;
    cur_levels.compaction_group_version_id += 1;

    let l0 = &mut parent_levels.l0;
    {
        for sub_level in &mut l0.sub_levels {
            let target_l0 = &mut cur_levels.l0;
            // SSTs split off this parent sub-level that belong to the child.
            let insert_table_infos = if let Some(split_key) = &split_key {
                group_split::split_sst_info_for_level_v2(
                    sub_level,
                    &mut new_sst_id,
                    split_key.clone(),
                )
            } else {
                vec![]
            };

            if insert_table_infos.is_empty() {
                continue;
            }

            sub_level.normalize();
            // `Ok(idx)`: append to the existing child sub-level at `idx`.
            // `Err(idx)`: create a child sub-level at `idx`, with the same id and type as the
            // parent sub-level.
            match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                Ok(idx) => {
                    add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                }
                Err(idx) => {
                    insert_new_sub_level(
                        target_l0,
                        sub_level.sub_level_id,
                        sub_level.level_type,
                        insert_table_infos,
                        Some(idx),
                    );
                }
            }
        }
        l0.normalize();
    }

    // Non-L0 levels: the child's level at the same index receives the split-off SSTs. Its
    // size counters are updated, and it is re-sorted by key range and must remain
    // concatenable.
    for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
        let insert_table_infos = if let Some(split_key) = &split_key {
            group_split::split_sst_info_for_level_v2(level, &mut new_sst_id, split_key.clone())
        } else {
            vec![]
        };

        if insert_table_infos.is_empty() {
            continue;
        }

        cur_levels.levels[idx].total_file_size += insert_table_infos
            .iter()
            .map(|sst| sst.sst_size)
            .sum::<u64>();
        cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
            .iter()
            .map(|sst| sst.uncompressed_file_size)
            .sum::<u64>();
        cur_levels.levels[idx]
            .table_infos
            .extend(insert_table_infos);
        cur_levels.levels[idx]
            .table_infos
            .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
        assert!(can_concat(&cur_levels.levels[idx].table_infos));
        level.normalize();
    }

    // Invariant: neither group may keep an empty L0 sub-level.
    assert!(
        parent_levels
            .l0
            .sub_levels
            .iter()
            .all(|level| !level.table_infos.is_empty())
    );
    assert!(
        cur_levels
            .l0
            .sub_levels
            .iter()
            .all(|level| !level.table_infos.is_empty())
    );
}
1016}
1017
1018impl<T> HummockVersionCommon<T>
1019where
1020 T: SstableIdReader + ObjectIdReader,
1021{
1022 pub fn get_object_ids(&self) -> impl Iterator<Item = HummockObjectId> + '_ {
1023 match HummockObjectId::Sstable(0.into()) {
1027 HummockObjectId::Sstable(_) => {}
1028 HummockObjectId::VectorFile(_) => {}
1029 HummockObjectId::HnswGraphFile(_) => {}
1030 };
1031 self.get_sst_infos()
1032 .map(|s| HummockObjectId::Sstable(s.object_id()))
1033 .chain(
1034 self.vector_indexes
1035 .values()
1036 .flat_map(|index| index.get_objects().map(|(object_id, _)| object_id)),
1037 )
1038 }
1039
1040 pub fn get_sst_ids(&self) -> HashSet<HummockSstableId> {
1041 self.get_sst_infos().map(|s| s.sst_id()).collect()
1042 }
1043
1044 pub fn get_sst_infos(&self) -> impl Iterator<Item = &T> {
1045 self.get_combined_levels()
1046 .flat_map(|level| level.table_infos.iter())
1047 }
1048}
1049
impl Levels {
    /// Applies one compaction-generated `IntraLevelDelta` to these levels.
    ///
    /// - The SST ids in `removed_table_ids` are deleted from the target level. When
    ///   `level_idx` is 0, they are deleted from every L0 sub-level.
    /// - `inserted_table_infos` are added to the target level. When `level_idx` is 0, they go
    ///   to the L0 sub-level whose id is `l0_sub_level_id`.
    /// - The target level's `vnode_partition_count` may be updated from the delta (see the
    ///   inline comments).
    ///
    /// The whole delta is ignored, with a warning, when `is_compaction_task_expired` reports
    /// its `compaction_group_version_id` as expired relative to this group's.
    ///
    /// An L0 sub-level emptied here is not removed. `apply_version_delta` normalizes L0
    /// afterwards.
    ///
    /// # Panics
    /// Panics if no L0 sub-level has id `l0_sub_level_id` when inserting into L0, or if
    /// `level_idx` is beyond the non-L0 levels.
    pub(crate) fn apply_compact_ssts(
        &mut self,
        level_delta: &IntraLevelDeltaCommon<SstableInfo>,
        member_table_ids: &BTreeSet<TableId>,
    ) {
        let IntraLevelDeltaCommon {
            level_idx,
            l0_sub_level_id,
            inserted_table_infos: insert_table_infos,
            vnode_partition_count,
            removed_table_ids: delete_sst_ids_set,
            compaction_group_version_id,
        } = level_delta;
        let new_vnode_partition_count = *vnode_partition_count;

        // The group's shape has changed since the task was created (e.g. split, merge or
        // prune), so its result is not applied.
        if is_compaction_task_expired(
            self.compaction_group_version_id,
            *compaction_group_version_id,
        ) {
            warn!(
                current_compaction_group_version_id = self.compaction_group_version_id,
                delta_compaction_group_version_id = compaction_group_version_id,
                level_idx,
                l0_sub_level_id,
                insert_table_infos = ?insert_table_infos
                    .iter()
                    .map(|sst| (sst.sst_id, sst.object_id))
                    .collect_vec(),
                ?delete_sst_ids_set,
                "This VersionDelta may be committed by an expired compact task. Please check it."
            );
            return;
        }
        if !delete_sst_ids_set.is_empty() {
            if *level_idx == 0 {
                for level in &mut self.l0.sub_levels {
                    level.delete_ssts(delete_sst_ids_set);
                }
            } else {
                // Non-L0 level `n` is stored at `self.levels[n - 1]`.
                let idx = *level_idx as usize - 1;
                self.levels[idx].delete_ssts(delete_sst_ids_set);
            }
        }

        if !insert_table_infos.is_empty() {
            let insert_sst_level_id = *level_idx;
            let insert_sub_level_id = *l0_sub_level_id;
            if insert_sst_level_id == 0 {
                let l0 = &mut self.l0;
                // Binary search; this relies on L0 sub-levels being sorted by `sub_level_id`.
                let index = l0
                    .sub_levels
                    .partition_point(|level| level.sub_level_id < insert_sub_level_id);
                assert!(
                    index < l0.sub_levels.len()
                        && l0.sub_levels[index].sub_level_id == insert_sub_level_id,
                    "should find the level to insert into when applying compaction generated delta. sub level idx: {}, removed sst ids: {:?}, sub levels: {:?},",
                    insert_sub_level_id,
                    delete_sst_ids_set,
                    l0.sub_levels
                        .iter()
                        .map(|level| level.sub_level_id)
                        .collect_vec()
                );
                // Adopt the delta's partition count only when the sub-level is now empty, the
                // group has a single member table, and every inserted SST holds exactly that
                // table.
                if l0.sub_levels[index].table_infos.is_empty()
                    && member_table_ids.len() == 1
                    && insert_table_infos.iter().all(|sst| {
                        sst.table_ids.len() == 1
                            && sst.table_ids[0]
                                == *member_table_ids.iter().next().expect("non-empty")
                    })
                {
                    l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
                }
                level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
            } else {
                let idx = insert_sst_level_id as usize - 1;
                // Adopt the delta's partition count when the level is now empty and every
                // inserted SST holds a single table. Reset it to 0 when the delta carries 0
                // and the group has several member tables.
                if self.levels[idx].table_infos.is_empty()
                    && insert_table_infos
                        .iter()
                        .all(|sst| sst.table_ids.len() == 1)
                {
                    self.levels[idx].vnode_partition_count = new_vnode_partition_count;
                } else if self.levels[idx].vnode_partition_count != 0
                    && new_vnode_partition_count == 0
                    && member_table_ids.len() > 1
                {
                    self.levels[idx].vnode_partition_count = 0;
                }
                level_insert_ssts(&mut self.levels[idx], insert_table_infos);
            }
        }
    }

    /// Removes `table_ids` from the SSTs of every L0 sub-level and non-L0 level, normalizes
    /// L0, and bumps `compaction_group_version_id`. As a result, compaction deltas created
    /// before the prune are treated as expired by `apply_compact_ssts`.
    pub(crate) fn prune_table_ids_from_ssts(&mut self, table_ids: &HashSet<TableId>) {
        for level in self.l0.sub_levels.iter_mut().chain(self.levels.iter_mut()) {
            level.prune_table_ids_from_ssts(table_ids);
        }
        self.l0.normalize();
        self.compaction_group_version_id += 1;
    }
}
1156
1157impl<T, L> HummockVersionCommon<T, L> {
1158 pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
1159 self.levels
1160 .values()
1161 .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
1162 }
1163}
1164
1165pub fn build_initial_compaction_group_levels(
1166 group_id: impl Into<CompactionGroupId>,
1167 compaction_config: &CompactionConfig,
1168) -> Levels {
1169 let mut levels = vec![];
1170 for l in 0..compaction_config.get_max_level() {
1171 levels.push(Level {
1172 level_idx: (l + 1) as u32,
1173 level_type: PbLevelType::Nonoverlapping,
1174 table_infos: vec![],
1175 total_file_size: 0,
1176 sub_level_id: 0,
1177 uncompressed_file_size: 0,
1178 vnode_partition_count: 0,
1179 });
1180 }
1181 #[expect(deprecated)] Levels {
1183 levels,
1184 l0: OverlappingLevel {
1185 sub_levels: vec![],
1186 total_file_size: 0,
1187 uncompressed_file_size: 0,
1188 },
1189 group_id: group_id.into(),
1190 parent_group_id: 0.into(),
1191 member_table_ids: vec![],
1192 compaction_group_version_id: 0,
1193 }
1194}
1195
1196fn split_sst_info_for_level(
1197 member_table_ids: &BTreeSet<TableId>,
1198 level: &mut Level,
1199 new_sst_id: &mut HummockSstableId,
1200) -> Vec<SstableInfo> {
1201 let mut insert_table_infos = vec![];
1204 for sst_info in &mut level.table_infos {
1205 let removed_table_ids = sst_info
1206 .table_ids
1207 .iter()
1208 .filter(|table_id| member_table_ids.contains(*table_id))
1209 .cloned()
1210 .collect_vec();
1211 let sst_size = sst_info.sst_size;
1212 if sst_size / 2 == 0 {
1213 tracing::warn!(
1214 id = %sst_info.sst_id,
1215 object_id = %sst_info.object_id,
1216 sst_size = sst_info.sst_size,
1217 file_size = sst_info.file_size,
1218 "Sstable sst_size is under expected",
1219 );
1220 };
1221 if !removed_table_ids.is_empty() {
1222 let (modified_sst, branch_sst) = split_sst_with_table_ids(
1223 sst_info,
1224 new_sst_id,
1225 sst_size / 2,
1226 sst_size / 2,
1227 member_table_ids.iter().cloned().collect_vec(),
1228 );
1229 *sst_info = modified_sst;
1230 insert_table_infos.push(branch_sst);
1231 }
1232 }
1233 insert_table_infos
1234}
1235
1236pub fn get_compaction_group_ids(
1238 version: &HummockVersion,
1239) -> impl Iterator<Item = CompactionGroupId> + '_ {
1240 version.levels.keys().cloned()
1241}
1242
1243pub fn get_table_compaction_group_id_mapping(
1244 version: &HummockVersion,
1245) -> HashMap<StateTableId, CompactionGroupId> {
1246 version
1247 .state_table_info
1248 .info()
1249 .iter()
1250 .map(|(table_id, info)| (*table_id, info.compaction_group_id))
1251 .collect()
1252}
1253
1254pub fn get_compaction_group_ssts(
1256 version: &HummockVersion,
1257 group_id: CompactionGroupId,
1258) -> impl Iterator<Item = (HummockSstableObjectId, HummockSstableId)> + '_ {
1259 let group_levels = version.get_compaction_group_levels(group_id);
1260 group_levels
1261 .l0
1262 .sub_levels
1263 .iter()
1264 .rev()
1265 .chain(group_levels.levels.iter())
1266 .flat_map(|level| {
1267 level
1268 .table_infos
1269 .iter()
1270 .map(|table_info| (table_info.object_id, table_info.sst_id))
1271 })
1272}
1273
1274pub fn new_sub_level(
1275 sub_level_id: u64,
1276 level_type: PbLevelType,
1277 table_infos: Vec<SstableInfo>,
1278) -> Level {
1279 if level_type == PbLevelType::Nonoverlapping {
1280 debug_assert!(
1281 can_concat(&table_infos),
1282 "sst of non-overlapping level is not concat-able: {:?}",
1283 table_infos
1284 );
1285 }
1286 let total_file_size = table_infos.iter().map(|table| table.sst_size).sum();
1287 let uncompressed_file_size = table_infos
1288 .iter()
1289 .map(|table| table.uncompressed_file_size)
1290 .sum();
1291 Level {
1292 level_idx: 0,
1293 level_type,
1294 table_infos,
1295 total_file_size,
1296 sub_level_id,
1297 uncompressed_file_size,
1298 vnode_partition_count: 0,
1299 }
1300}
1301
1302pub fn add_ssts_to_sub_level(
1303 l0: &mut OverlappingLevel,
1304 sub_level_idx: usize,
1305 insert_table_infos: Vec<SstableInfo>,
1306) {
1307 insert_table_infos.iter().for_each(|sst| {
1308 l0.sub_levels[sub_level_idx].total_file_size += sst.sst_size;
1309 l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size;
1310 l0.total_file_size += sst.sst_size;
1311 l0.uncompressed_file_size += sst.uncompressed_file_size;
1312 });
1313 l0.sub_levels[sub_level_idx]
1314 .table_infos
1315 .extend(insert_table_infos);
1316 if l0.sub_levels[sub_level_idx].level_type == PbLevelType::Nonoverlapping {
1317 l0.sub_levels[sub_level_idx]
1318 .table_infos
1319 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1320 assert!(
1321 can_concat(&l0.sub_levels[sub_level_idx].table_infos),
1322 "sstable ids: {:?}",
1323 l0.sub_levels[sub_level_idx]
1324 .table_infos
1325 .iter()
1326 .map(|sst| sst.sst_id)
1327 .collect_vec()
1328 );
1329 }
1330}
1331
1332pub fn insert_new_sub_level(
1334 l0: &mut OverlappingLevel,
1335 insert_sub_level_id: u64,
1336 level_type: PbLevelType,
1337 insert_table_infos: Vec<SstableInfo>,
1338 sub_level_insert_hint: Option<usize>,
1339) {
1340 if insert_sub_level_id == u64::MAX {
1341 return;
1342 }
1343 let insert_pos = if let Some(insert_pos) = sub_level_insert_hint {
1344 insert_pos
1345 } else {
1346 if let Some(newest_level) = l0.sub_levels.last() {
1347 assert!(
1348 newest_level.sub_level_id < insert_sub_level_id,
1349 "inserted new level is not the newest: prev newest: {}, insert: {}. L0: {:?}",
1350 newest_level.sub_level_id,
1351 insert_sub_level_id,
1352 l0,
1353 );
1354 }
1355 l0.sub_levels.len()
1356 };
1357 #[cfg(debug_assertions)]
1358 {
1359 if insert_pos > 0
1360 && let Some(smaller_level) = l0.sub_levels.get(insert_pos - 1)
1361 {
1362 debug_assert!(smaller_level.sub_level_id < insert_sub_level_id);
1363 }
1364 if let Some(larger_level) = l0.sub_levels.get(insert_pos) {
1365 debug_assert!(larger_level.sub_level_id > insert_sub_level_id);
1366 }
1367 }
1368 let level = new_sub_level(insert_sub_level_id, level_type, insert_table_infos);
1371 l0.total_file_size += level.total_file_size;
1372 l0.uncompressed_file_size += level.uncompressed_file_size;
1373 l0.sub_levels.insert(insert_pos, level);
1374}
1375
1376impl Level {
1377 fn recompute_size(&mut self) {
1378 self.total_file_size = self
1379 .table_infos
1380 .iter()
1381 .map(|table| table.sst_size)
1382 .sum::<u64>();
1383 self.uncompressed_file_size = self
1384 .table_infos
1385 .iter()
1386 .map(|table| table.uncompressed_file_size)
1387 .sum::<u64>();
1388 }
1389
1390 fn normalize(&mut self) {
1392 self.table_infos
1393 .retain(|sst_info| !sst_info.table_ids.is_empty());
1394 self.recompute_size();
1395 }
1396
1397 fn delete_ssts(&mut self, ids: &HashSet<HummockSstableId>) -> bool {
1399 let original_len = self.table_infos.len();
1400 self.table_infos
1401 .retain(|table| !ids.contains(&table.sst_id));
1402 self.recompute_size();
1403 original_len != self.table_infos.len()
1404 }
1405
1406 fn prune_table_ids_from_ssts(&mut self, table_ids: &HashSet<TableId>) {
1409 for sstable_info in &mut self.table_infos {
1410 if !sstable_info
1411 .table_ids
1412 .iter()
1413 .any(|table_id| table_ids.contains(table_id))
1414 {
1415 continue;
1416 }
1417
1418 let mut inner = sstable_info.get_inner();
1419 inner.table_ids.retain(|id| !table_ids.contains(id));
1420 sstable_info.set_inner(inner);
1421 }
1422 self.normalize();
1423 }
1424}
1425
1426impl OverlappingLevel {
1427 fn normalize(&mut self) {
1429 self.sub_levels
1430 .retain(|level| !level.table_infos.is_empty());
1431 self.total_file_size = self
1432 .sub_levels
1433 .iter()
1434 .map(|level| level.total_file_size)
1435 .sum::<u64>();
1436 self.uncompressed_file_size = self
1437 .sub_levels
1438 .iter()
1439 .map(|level| level.uncompressed_file_size)
1440 .sum::<u64>();
1441 }
1442}
1443
1444fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec<SstableInfo>) {
1445 fn display_sstable_infos(ssts: &[impl Borrow<SstableInfo>]) -> String {
1446 format!(
1447 "sstable ids: {:?}",
1448 ssts.iter().map(|s| s.borrow().sst_id).collect_vec()
1449 )
1450 }
1451 operand.total_file_size += insert_table_infos
1452 .iter()
1453 .map(|sst| sst.sst_size)
1454 .sum::<u64>();
1455 operand.uncompressed_file_size += insert_table_infos
1456 .iter()
1457 .map(|sst| sst.uncompressed_file_size)
1458 .sum::<u64>();
1459 if operand.level_type == PbLevelType::Overlapping {
1460 operand.level_type = PbLevelType::Nonoverlapping;
1461 operand
1462 .table_infos
1463 .extend(insert_table_infos.iter().cloned());
1464 operand
1465 .table_infos
1466 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1467 assert!(
1468 can_concat(&operand.table_infos),
1469 "{}",
1470 display_sstable_infos(&operand.table_infos)
1471 );
1472 } else if !insert_table_infos.is_empty() {
1473 let sorted_insert: Vec<_> = insert_table_infos
1474 .iter()
1475 .sorted_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range))
1476 .cloned()
1477 .collect();
1478 let first = &sorted_insert[0];
1479 let last = &sorted_insert[sorted_insert.len() - 1];
1480 let pos = operand
1481 .table_infos
1482 .partition_point(|b| b.key_range.cmp(&first.key_range) == Ordering::Less);
1483 if pos >= operand.table_infos.len()
1484 || last.key_range.cmp(&operand.table_infos[pos].key_range) == Ordering::Less
1485 {
1486 operand.table_infos.splice(pos..pos, sorted_insert);
1487 let validate_range = operand
1489 .table_infos
1490 .iter()
1491 .skip(pos.saturating_sub(1))
1492 .take(insert_table_infos.len() + 2)
1493 .collect_vec();
1494 assert!(
1495 can_concat(&validate_range),
1496 "{}",
1497 display_sstable_infos(&validate_range),
1498 );
1499 } else {
1500 warn!(insert = ?insert_table_infos, level = ?operand.table_infos, "unexpected overlap");
1503 for i in insert_table_infos {
1504 let pos = operand
1505 .table_infos
1506 .partition_point(|b| b.key_range.cmp(&i.key_range) == Ordering::Less);
1507 operand.table_infos.insert(pos, i.clone());
1508 }
1509 assert!(
1510 can_concat(&operand.table_infos),
1511 "{}",
1512 display_sstable_infos(&operand.table_infos)
1513 );
1514 }
1515 }
1516}
1517
1518pub fn version_object_size_map(version: &HummockVersion) -> HashMap<HummockObjectId, u64> {
1519 match HummockObjectId::Sstable(0.into()) {
1523 HummockObjectId::Sstable(_) => {}
1524 HummockObjectId::VectorFile(_) => {}
1525 HummockObjectId::HnswGraphFile(_) => {}
1526 };
1527 version
1528 .levels
1529 .values()
1530 .flat_map(|cg| {
1531 cg.level0()
1532 .sub_levels
1533 .iter()
1534 .chain(cg.levels.iter())
1535 .flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
1536 })
1537 .map(|(object_id, size)| (HummockObjectId::Sstable(object_id), size))
1538 .chain(
1539 version
1540 .vector_indexes
1541 .values()
1542 .flat_map(|index| index.get_objects()),
1543 )
1544 .collect()
1545}
1546
1547pub fn validate_version(version: &HummockVersion) -> Vec<String> {
1550 let mut res = Vec::new();
1551 for (group_id, levels) in &version.levels {
1553 if levels.group_id != *group_id {
1555 res.push(format!(
1556 "GROUP {}: inconsistent group id {} in Levels",
1557 group_id, levels.group_id
1558 ));
1559 }
1560
1561 let validate_level = |group: CompactionGroupId,
1562 expected_level_idx: u32,
1563 level: &Level,
1564 res: &mut Vec<String>| {
1565 let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
1566 if level.level_idx == 0 {
1567 level_identifier.push_str(format!("SUBLEVEL {}", level.sub_level_id).as_str());
1568 if level.table_infos.is_empty() {
1570 res.push(format!("{}: empty level", level_identifier));
1571 }
1572 } else if level.level_type != PbLevelType::Nonoverlapping {
1573 res.push(format!(
1575 "{}: level type {:?} is not non-overlapping",
1576 level_identifier, level.level_type
1577 ));
1578 }
1579
1580 if level.level_idx != expected_level_idx {
1582 res.push(format!(
1583 "{}: mismatched level idx {}",
1584 level_identifier, expected_level_idx
1585 ));
1586 }
1587
1588 let mut prev_table_info: Option<&SstableInfo> = None;
1589 for table_info in &level.table_infos {
1590 if !table_info.table_ids.is_sorted_by(|a, b| a < b) {
1592 res.push(format!(
1593 "{} SST {}: table_ids not sorted",
1594 level_identifier, table_info.object_id
1595 ));
1596 }
1597
1598 if level.level_type == PbLevelType::Nonoverlapping {
1600 if let Some(prev) = prev_table_info.take()
1601 && prev
1602 .key_range
1603 .compare_right_with(&table_info.key_range.left)
1604 != Ordering::Less
1605 {
1606 res.push(format!(
1607 "{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
1608 level_identifier, table_info.object_id, prev, table_info
1609 ));
1610 }
1611 let _ = prev_table_info.insert(table_info);
1612 }
1613 }
1614 };
1615
1616 let l0 = &levels.l0;
1617 let mut prev_sub_level_id = u64::MAX;
1618 for sub_level in &l0.sub_levels {
1619 if sub_level.sub_level_id >= prev_sub_level_id {
1621 res.push(format!(
1622 "GROUP {} LEVEL 0: sub_level_id {} >= prev_sub_level {}",
1623 group_id, sub_level.level_idx, prev_sub_level_id
1624 ));
1625 }
1626 prev_sub_level_id = sub_level.sub_level_id;
1627
1628 validate_level(*group_id, 0, sub_level, &mut res);
1629 }
1630
1631 for idx in 1..=levels.levels.len() {
1632 validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
1633 }
1634 }
1635 res
1636}
1637
1638#[cfg(test)]
1639mod tests {
1640 use std::collections::{HashMap, HashSet};
1641
1642 use bytes::Bytes;
1643 use risingwave_common::catalog::TableId;
1644 use risingwave_common::hash::VirtualNode;
1645 use risingwave_common::util::epoch::test_epoch;
1646 use risingwave_pb::hummock::{
1647 CompactionConfig, GroupConstruct, GroupDestroy, LevelType, StateTableInfo,
1648 };
1649
1650 use super::group_split;
1651 use crate::HummockVersionId;
1652 use crate::compaction_group::group_split::*;
1653 use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
1654 use crate::key::{FullKey, gen_key_from_str};
1655 use crate::key_range::KeyRange;
1656 use crate::level::{Level, Levels, OverlappingLevel};
1657 use crate::sstable_info::{SstableInfo, SstableInfoInner};
1658 use crate::version::{
1659 GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo,
1660 IntraLevelDelta,
1661 };
1662
1663 fn gen_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfo {
1664 gen_sstable_info_impl(sst_id, table_ids, epoch).into()
1665 }
1666
1667 fn gen_sstable_info_impl(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfoInner {
1668 let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1");
1669 let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1");
1670 let full_key_l = FullKey::for_test(
1671 TableId::new(*table_ids.first().unwrap()),
1672 table_key_l,
1673 epoch,
1674 )
1675 .encode();
1676 let full_key_r =
1677 FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch)
1678 .encode();
1679
1680 SstableInfoInner {
1681 sst_id: sst_id.into(),
1682 key_range: KeyRange {
1683 left: full_key_l.into(),
1684 right: full_key_r.into(),
1685 right_exclusive: false,
1686 },
1687 table_ids: table_ids.into_iter().map(Into::into).collect(),
1688 object_id: sst_id.into(),
1689 min_epoch: 20,
1690 max_epoch: 20,
1691 file_size: 100,
1692 sst_size: 100,
1693 ..Default::default()
1694 }
1695 }
1696
1697 #[test]
1698 fn test_get_sst_object_ids() {
1699 let mut version = HummockVersion {
1700 id: HummockVersionId::new(0),
1701 levels: HashMap::from_iter([(
1702 0.into(),
1703 Levels {
1704 levels: vec![],
1705 l0: OverlappingLevel {
1706 sub_levels: vec![],
1707 total_file_size: 0,
1708 uncompressed_file_size: 0,
1709 },
1710 ..Default::default()
1711 },
1712 )]),
1713 ..Default::default()
1714 };
1715 assert_eq!(version.get_object_ids().count(), 0);
1716
1717 version
1719 .levels
1720 .get_mut(&0)
1721 .unwrap()
1722 .l0
1723 .sub_levels
1724 .push(Level {
1725 table_infos: vec![
1726 SstableInfoInner {
1727 object_id: 11.into(),
1728 sst_id: 11.into(),
1729 ..Default::default()
1730 }
1731 .into(),
1732 ],
1733 ..Default::default()
1734 });
1735 assert_eq!(version.get_object_ids().count(), 1);
1736
1737 version.levels.get_mut(&0).unwrap().levels.push(Level {
1739 table_infos: vec![
1740 SstableInfoInner {
1741 object_id: 22.into(),
1742 sst_id: 22.into(),
1743 ..Default::default()
1744 }
1745 .into(),
1746 ],
1747 ..Default::default()
1748 });
1749 assert_eq!(version.get_object_ids().count(), 2);
1750 }
1751
1752 #[test]
1753 fn test_apply_version_delta() {
1754 let mut version = HummockVersion {
1755 id: HummockVersionId::new(0),
1756 levels: HashMap::from_iter([
1757 (
1758 0.into(),
1759 build_initial_compaction_group_levels(
1760 0,
1761 &CompactionConfig {
1762 max_level: 6,
1763 ..Default::default()
1764 },
1765 ),
1766 ),
1767 (
1768 1.into(),
1769 build_initial_compaction_group_levels(
1770 1,
1771 &CompactionConfig {
1772 max_level: 6,
1773 ..Default::default()
1774 },
1775 ),
1776 ),
1777 ]),
1778 ..Default::default()
1779 };
1780 let version_delta = HummockVersionDelta {
1781 id: HummockVersionId::new(1),
1782 group_deltas: HashMap::from_iter([
1783 (
1784 2.into(),
1785 GroupDeltas {
1786 group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
1787 group_config: Some(CompactionConfig {
1788 max_level: 6,
1789 ..Default::default()
1790 }),
1791 ..Default::default()
1792 }))],
1793 },
1794 ),
1795 (
1796 0.into(),
1797 GroupDeltas {
1798 group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})],
1799 },
1800 ),
1801 (
1802 1.into(),
1803 GroupDeltas {
1804 group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new(
1805 1,
1806 0,
1807 HashSet::new(),
1808 vec![
1809 SstableInfoInner {
1810 object_id: 1.into(),
1811 sst_id: 1.into(),
1812 ..Default::default()
1813 }
1814 .into(),
1815 ],
1816 0,
1817 version
1818 .levels
1819 .get(&1)
1820 .as_ref()
1821 .unwrap()
1822 .compaction_group_version_id,
1823 ))],
1824 },
1825 ),
1826 ]),
1827 ..Default::default()
1828 };
1829 let version_delta = version_delta;
1830
1831 version.apply_version_delta(&version_delta);
1832 let mut cg1 = build_initial_compaction_group_levels(
1833 1,
1834 &CompactionConfig {
1835 max_level: 6,
1836 ..Default::default()
1837 },
1838 );
1839 cg1.levels[0] = Level {
1840 level_idx: 1,
1841 level_type: LevelType::Nonoverlapping,
1842 table_infos: vec![
1843 SstableInfoInner {
1844 object_id: 1.into(),
1845 sst_id: 1.into(),
1846 ..Default::default()
1847 }
1848 .into(),
1849 ],
1850 ..Default::default()
1851 };
1852 assert_eq!(
1853 version,
1854 HummockVersion {
1855 id: HummockVersionId::new(1),
1856 levels: HashMap::from_iter([
1857 (
1858 2.into(),
1859 build_initial_compaction_group_levels(
1860 2,
1861 &CompactionConfig {
1862 max_level: 6,
1863 ..Default::default()
1864 },
1865 ),
1866 ),
1867 (1.into(), cg1),
1868 ]),
1869 ..Default::default()
1870 }
1871 );
1872 }
1873
1874 fn gen_sst_info(object_id: u64, table_ids: Vec<u32>, left: Bytes, right: Bytes) -> SstableInfo {
1875 gen_sst_info_impl(object_id, table_ids, left, right).into()
1876 }
1877
1878 fn gen_sst_info_impl(
1879 object_id: u64,
1880 table_ids: Vec<u32>,
1881 left: Bytes,
1882 right: Bytes,
1883 ) -> SstableInfoInner {
1884 SstableInfoInner {
1885 object_id: object_id.into(),
1886 sst_id: object_id.into(),
1887 key_range: KeyRange {
1888 left,
1889 right,
1890 right_exclusive: false,
1891 },
1892 table_ids: table_ids.into_iter().map(Into::into).collect(),
1893 file_size: 100,
1894 sst_size: 100,
1895 uncompressed_file_size: 100,
1896 ..Default::default()
1897 }
1898 }
1899
1900 #[test]
1901 fn test_merge_levels() {
1902 let mut left_levels = build_initial_compaction_group_levels(
1903 1,
1904 &CompactionConfig {
1905 max_level: 6,
1906 ..Default::default()
1907 },
1908 );
1909
1910 let mut right_levels = build_initial_compaction_group_levels(
1911 2,
1912 &CompactionConfig {
1913 max_level: 6,
1914 ..Default::default()
1915 },
1916 );
1917
1918 left_levels.levels[0] = Level {
1919 level_idx: 1,
1920 level_type: LevelType::Nonoverlapping,
1921 table_infos: vec![
1922 gen_sst_info(
1923 1,
1924 vec![3],
1925 FullKey::for_test(
1926 TableId::new(3),
1927 gen_key_from_str(VirtualNode::from_index(1), "1"),
1928 0,
1929 )
1930 .encode()
1931 .into(),
1932 FullKey::for_test(
1933 TableId::new(3),
1934 gen_key_from_str(VirtualNode::from_index(200), "1"),
1935 0,
1936 )
1937 .encode()
1938 .into(),
1939 ),
1940 gen_sst_info(
1941 10,
1942 vec![3, 4],
1943 FullKey::for_test(
1944 TableId::new(3),
1945 gen_key_from_str(VirtualNode::from_index(201), "1"),
1946 0,
1947 )
1948 .encode()
1949 .into(),
1950 FullKey::for_test(
1951 TableId::new(4),
1952 gen_key_from_str(VirtualNode::from_index(10), "1"),
1953 0,
1954 )
1955 .encode()
1956 .into(),
1957 ),
1958 gen_sst_info(
1959 11,
1960 vec![4],
1961 FullKey::for_test(
1962 TableId::new(4),
1963 gen_key_from_str(VirtualNode::from_index(11), "1"),
1964 0,
1965 )
1966 .encode()
1967 .into(),
1968 FullKey::for_test(
1969 TableId::new(4),
1970 gen_key_from_str(VirtualNode::from_index(200), "1"),
1971 0,
1972 )
1973 .encode()
1974 .into(),
1975 ),
1976 ],
1977 total_file_size: 300,
1978 ..Default::default()
1979 };
1980
1981 left_levels.l0.sub_levels.push(Level {
1982 level_idx: 0,
1983 table_infos: vec![gen_sst_info(
1984 3,
1985 vec![3],
1986 FullKey::for_test(
1987 TableId::new(3),
1988 gen_key_from_str(VirtualNode::from_index(1), "1"),
1989 0,
1990 )
1991 .encode()
1992 .into(),
1993 FullKey::for_test(
1994 TableId::new(3),
1995 gen_key_from_str(VirtualNode::from_index(200), "1"),
1996 0,
1997 )
1998 .encode()
1999 .into(),
2000 )],
2001 sub_level_id: 101,
2002 level_type: LevelType::Overlapping,
2003 total_file_size: 100,
2004 ..Default::default()
2005 });
2006
2007 left_levels.l0.sub_levels.push(Level {
2008 level_idx: 0,
2009 table_infos: vec![gen_sst_info(
2010 3,
2011 vec![3],
2012 FullKey::for_test(
2013 TableId::new(3),
2014 gen_key_from_str(VirtualNode::from_index(1), "1"),
2015 0,
2016 )
2017 .encode()
2018 .into(),
2019 FullKey::for_test(
2020 TableId::new(3),
2021 gen_key_from_str(VirtualNode::from_index(200), "1"),
2022 0,
2023 )
2024 .encode()
2025 .into(),
2026 )],
2027 sub_level_id: 103,
2028 level_type: LevelType::Overlapping,
2029 total_file_size: 100,
2030 ..Default::default()
2031 });
2032
2033 left_levels.l0.sub_levels.push(Level {
2034 level_idx: 0,
2035 table_infos: vec![gen_sst_info(
2036 3,
2037 vec![3],
2038 FullKey::for_test(
2039 TableId::new(3),
2040 gen_key_from_str(VirtualNode::from_index(1), "1"),
2041 0,
2042 )
2043 .encode()
2044 .into(),
2045 FullKey::for_test(
2046 TableId::new(3),
2047 gen_key_from_str(VirtualNode::from_index(200), "1"),
2048 0,
2049 )
2050 .encode()
2051 .into(),
2052 )],
2053 sub_level_id: 105,
2054 level_type: LevelType::Nonoverlapping,
2055 total_file_size: 100,
2056 ..Default::default()
2057 });
2058
2059 right_levels.levels[0] = Level {
2060 level_idx: 1,
2061 level_type: LevelType::Nonoverlapping,
2062 table_infos: vec![
2063 gen_sst_info(
2064 1,
2065 vec![5],
2066 FullKey::for_test(
2067 TableId::new(5),
2068 gen_key_from_str(VirtualNode::from_index(1), "1"),
2069 0,
2070 )
2071 .encode()
2072 .into(),
2073 FullKey::for_test(
2074 TableId::new(5),
2075 gen_key_from_str(VirtualNode::from_index(200), "1"),
2076 0,
2077 )
2078 .encode()
2079 .into(),
2080 ),
2081 gen_sst_info(
2082 10,
2083 vec![5, 6],
2084 FullKey::for_test(
2085 TableId::new(5),
2086 gen_key_from_str(VirtualNode::from_index(201), "1"),
2087 0,
2088 )
2089 .encode()
2090 .into(),
2091 FullKey::for_test(
2092 TableId::new(6),
2093 gen_key_from_str(VirtualNode::from_index(10), "1"),
2094 0,
2095 )
2096 .encode()
2097 .into(),
2098 ),
2099 gen_sst_info(
2100 11,
2101 vec![6],
2102 FullKey::for_test(
2103 TableId::new(6),
2104 gen_key_from_str(VirtualNode::from_index(11), "1"),
2105 0,
2106 )
2107 .encode()
2108 .into(),
2109 FullKey::for_test(
2110 TableId::new(6),
2111 gen_key_from_str(VirtualNode::from_index(200), "1"),
2112 0,
2113 )
2114 .encode()
2115 .into(),
2116 ),
2117 ],
2118 total_file_size: 300,
2119 ..Default::default()
2120 };
2121
2122 right_levels.l0.sub_levels.push(Level {
2123 level_idx: 0,
2124 table_infos: vec![gen_sst_info(
2125 3,
2126 vec![5],
2127 FullKey::for_test(
2128 TableId::new(5),
2129 gen_key_from_str(VirtualNode::from_index(1), "1"),
2130 0,
2131 )
2132 .encode()
2133 .into(),
2134 FullKey::for_test(
2135 TableId::new(5),
2136 gen_key_from_str(VirtualNode::from_index(200), "1"),
2137 0,
2138 )
2139 .encode()
2140 .into(),
2141 )],
2142 sub_level_id: 101,
2143 level_type: LevelType::Overlapping,
2144 total_file_size: 100,
2145 ..Default::default()
2146 });
2147
2148 right_levels.l0.sub_levels.push(Level {
2149 level_idx: 0,
2150 table_infos: vec![gen_sst_info(
2151 5,
2152 vec![5],
2153 FullKey::for_test(
2154 TableId::new(5),
2155 gen_key_from_str(VirtualNode::from_index(1), "1"),
2156 0,
2157 )
2158 .encode()
2159 .into(),
2160 FullKey::for_test(
2161 TableId::new(5),
2162 gen_key_from_str(VirtualNode::from_index(200), "1"),
2163 0,
2164 )
2165 .encode()
2166 .into(),
2167 )],
2168 sub_level_id: 102,
2169 level_type: LevelType::Overlapping,
2170 total_file_size: 100,
2171 ..Default::default()
2172 });
2173
2174 right_levels.l0.sub_levels.push(Level {
2175 level_idx: 0,
2176 table_infos: vec![gen_sst_info(
2177 3,
2178 vec![5],
2179 FullKey::for_test(
2180 TableId::new(5),
2181 gen_key_from_str(VirtualNode::from_index(1), "1"),
2182 0,
2183 )
2184 .encode()
2185 .into(),
2186 FullKey::for_test(
2187 TableId::new(5),
2188 gen_key_from_str(VirtualNode::from_index(200), "1"),
2189 0,
2190 )
2191 .encode()
2192 .into(),
2193 )],
2194 sub_level_id: 103,
2195 level_type: LevelType::Nonoverlapping,
2196 total_file_size: 100,
2197 ..Default::default()
2198 });
2199
2200 {
2201 let mut left_levels = Levels::default();
2203 let right_levels = Levels::default();
2204
2205 group_split::merge_levels(&mut left_levels, right_levels);
2206 }
2207
2208 {
2209 let mut left_levels = build_initial_compaction_group_levels(
2211 1,
2212 &CompactionConfig {
2213 max_level: 6,
2214 ..Default::default()
2215 },
2216 );
2217 let right_levels = right_levels.clone();
2218
2219 group_split::merge_levels(&mut left_levels, right_levels);
2220
2221 assert!(left_levels.l0.sub_levels.len() == 3);
2222 assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2223 assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2224 assert!(left_levels.l0.sub_levels[1].sub_level_id == 102);
2225 assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2226 assert!(left_levels.l0.sub_levels[2].sub_level_id == 103);
2227 assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2228
2229 assert!(left_levels.levels[0].level_idx == 1);
2230 assert_eq!(300, left_levels.levels[0].total_file_size);
2231 }
2232
2233 {
2234 let mut left_levels = left_levels.clone();
2236 let right_levels = build_initial_compaction_group_levels(
2237 2,
2238 &CompactionConfig {
2239 max_level: 6,
2240 ..Default::default()
2241 },
2242 );
2243
2244 group_split::merge_levels(&mut left_levels, right_levels);
2245
2246 assert!(left_levels.l0.sub_levels.len() == 3);
2247 assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2248 assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2249 assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2250 assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2251 assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2252 assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2253
2254 assert!(left_levels.levels[0].level_idx == 1);
2255 assert_eq!(300, left_levels.levels[0].total_file_size);
2256 }
2257
2258 {
2259 let mut left_levels = left_levels.clone();
2260 let right_levels = right_levels.clone();
2261
2262 group_split::merge_levels(&mut left_levels, right_levels);
2263
2264 assert!(left_levels.l0.sub_levels.len() == 6);
2265 assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2266 assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2267 assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2268 assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2269 assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2270 assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2271 assert!(left_levels.l0.sub_levels[3].sub_level_id == 106);
2272 assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size);
2273 assert!(left_levels.l0.sub_levels[4].sub_level_id == 107);
2274 assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size);
2275 assert!(left_levels.l0.sub_levels[5].sub_level_id == 108);
2276 assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size);
2277
2278 assert!(left_levels.levels[0].level_idx == 1);
2279 assert_eq!(600, left_levels.levels[0].total_file_size);
2280 }
2281 }
2282
2283 #[test]
2284 fn test_get_split_pos() {
2285 let epoch = test_epoch(1);
2286 let s1 = gen_sstable_info(1, vec![1, 2], epoch);
2287 let s2 = gen_sstable_info(2, vec![3, 4, 5], epoch);
2288 let s3 = gen_sstable_info(3, vec![6, 7], epoch);
2289
2290 let ssts = vec![s1, s2, s3];
2291 let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2292
2293 let pos = group_split::get_split_pos(&ssts, split_key.clone());
2294 assert_eq!(1, pos);
2295
2296 let pos = group_split::get_split_pos(&vec![], split_key);
2297 assert_eq!(0, pos);
2298 }
2299
2300 #[test]
2301 fn test_split_sst() {
2302 let epoch = test_epoch(1);
2303 let sst = gen_sstable_info(1, vec![1, 2, 3, 5], epoch);
2304
2305 {
2306 let split_key = group_split::build_split_key(3.into(), VirtualNode::ZERO);
2307 let origin_sst = sst.clone();
2308 let sst_size = origin_sst.sst_size;
2309 let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2310 assert_eq!(SstSplitType::Both, split_type);
2311
2312 let mut new_sst_id = 10.into();
2313 let (origin_sst, branched_sst) = group_split::split_sst(
2314 origin_sst,
2315 &mut new_sst_id,
2316 split_key,
2317 sst_size / 2,
2318 sst_size / 2,
2319 );
2320
2321 let origin_sst = origin_sst.unwrap();
2322 let branched_sst = branched_sst.unwrap();
2323
2324 assert!(origin_sst.key_range.right_exclusive);
2325 assert!(
2326 origin_sst
2327 .key_range
2328 .right
2329 .cmp(&branched_sst.key_range.left)
2330 .is_le()
2331 );
2332 assert!(origin_sst.table_ids.is_sorted());
2333 assert!(branched_sst.table_ids.is_sorted());
2334 assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2335 assert!(branched_sst.sst_size < origin_sst.file_size);
2336 assert_eq!(10, branched_sst.sst_id);
2337 assert_eq!(11, origin_sst.sst_id);
2338 assert_eq!(3, branched_sst.table_ids.first().unwrap().as_raw_id()); }
2340
2341 {
2342 let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2344 let origin_sst = sst.clone();
2345 let sst_size = origin_sst.sst_size;
2346 let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2347 assert_eq!(SstSplitType::Both, split_type);
2348
2349 let mut new_sst_id = 10.into();
2350 let (origin_sst, branched_sst) = group_split::split_sst(
2351 origin_sst,
2352 &mut new_sst_id,
2353 split_key,
2354 sst_size / 2,
2355 sst_size / 2,
2356 );
2357
2358 let origin_sst = origin_sst.unwrap();
2359 let branched_sst = branched_sst.unwrap();
2360
2361 assert!(origin_sst.key_range.right_exclusive);
2362 assert!(origin_sst.key_range.right.le(&branched_sst.key_range.left));
2363 assert!(origin_sst.table_ids.is_sorted());
2364 assert!(branched_sst.table_ids.is_sorted());
2365 assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2366 assert!(branched_sst.sst_size < origin_sst.file_size);
2367 assert_eq!(10, branched_sst.sst_id);
2368 assert_eq!(11, origin_sst.sst_id);
2369 assert_eq!(5, branched_sst.table_ids.first().unwrap().as_raw_id()); }
2371
2372 {
2373 let split_key = group_split::build_split_key(6.into(), VirtualNode::ZERO);
2374 let split_type = group_split::need_to_split(&sst, split_key);
2375 assert_eq!(SstSplitType::Left, split_type);
2376 }
2377
2378 {
2379 let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2380 let origin_sst = sst.clone();
2381 let split_type = group_split::need_to_split(&origin_sst, split_key);
2382 assert_eq!(SstSplitType::Both, split_type);
2383
2384 let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2385 let origin_sst = sst;
2386 let split_type = group_split::need_to_split(&origin_sst, split_key);
2387 assert_eq!(SstSplitType::Right, split_type);
2388 }
2389
2390 {
2391 let mut sst = gen_sstable_info_impl(1, vec![1], epoch);
2393 sst.key_range.right = sst.key_range.left.clone();
2394 let sst: SstableInfo = sst.into();
2395 let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2396 let origin_sst = sst;
2397 let sst_size = origin_sst.sst_size;
2398
2399 let mut new_sst_id = 10.into();
2400 let (origin_sst, branched_sst) = group_split::split_sst(
2401 origin_sst,
2402 &mut new_sst_id,
2403 split_key,
2404 sst_size / 2,
2405 sst_size / 2,
2406 );
2407
2408 assert!(origin_sst.is_none());
2409 assert!(branched_sst.is_some());
2410 }
2411 }
2412
/// Exercises `group_split::split_sst_info_for_level_v2` against an L0
/// sub-level and the first non-L0 level of compaction group 1, using split
/// keys that fall before, inside, and after the level's table range.
#[test]
fn test_split_sst_info_for_level() {
    // A version with a single compaction group (id 1), created through
    // `build_initial_compaction_group_levels` with `max_level: 6`.
    let mut version = HummockVersion {
        id: HummockVersionId::new(0),
        levels: HashMap::from_iter([(
            1.into(),
            build_initial_compaction_group_levels(
                1,
                &CompactionConfig {
                    max_level: 6,
                    ..Default::default()
                },
            ),
        )]),
        ..Default::default()
    };

    let cg1 = version.levels.get_mut(&1).unwrap();

    // `levels[0]` (level_idx 1, non-overlapping) gets three SSTs:
    //   sst 1:  table 3, vnode 1 ..= table 3, vnode 200
    //   sst 10: table 3, vnode 201 ..= table 4, vnode 10  (straddles tables 3 and 4)
    //   sst 11: table 4, vnode 11 ..= table 4, vnode 200
    cg1.levels[0] = Level {
        level_idx: 1,
        level_type: LevelType::Nonoverlapping,
        table_infos: vec![
            gen_sst_info(
                1,
                vec![3],
                FullKey::for_test(
                    TableId::new(3),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(3),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            ),
            gen_sst_info(
                10,
                vec![3, 4],
                FullKey::for_test(
                    TableId::new(3),
                    gen_key_from_str(VirtualNode::from_index(201), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(4),
                    gen_key_from_str(VirtualNode::from_index(10), "1"),
                    0,
                )
                .encode()
                .into(),
            ),
            gen_sst_info(
                11,
                vec![4],
                FullKey::for_test(
                    TableId::new(4),
                    gen_key_from_str(VirtualNode::from_index(11), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(4),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            ),
        ],
        total_file_size: 300,
        ..Default::default()
    };

    // An overlapping L0 sub-level (id 101) with five SSTs. All are tagged with
    // table 2, but their key ranges span table ids 0..=2.
    cg1.l0.sub_levels.push(Level {
        level_idx: 0,
        table_infos: vec![
            gen_sst_info(
                2,
                vec![2],
                FullKey::for_test(
                    TableId::new(0),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(2),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            ),
            gen_sst_info(
                22,
                vec![2],
                FullKey::for_test(
                    TableId::new(0),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(2),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            ),
            gen_sst_info(
                23,
                vec![2],
                FullKey::for_test(
                    TableId::new(0),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(2),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            ),
            gen_sst_info(
                24,
                vec![2],
                FullKey::for_test(
                    TableId::new(2),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(2),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            ),
            gen_sst_info(
                25,
                vec![2],
                FullKey::for_test(
                    TableId::new(0),
                    gen_key_from_str(VirtualNode::from_index(1), "1"),
                    0,
                )
                .encode()
                .into(),
                FullKey::for_test(
                    TableId::new(0),
                    gen_key_from_str(VirtualNode::from_index(200), "1"),
                    0,
                )
                .encode()
                .into(),
            ),
        ],
        sub_level_id: 101,
        level_type: LevelType::Overlapping,
        total_file_size: 300,
        ..Default::default()
    });

    // Split the L0 sub-level at table 1. Wrap the split-off SSTs in a new
    // right-hand L0 sub-level (same id 101) and merge it back into cg1.
    // There are no assertions here; this scope only fails if split or merge
    // panics.
    {
        let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);

        let mut new_sst_id = 100.into();
        let x = group_split::split_sst_info_for_level_v2(
            &mut cg1.l0.sub_levels[0],
            &mut new_sst_id,
            split_key,
        );
        let mut right_l0 = OverlappingLevel {
            sub_levels: vec![],
            total_file_size: 0,
            uncompressed_file_size: 0,
        };

        right_l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: x,
            sub_level_id: 101,
            total_file_size: 100,
            level_type: LevelType::Overlapping,
            ..Default::default()
        });

        let right_levels = Levels {
            levels: vec![],
            l0: right_l0,
            ..Default::default()
        };

        merge_levels(cg1, right_levels);
    }

    // This test never populates `levels[2]`, so splitting it is expected to
    // yield nothing.
    {
        let mut new_sst_id = 100.into();
        let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
        let x = group_split::split_sst_info_for_level_v2(
            &mut cg1.levels[2],
            &mut new_sst_id,
            split_key,
        );

        assert!(x.is_empty());
    }

    // Split key at table 1 sits before every SST in `levels[0]` (tables 3/4).
    // All three SSTs move to the right side unchanged (original ids, and the
    // asserts expect sst_size 100 for each). The source level is left empty.
    {
        let mut cg1 = cg1.clone();
        let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);

        let mut new_sst_id = 100.into();
        let x = group_split::split_sst_info_for_level_v2(
            &mut cg1.levels[0],
            &mut new_sst_id,
            split_key,
        );

        assert_eq!(3, x.len());
        assert_eq!(1, x[0].sst_id);
        assert_eq!(100, x[0].sst_size);
        assert_eq!(10, x[1].sst_id);
        assert_eq!(100, x[1].sst_size);
        assert_eq!(11, x[2].sst_id);
        assert_eq!(100, x[2].sst_size);

        assert_eq!(0, cg1.levels[0].table_infos.len());
    }

    // Split key at table 5 sits after every SST, so nothing moves.
    {
        let mut cg1 = cg1.clone();
        let split_key = group_split::build_split_key(5.into(), VirtualNode::ZERO);

        let mut new_sst_id = 100.into();
        let x = group_split::split_sst_info_for_level_v2(
            &mut cg1.levels[0],
            &mut new_sst_id,
            split_key,
        );

        assert_eq!(0, x.len());
        assert_eq!(3, cg1.levels[0].table_infos.len());
    }

    // Split key at table 4 cuts through sst 10 (tables 3 and 4).
    // - The table-4 half goes right with fresh id 100 and half the size.
    // - sst 11 moves whole.
    // - The table-3 half stays left with fresh id 101 and half the size.
    // - sst 1 stays left untouched.
    {
        let mut cg1 = cg1.clone();
        let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);

        let mut new_sst_id = 100.into();
        let x = group_split::split_sst_info_for_level_v2(
            &mut cg1.levels[0],
            &mut new_sst_id,
            split_key,
        );

        assert_eq!(2, x.len());
        assert_eq!(100, x[0].sst_id);
        assert_eq!(100 / 2, x[0].sst_size);
        assert_eq!(11, x[1].sst_id);
        assert_eq!(100, x[1].sst_size);
        assert_eq!(vec![TableId::new(4)], x[1].table_ids);

        assert_eq!(2, cg1.levels[0].table_infos.len());
        assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
        assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
        assert_eq!(
            vec![TableId::new(3)],
            cg1.levels[0].table_infos[1].table_ids
        );
    }
}
2742
2743 fn make_sst(sst_id: u64, table_ids: Vec<u32>, sst_size: u64) -> SstableInfo {
2744 SstableInfoInner {
2745 sst_id: sst_id.into(),
2746 object_id: sst_id.into(),
2747 table_ids: table_ids.into_iter().map(TableId::new).collect(),
2748 file_size: sst_size,
2749 sst_size,
2750 uncompressed_file_size: sst_size * 2,
2751 ..Default::default()
2752 }
2753 .into()
2754 }
2755
/// `Level::normalize` drops SSTs that reference no table and rebuilds both
/// size counters from the survivors. It does this whether the counters were
/// stale, zeroed, or the level ends up empty.
#[test]
fn test_level_normalize() {
    let initial_ssts = vec![
        make_sst(1, vec![1, 2], 100),
        // No table ids: expected to be removed.
        make_sst(2, vec![], 200),
        make_sst(3, vec![3], 300),
        // No table ids: expected to be removed.
        make_sst(4, vec![], 400),
    ];
    let mut level = Level {
        level_idx: 1,
        level_type: LevelType::Nonoverlapping,
        table_infos: initial_ssts,
        // Deliberately wrong counters that normalize must overwrite.
        total_file_size: 9999,
        uncompressed_file_size: 9999,
        ..Default::default()
    };

    // Stale counters are replaced.
    level.normalize();

    assert_eq!(level.table_infos.len(), 2);
    assert_eq!(1, level.table_infos[0].sst_id);
    assert_eq!(3, level.table_infos[1].sst_id);
    assert_eq!(level.total_file_size, 400);
    assert_eq!(level.uncompressed_file_size, 800);

    // Zeroed counters are rebuilt from the same SSTs.
    level.uncompressed_file_size = 0;
    level.total_file_size = 0;
    level.normalize();
    assert_eq!(level.table_infos.len(), 2);
    assert_eq!(level.total_file_size, 400);
    assert_eq!(level.uncompressed_file_size, 800);

    // When every SST is table-less, the level collapses to empty.
    level.table_infos = vec![make_sst(10, vec![], 100), make_sst(11, vec![], 200)];
    level.normalize();
    assert!(level.table_infos.is_empty());
    assert_eq!(level.total_file_size, 0);
    assert_eq!(level.uncompressed_file_size, 0);
}
2796
/// `Level::delete_ssts` removes the requested SSTs, keeps the survivors in
/// order, updates both size counters, and reports whether anything changed.
///
/// A delete that matches nothing must return `false` and leave the level
/// untouched: the same SSTs and the same size counters.
#[test]
fn test_level_delete_ssts() {
    let mut level = Level {
        level_idx: 1,
        level_type: LevelType::Nonoverlapping,
        table_infos: vec![
            make_sst(1, vec![1], 100),
            make_sst(2, vec![2], 200),
            make_sst(3, vec![3], 300),
        ],
        total_file_size: 600,
        uncompressed_file_size: 1200,
        ..Default::default()
    };

    let delete_ids: HashSet<crate::HummockSstableId> = HashSet::from([2.into()]);
    let changed = level.delete_ssts(&delete_ids);

    assert!(changed);
    assert_eq!(level.table_infos.len(), 2);
    assert_eq!(1, level.table_infos[0].sst_id);
    assert_eq!(3, level.table_infos[1].sst_id);
    assert_eq!(level.total_file_size, 400);
    assert_eq!(level.uncompressed_file_size, 800);

    // Deleting an id that is not present is a no-op. Besides the return
    // value, check that neither the SST list nor the size accounting was
    // disturbed.
    let delete_ids: HashSet<crate::HummockSstableId> = HashSet::from([999.into()]);
    let changed = level.delete_ssts(&delete_ids);
    assert!(!changed);
    assert_eq!(level.table_infos.len(), 2);
    assert_eq!(1, level.table_infos[0].sst_id);
    assert_eq!(3, level.table_infos[1].sst_id);
    assert_eq!(level.total_file_size, 400);
    assert_eq!(level.uncompressed_file_size, 800);
}
2828
/// `Level::prune_table_ids_from_ssts` behaves as follows:
/// - strips the given table ids from each SST's `table_ids`;
/// - removes SSTs that are left with no tables;
/// - recomputes the level counters from the remaining SSTs, whose own sizes
///   are unchanged by the prune.
#[test]
fn test_level_prune_table_ids_from_ssts() {
    let mut level = Level {
        level_idx: 1,
        level_type: LevelType::Nonoverlapping,
        table_infos: vec![
            make_sst(1, vec![1, 2], 100),
            make_sst(2, vec![2, 3], 200),
            // Only references table 2; should vanish after the first prune.
            make_sst(3, vec![2], 300),
        ],
        total_file_size: 600,
        uncompressed_file_size: 1200,
        ..Default::default()
    };

    level.prune_table_ids_from_ssts(&HashSet::from([TableId::new(2)]));

    assert_eq!(level.table_infos.len(), 2);
    assert_eq!(level.table_infos[0].table_ids, vec![TableId::new(1)]);
    assert_eq!(level.table_infos[1].table_ids, vec![TableId::new(3)]);
    // SSTs 1 and 2 survive with their full sizes.
    assert_eq!(level.total_file_size, 100 + 200);
    assert_eq!(level.uncompressed_file_size, 200 + 400);

    // Pruning the remaining tables empties the level entirely.
    level.prune_table_ids_from_ssts(&HashSet::from([TableId::new(1), TableId::new(3)]));
    assert!(level.table_infos.is_empty());
    assert_eq!(level.total_file_size, 0);
    assert_eq!(level.uncompressed_file_size, 0);
}
2861
/// `OverlappingLevel::normalize` removes sub-levels that hold no SSTs and
/// keeps the rest in their original order. It also rebuilds the L0-wide
/// counters to match what remains, including the all-empty case.
#[test]
fn test_overlapping_level_normalize() {
    let first = Level {
        level_idx: 0,
        table_infos: vec![make_sst(1, vec![1], 100)],
        total_file_size: 100,
        uncompressed_file_size: 200,
        sub_level_id: 1,
        ..Default::default()
    };
    // Holds no SSTs: expected to be dropped.
    let hollow = Level {
        level_idx: 0,
        table_infos: vec![],
        total_file_size: 0,
        uncompressed_file_size: 0,
        sub_level_id: 2,
        ..Default::default()
    };
    let third = Level {
        level_idx: 0,
        table_infos: vec![make_sst(3, vec![3], 300)],
        total_file_size: 300,
        uncompressed_file_size: 600,
        sub_level_id: 3,
        ..Default::default()
    };

    let mut l0 = OverlappingLevel {
        sub_levels: vec![first, hollow, third],
        // Deliberately wrong totals that normalize must overwrite.
        total_file_size: 9999,
        uncompressed_file_size: 9999,
    };

    l0.normalize();

    assert_eq!(l0.sub_levels.len(), 2);
    assert_eq!(l0.sub_levels[0].sub_level_id, 1);
    assert_eq!(l0.sub_levels[1].sub_level_id, 3);
    assert_eq!(l0.total_file_size, 100 + 300);
    assert_eq!(l0.uncompressed_file_size, 200 + 600);

    // A lone empty sub-level leaves L0 with nothing at all.
    let lone_empty = Level {
        level_idx: 0,
        table_infos: vec![],
        ..Default::default()
    };
    l0.sub_levels = vec![lone_empty];
    l0.normalize();
    assert!(l0.sub_levels.is_empty());
    assert_eq!(l0.total_file_size, 0);
    assert_eq!(l0.uncompressed_file_size, 0);
}
2914
/// `Levels::prune_table_ids_from_ssts` makes the following changes:
/// - prunes every L0 sub-level and every non-L0 level;
/// - drops L0 sub-levels that end up empty;
/// - keeps the L0-wide totals in sync;
/// - bumps `compaction_group_version_id`.
#[test]
fn test_levels_prune_table_ids_from_ssts() {
    // Sub-level 1 keeps sst 2 (table 20) once table 10 is pruned.
    let sub_level_1 = Level {
        level_idx: 0,
        table_infos: vec![make_sst(1, vec![10], 100), make_sst(2, vec![20], 200)],
        total_file_size: 300,
        uncompressed_file_size: 600,
        sub_level_id: 1,
        ..Default::default()
    };
    // Sub-level 2 only references table 10 and should disappear.
    let sub_level_2 = Level {
        level_idx: 0,
        table_infos: vec![make_sst(3, vec![10], 150)],
        total_file_size: 150,
        uncompressed_file_size: 300,
        sub_level_id: 2,
        ..Default::default()
    };
    // L1 behaviour under the prune:
    // - sst 4 shrinks to table 20 and keeps its size;
    // - sst 5 is dropped.
    let level_1 = Level {
        level_idx: 1,
        level_type: LevelType::Nonoverlapping,
        table_infos: vec![make_sst(4, vec![10, 20], 400), make_sst(5, vec![10], 500)],
        total_file_size: 900,
        uncompressed_file_size: 1800,
        ..Default::default()
    };

    #[expect(deprecated)]
    let mut levels = Levels {
        l0: OverlappingLevel {
            sub_levels: vec![sub_level_1, sub_level_2],
            total_file_size: 450,
            uncompressed_file_size: 900,
        },
        levels: vec![level_1],
        group_id: 1.into(),
        parent_group_id: 0.into(),
        member_table_ids: vec![],
        compaction_group_version_id: 0,
    };

    let pruned_table_ids = HashSet::from([TableId::new(10)]);
    levels.prune_table_ids_from_ssts(&pruned_table_ids);

    let l0 = &levels.l0;
    assert_eq!(l0.sub_levels.len(), 1);
    let surviving = &l0.sub_levels[0];
    assert_eq!(surviving.sub_level_id, 1);
    assert_eq!(surviving.table_infos.len(), 1);
    assert_eq!(2, surviving.table_infos[0].sst_id);
    assert_eq!(surviving.total_file_size, 200);
    assert_eq!(surviving.uncompressed_file_size, 400);

    assert_eq!(l0.total_file_size, 200);
    assert_eq!(l0.uncompressed_file_size, 400);

    let l1 = &levels.levels[0];
    assert_eq!(l1.table_infos.len(), 1);
    assert_eq!(4, l1.table_infos[0].sst_id);
    assert_eq!(l1.table_infos[0].table_ids, vec![TableId::new(20)]);
    assert_eq!(l1.total_file_size, 400);
    assert_eq!(l1.uncompressed_file_size, 800);

    assert_eq!(levels.compaction_group_version_id, 1);
}
2987
/// Applying a `GroupDelta::PruneTableIdsFromSsts` delta through
/// `apply_version_delta` prunes group 1's SSTs. The expected effects are:
/// - table-less SSTs and empty L0 sub-levels vanish;
/// - the size counters follow;
/// - the group's `compaction_group_version_id` is bumped.
#[test]
fn test_apply_version_delta_prune_table_ids_from_ssts() {
    let l0 = OverlappingLevel {
        sub_levels: vec![
            // Keeps sst 2 (table 200) after table 100 is pruned.
            Level {
                level_idx: 0,
                level_type: LevelType::Overlapping,
                table_infos: vec![make_sst(1, vec![100], 50), make_sst(2, vec![200], 60)],
                total_file_size: 110,
                uncompressed_file_size: 220,
                sub_level_id: 1,
                ..Default::default()
            },
            // Only references table 100: the whole sub-level should go.
            Level {
                level_idx: 0,
                level_type: LevelType::Overlapping,
                table_infos: vec![make_sst(3, vec![100], 70)],
                total_file_size: 70,
                uncompressed_file_size: 140,
                sub_level_id: 2,
                ..Default::default()
            },
        ],
        total_file_size: 180,
        uncompressed_file_size: 360,
    };
    // L1 behaviour under the prune:
    // - sst 4 is reduced to table 200;
    // - sst 5 is dropped.
    let l1 = Level {
        level_idx: 1,
        level_type: LevelType::Nonoverlapping,
        table_infos: vec![make_sst(4, vec![100, 200], 80), make_sst(5, vec![100], 90)],
        total_file_size: 170,
        uncompressed_file_size: 340,
        ..Default::default()
    };

    #[expect(deprecated)]
    let group_levels = Levels {
        l0,
        levels: vec![l1],
        group_id: 1.into(),
        parent_group_id: 0.into(),
        member_table_ids: vec![],
        compaction_group_version_id: 0,
    };

    let mut version = HummockVersion {
        id: HummockVersionId::new(0),
        levels: HashMap::from_iter([(1.into(), group_levels)]),
        ..Default::default()
    };

    let prune_delta = GroupDelta::PruneTableIdsFromSsts(HashSet::from([TableId::new(100)]));
    let version_delta = HummockVersionDelta {
        id: HummockVersionId::new(1),
        group_deltas: HashMap::from_iter([(
            1.into(),
            GroupDeltas {
                group_deltas: vec![prune_delta],
            },
        )]),
        ..Default::default()
    };

    version.apply_version_delta(&version_delta);

    let cg = version.get_compaction_group_levels(1.into());

    assert_eq!(
        cg.l0.sub_levels.len(),
        1,
        "empty sub-level should be removed"
    );
    let surviving = &cg.l0.sub_levels[0];
    assert_eq!(surviving.sub_level_id, 1);
    assert_eq!(surviving.table_infos.len(), 1);
    assert_eq!(2, surviving.table_infos[0].sst_id);
    assert_eq!(surviving.table_infos[0].table_ids, vec![TableId::new(200)]);

    assert_eq!(cg.l0.total_file_size, 60);
    assert_eq!(cg.l0.uncompressed_file_size, 120);

    let level_1 = &cg.levels[0];
    assert_eq!(level_1.table_infos.len(), 1);
    assert_eq!(4, level_1.table_infos[0].sst_id);
    assert_eq!(level_1.table_infos[0].table_ids, vec![TableId::new(200)]);
    assert_eq!(level_1.total_file_size, 80);
    assert_eq!(level_1.uncompressed_file_size, 160);

    assert_eq!(cg.compaction_group_version_id, 1);
}
3089
/// `prune_stale_table_ids_from_ssts` strips table ids that are not registered
/// in the version's `state_table_info` and keeps registered ones. An SST left
/// with no tables is removed, and the touched group's version id is bumped.
#[test]
fn test_prune_stale_table_ids_from_ssts() {
    let live_table_id = TableId::new(100);
    let stale_table_id = TableId::new(200);

    // L0: one SST shared by the live and the stale table.
    let shared_sst = make_sst(
        1,
        vec![live_table_id.as_raw_id(), stale_table_id.as_raw_id()],
        50,
    );
    // L1: one SST referencing only the stale table.
    let stale_only_sst = make_sst(2, vec![stale_table_id.as_raw_id()], 60);

    #[expect(deprecated)]
    let group_levels = Levels {
        l0: OverlappingLevel {
            sub_levels: vec![Level {
                level_idx: 0,
                level_type: LevelType::Overlapping,
                table_infos: vec![shared_sst],
                total_file_size: 50,
                uncompressed_file_size: 100,
                sub_level_id: 1,
                ..Default::default()
            }],
            total_file_size: 50,
            uncompressed_file_size: 100,
        },
        levels: vec![Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![stale_only_sst],
            total_file_size: 60,
            uncompressed_file_size: 120,
            ..Default::default()
        }],
        group_id: 1.into(),
        parent_group_id: 0.into(),
        member_table_ids: vec![],
        compaction_group_version_id: 0,
    };

    // Only the live table is registered (in group 1).
    let state_table_info = HummockVersionStateTableInfo::from_protobuf_owned(
        HashMap::from_iter([(
            live_table_id,
            StateTableInfo {
                committed_epoch: 1,
                compaction_group_id: 1.into(),
            },
        )]),
    );

    let mut version = HummockVersion {
        id: HummockVersionId::new(0),
        levels: HashMap::from_iter([(1.into(), group_levels)]),
        state_table_info,
        ..Default::default()
    };

    assert_eq!(version.prune_stale_table_ids_from_ssts(), 1);

    let cg = version.get_compaction_group_levels(1.into());
    assert_eq!(cg.l0.sub_levels.len(), 1);
    let l0_ssts = &cg.l0.sub_levels[0].table_infos;
    assert_eq!(l0_ssts.len(), 1);
    assert_eq!(l0_ssts[0].sst_id, 1);
    assert_eq!(l0_ssts[0].table_ids, vec![live_table_id]);
    assert!(cg.levels[0].table_infos.is_empty());
    assert_eq!(cg.compaction_group_version_id, 1);
}
3156
/// A group that still carries legacy `member_table_ids` is left alone by
/// `prune_stale_table_ids_from_ssts`. The version's `state_table_info` is left
/// at its default here. The SST's table ids and the group's version id must
/// both come back unchanged.
#[test]
fn test_prune_stale_table_ids_from_ssts_skips_legacy_member_table_ids() {
    let only_sub_level = Level {
        level_idx: 0,
        level_type: LevelType::Overlapping,
        table_infos: vec![make_sst(1, vec![100, 200], 50)],
        total_file_size: 50,
        uncompressed_file_size: 100,
        sub_level_id: 1,
        ..Default::default()
    };

    #[expect(deprecated)]
    let group_levels = Levels {
        l0: OverlappingLevel {
            sub_levels: vec![only_sub_level],
            total_file_size: 50,
            uncompressed_file_size: 100,
        },
        group_id: 1.into(),
        parent_group_id: 0.into(),
        // The legacy membership list is what should make pruning skip this group.
        member_table_ids: vec![100, 200],
        compaction_group_version_id: 0,
        ..Default::default()
    };

    let mut version = HummockVersion {
        id: HummockVersionId::new(0),
        levels: HashMap::from_iter([(1.into(), group_levels)]),
        ..Default::default()
    };

    let pruned = version.prune_stale_table_ids_from_ssts();
    assert_eq!(pruned, 0);

    let cg = version.get_compaction_group_levels(1.into());
    let expected_table_ids = vec![TableId::new(100), TableId::new(200)];
    assert_eq!(
        cg.l0.sub_levels[0].table_infos[0].table_ids,
        expected_table_ids
    );
    assert_eq!(cg.compaction_group_version_id, 0);
}
3197
/// Applies two intra-level deltas in one version delta:
/// - one removes every SST from L0;
/// - one inserts a single merged SST into L1.
///
/// Afterwards L0 must have no sub-levels and zeroed totals, and L1 must hold
/// only the new SST.
#[test]
fn test_apply_version_delta_compact_l0() {
    let l0_sub_levels = vec![
        Level {
            level_idx: 0,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![make_sst(1, vec![1], 100), make_sst(2, vec![2], 200)],
            total_file_size: 300,
            uncompressed_file_size: 600,
            sub_level_id: 1,
            ..Default::default()
        },
        Level {
            level_idx: 0,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![make_sst(3, vec![3], 300)],
            total_file_size: 300,
            uncompressed_file_size: 600,
            sub_level_id: 2,
            ..Default::default()
        },
    ];
    let empty_l1 = Level {
        level_idx: 1,
        level_type: LevelType::Nonoverlapping,
        table_infos: vec![],
        total_file_size: 0,
        uncompressed_file_size: 0,
        ..Default::default()
    };

    #[expect(deprecated)]
    let group_levels = Levels {
        l0: OverlappingLevel {
            sub_levels: l0_sub_levels,
            total_file_size: 600,
            uncompressed_file_size: 1200,
        },
        levels: vec![empty_l1],
        group_id: 1.into(),
        parent_group_id: 0.into(),
        member_table_ids: vec![],
        compaction_group_version_id: 0,
    };

    let mut version = HummockVersion {
        id: HummockVersionId::new(0),
        levels: HashMap::from_iter([(1.into(), group_levels)]),
        ..Default::default()
    };

    // Level 0: remove ssts 1, 2 and 3. They live in two different sub-levels,
    // yet the delta is built with a second argument of 0.
    let drain_l0 = GroupDelta::IntraLevel(IntraLevelDelta::new(
        0,
        0,
        HashSet::from([1.into(), 2.into(), 3.into()]),
        vec![],
        0,
        0,
    ));
    // Level 1: insert the compaction output covering tables 1, 2 and 3.
    let fill_l1 = GroupDelta::IntraLevel(IntraLevelDelta::new(
        1,
        0,
        HashSet::new(),
        vec![make_sst(10, vec![1, 2, 3], 500)],
        0,
        0,
    ));

    let version_delta = HummockVersionDelta {
        id: HummockVersionId::new(1),
        group_deltas: HashMap::from_iter([(
            1.into(),
            GroupDeltas {
                group_deltas: vec![drain_l0, fill_l1],
            },
        )]),
        ..Default::default()
    };

    version.apply_version_delta(&version_delta);

    let cg = version.get_compaction_group_levels(1.into());

    assert!(cg.l0.sub_levels.is_empty());
    assert_eq!(cg.l0.total_file_size, 0);
    assert_eq!(cg.l0.uncompressed_file_size, 0);

    let level_1 = &cg.levels[0];
    assert_eq!(level_1.table_infos.len(), 1);
    assert_eq!(10, level_1.table_infos[0].sst_id);
    assert_eq!(level_1.total_file_size, 500);
}
3290}