1use std::borrow::Borrow;
16use std::cmp::Ordering;
17use std::collections::hash_map::Entry;
18use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
19use std::iter::once;
20use std::sync::{Arc, LazyLock};
21
22use bytes::Bytes;
23use itertools::Itertools;
24use risingwave_common::catalog::TableId;
25use risingwave_common::hash::VnodeBitmapExt;
26use risingwave_common::log::LogSuppressor;
27use risingwave_pb::hummock::{
28 CompactionConfig, CompatibilityVersion, PbLevelType, StateTableInfo, StateTableInfoDelta,
29};
30use tracing::warn;
31
32use super::group_split::split_sst_with_table_ids;
33use super::{StateTableId, group_split};
34use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
35use crate::compact_task::is_compaction_task_expired;
36use crate::compaction_group::StaticCompactionGroupId;
37use crate::key_range::KeyRangeCommon;
38use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
39use crate::sstable_info::SstableInfo;
40use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
41use crate::vector_index::apply_vector_index_delta;
42use crate::version::{
43 GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
44 IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader, SstableIdReader,
45};
46use crate::{
47 CompactionGroupId, HummockObjectId, HummockSstableId, HummockSstableObjectId, can_concat,
48};
49
/// Per-compaction-group summary of the SST changes carried by a version delta,
/// as assembled by `build_sst_delta_infos`.
#[derive(Debug, Clone, Default)]
pub struct SstDeltaInfo {
    /// Level index the inserted SSTs were added to (0 for L0).
    pub insert_sst_level: u32,
    /// SSTs inserted by the delta.
    pub insert_sst_infos: Vec<SstableInfo>,
    /// Object ids of the SSTs removed by the delta.
    pub delete_sst_object_ids: Vec<HummockSstableObjectId>,
}
56
/// For a single SST object: compaction group id -> ids of the SSTs in that group
/// that reference the object (built by `build_branched_sst_info`).
pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;
58
59impl<L> HummockVersionCommon<SstableInfo, L> {
60 pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
61 self.levels
62 .get(&compaction_group_id)
63 .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
64 }
65
66 pub fn get_compaction_group_levels_mut(
67 &mut self,
68 compaction_group_id: CompactionGroupId,
69 ) -> &mut Levels {
70 self.levels
71 .get_mut(&compaction_group_id)
72 .unwrap_or_else(|| panic!("compaction group {} does not exist", compaction_group_id))
73 }
74
75 pub fn get_sst_ids_by_group_id(
77 &self,
78 compaction_group_id: CompactionGroupId,
79 ) -> impl Iterator<Item = HummockSstableId> + '_ {
80 self.levels
81 .iter()
82 .filter_map(move |(cg_id, level)| {
83 if *cg_id == compaction_group_id {
84 Some(level)
85 } else {
86 None
87 }
88 })
89 .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
90 .flat_map(|level| level.table_infos.iter())
91 .map(|s| s.sst_id)
92 }
93
94 pub fn prune_stale_table_ids_from_ssts(&mut self) -> usize {
99 let live_table_ids: HashSet<_> = self.state_table_info.info().keys().copied().collect();
100 if live_table_ids.is_empty()
103 && self.levels.values().any(|levels| {
104 #[expect(deprecated)]
105 {
106 !levels.member_table_ids.is_empty()
107 }
108 })
109 {
110 return 0;
111 }
112
113 let mut pruned_table_ids = HashSet::new();
114
115 for levels in self.levels.values_mut() {
116 let stale_table_ids = levels
117 .l0
118 .sub_levels
119 .iter()
120 .chain(levels.levels.iter())
121 .flat_map(|level| level.table_infos.iter())
122 .flat_map(|sst| sst.table_ids.iter().copied())
123 .filter(|table_id| !live_table_ids.contains(table_id))
124 .collect::<HashSet<_>>();
125
126 if stale_table_ids.is_empty() {
127 continue;
128 }
129
130 pruned_table_ids.extend(stale_table_ids.iter().copied());
131 levels.prune_table_ids_from_ssts(&stale_table_ids);
132 }
133
134 pruned_table_ids.len()
135 }
136
137 pub fn level_iter<F: FnMut(&Level) -> bool>(
138 &self,
139 compaction_group_id: CompactionGroupId,
140 mut f: F,
141 ) {
142 if let Some(levels) = self.levels.get(&compaction_group_id) {
143 for sub_level in &levels.l0.sub_levels {
144 if !f(sub_level) {
145 return;
146 }
147 }
148 for level in &levels.levels {
149 if !f(level) {
150 return;
151 }
152 }
153 }
154 }
155
156 pub fn num_levels(&self, compaction_group_id: CompactionGroupId) -> usize {
157 self.levels
159 .get(&compaction_group_id)
160 .map(|group| group.levels.len() + 1)
161 .unwrap_or(0)
162 }
163
164 pub fn safe_epoch_table_watermarks(
165 &self,
166 existing_table_ids: &[TableId],
167 ) -> BTreeMap<TableId, TableWatermarks> {
168 safe_epoch_table_watermarks_impl(&self.table_watermarks, existing_table_ids)
169 }
170}
171
172pub fn safe_epoch_table_watermarks_impl(
173 table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
174 existing_table_ids: &[TableId],
175) -> BTreeMap<TableId, TableWatermarks> {
176 fn extract_single_table_watermark(
177 table_watermarks: &TableWatermarks,
178 ) -> Option<TableWatermarks> {
179 if let Some((first_epoch, first_epoch_watermark)) = table_watermarks.watermarks.first() {
180 Some(TableWatermarks {
181 watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
182 direction: table_watermarks.direction,
183 watermark_type: table_watermarks.watermark_type,
184 })
185 } else {
186 None
187 }
188 }
189 table_watermarks
190 .iter()
191 .filter_map(|(table_id, table_watermarks)| {
192 if !existing_table_ids.contains(table_id) {
193 None
194 } else {
195 extract_single_table_watermark(table_watermarks)
196 .map(|table_watermarks| (*table_id, table_watermarks))
197 }
198 })
199 .collect()
200}
201
202pub fn safe_epoch_read_table_watermarks_impl(
203 safe_epoch_watermarks: BTreeMap<TableId, TableWatermarks>,
204) -> BTreeMap<TableId, ReadTableWatermark> {
205 safe_epoch_watermarks
206 .into_iter()
207 .map(|(table_id, watermarks)| {
208 assert_eq!(watermarks.watermarks.len(), 1);
209 let vnode_watermarks = &watermarks.watermarks.first().expect("should exist").1;
210 let mut vnode_watermark_map = BTreeMap::new();
211 for vnode_watermark in vnode_watermarks.iter() {
212 let watermark = Bytes::copy_from_slice(vnode_watermark.watermark());
213 for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
214 assert!(
215 vnode_watermark_map
216 .insert(vnode, watermark.clone())
217 .is_none(),
218 "duplicate table watermark on vnode {}",
219 vnode.to_index()
220 );
221 }
222 }
223 (
224 table_id,
225 ReadTableWatermark {
226 direction: watermarks.direction,
227 vnode_watermarks: vnode_watermark_map,
228 },
229 )
230 })
231 .collect()
232}
233
234impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
235 pub fn count_new_ssts_in_group_split(
236 &self,
237 parent_group_id: CompactionGroupId,
238 split_key: Bytes,
239 ) -> u64 {
240 self.levels
241 .get(&parent_group_id)
242 .map_or(0, |parent_levels| {
243 let l0 = &parent_levels.l0;
244 let mut split_count = 0;
245 for sub_level in &l0.sub_levels {
246 assert!(!sub_level.table_infos.is_empty());
247
248 if sub_level.level_type == PbLevelType::Overlapping {
249 split_count += sub_level
251 .table_infos
252 .iter()
253 .map(|sst| {
254 if let group_split::SstSplitType::Both =
255 group_split::need_to_split(sst, split_key.clone())
256 {
257 2
258 } else {
259 0
260 }
261 })
262 .sum::<u64>();
263 continue;
264 }
265
266 let pos = group_split::get_split_pos(&sub_level.table_infos, split_key.clone());
267 let sst = sub_level.table_infos.get(pos).unwrap();
268
269 if let group_split::SstSplitType::Both =
270 group_split::need_to_split(sst, split_key.clone())
271 {
272 split_count += 2;
273 }
274 }
275
276 for level in &parent_levels.levels {
277 if level.table_infos.is_empty() {
278 continue;
279 }
280 let pos = group_split::get_split_pos(&level.table_infos, split_key.clone());
281 let sst = level.table_infos.get(pos).unwrap();
282 if let group_split::SstSplitType::Both =
283 group_split::need_to_split(sst, split_key.clone())
284 {
285 split_count += 2;
286 }
287 }
288
289 split_count
290 })
291 }
292
    /// Legacy initialization of a newly constructed compaction group `group_id`
    /// from `parent_group_id`, selected by `apply_version_delta` for group-construct
    /// deltas older than `CompatibilityVersion::SplitGroupByTableId`.
    ///
    /// Every SST of the parent that contains any of `member_table_ids` is split by
    /// `split_sst_info_for_level`. New SST ids are drawn starting at
    /// `new_sst_start_id`. The branch SSTs are moved into the corresponding L0
    /// sub-level / level of the new group. Both groups get their
    /// `compaction_group_version_id` bumped.
    ///
    /// When the parent is `StaticCompactionGroupId::NewCompactionGroup` nothing is
    /// moved. A non-zero `new_sst_start_id` then panics in debug builds and only
    /// logs a warning otherwise.
    ///
    /// # Panics
    /// - the parent group does not exist;
    /// - `group_id` is not in `self.levels`, or equals `parent_group_id`
    ///   (from `get_disjoint_mut` + `unwrap`);
    /// - a target level's SSTs cannot be concatenated after insertion;
    /// - any L0 sub-level of either group ends up empty.
    pub fn init_with_parent_group(
        &mut self,
        parent_group_id: CompactionGroupId,
        group_id: CompactionGroupId,
        member_table_ids: BTreeSet<StateTableId>,
        new_sst_start_id: HummockSstableId,
    ) {
        let mut new_sst_id = new_sst_start_id;
        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup {
            if new_sst_start_id != 0 {
                if cfg!(debug_assertions) {
                    panic!(
                        "non-zero sst start id {} for NewCompactionGroup",
                        new_sst_start_id
                    );
                } else {
                    warn!(
                        %new_sst_start_id,
                        "non-zero sst start id for NewCompactionGroup"
                    );
                }
            }
            return;
        } else if !self.levels.contains_key(&parent_group_id) {
            unreachable!(
                "non-existing parent group id {} to init from",
                parent_group_id
            );
        }
        // Borrow parent and child mutably at the same time; a missing child group panics here.
        let [parent_levels, cur_levels] = self
            .levels
            .get_disjoint_mut([&parent_group_id, &group_id])
            .map(|res| res.unwrap());
        // Bump both version ids. `apply_compact_ssts` drops deltas that
        // `is_compaction_task_expired` reports as stale against the current id.
        parent_levels.compaction_group_version_id += 1;
        cur_levels.compaction_group_version_id += 1;
        let l0 = &mut parent_levels.l0;
        {
            for sub_level in &mut l0.sub_levels {
                let target_l0 = &mut cur_levels.l0;
                // Split off the member tables' part of each SST in this sub-level.
                let insert_table_infos =
                    split_sst_info_for_level(&member_table_ids, sub_level, &mut new_sst_id);
                sub_level.normalize();
                if insert_table_infos.is_empty() {
                    continue;
                }
                // Ok(idx): add into the existing target sub-level at `idx`.
                // Err(idx): create a sub-level with the same id/type at position `idx`.
                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                    Ok(idx) => {
                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                    }
                    Err(idx) => {
                        insert_new_sub_level(
                            target_l0,
                            sub_level.sub_level_id,
                            sub_level.level_type,
                            insert_table_infos,
                            Some(idx),
                        );
                    }
                }
            }
            l0.normalize();
        }
        // L1..Ln: branch SSTs go to the same level index of the new group, kept
        // sorted by key range and required to be concatenable.
        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
            let insert_table_infos =
                split_sst_info_for_level(&member_table_ids, level, &mut new_sst_id);
            cur_levels.levels[idx].total_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum::<u64>();
            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.uncompressed_file_size)
                .sum::<u64>();
            cur_levels.levels[idx]
                .table_infos
                .extend(insert_table_infos);
            cur_levels.levels[idx]
                .table_infos
                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            assert!(can_concat(&cur_levels.levels[idx].table_infos));
            level.normalize();
        }

        // Invariant: no group may be left with an empty L0 sub-level.
        assert!(
            parent_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
        assert!(
            cur_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
    }
395
    /// Builds one `SstDeltaInfo` per compaction group in `version_delta`, listing the
    /// SSTs the delta inserts and the object ids of the SSTs it removes.
    ///
    /// Removed SST ids are resolved to object ids by looking them up in `self`, so
    /// `self` is expected to be the version the delta applies to (before applying it).
    /// Returns an empty list for trivial-move deltas. Groups whose deltas contain
    /// anything other than `IntraLevel` / `NewL0SubLevel` are skipped.
    ///
    /// NOTE(review): `insert_sst_level` is overwritten by every delta that inserts
    /// SSTs, so a group with inserts at several levels only records the last one.
    ///
    /// # Panics
    /// Panics if a group present in the delta is missing from `self.levels`.
    /// Removed ids that cannot be found are logged with `tracing::error!` and
    /// trip `debug_assert!`s.
    pub fn build_sst_delta_infos(
        &self,
        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
    ) -> Vec<SstDeltaInfo> {
        let mut infos = vec![];

        if version_delta.trivial_move {
            return infos;
        }

        for (group_id, group_deltas) in &version_delta.group_deltas {
            let mut info = SstDeltaInfo::default();

            // Removed SST ids: L0 ones in one set, L1..Ln ones keyed by level index.
            let mut removed_l0_ssts: BTreeSet<HummockSstableId> = BTreeSet::new();
            let mut removed_ssts: BTreeMap<u32, BTreeSet<HummockSstableId>> = BTreeMap::new();

            // Only groups whose deltas are all IntraLevel / NewL0SubLevel are reported.
            if !group_deltas.group_deltas.iter().all(|delta| {
                matches!(
                    delta,
                    GroupDelta::IntraLevel(_) | GroupDelta::NewL0SubLevel(_)
                )
            }) {
                continue;
            }

            for group_delta in &group_deltas.group_deltas {
                match group_delta {
                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
                        if !inserted_table_infos.is_empty() {
                            info.insert_sst_level = 0;
                            info.insert_sst_infos
                                .extend(inserted_table_infos.iter().cloned());
                        }
                    }
                    GroupDeltaCommon::IntraLevel(intra_level) => {
                        if !intra_level.inserted_table_infos.is_empty() {
                            info.insert_sst_level = intra_level.level_idx;
                            info.insert_sst_infos
                                .extend(intra_level.inserted_table_infos.iter().cloned());
                        }
                        if !intra_level.removed_table_ids.is_empty() {
                            for id in &intra_level.removed_table_ids {
                                if intra_level.level_idx == 0 {
                                    removed_l0_ssts.insert(*id);
                                } else {
                                    removed_ssts
                                        .entry(intra_level.level_idx)
                                        .or_default()
                                        .insert(*id);
                                }
                            }
                        }
                    }
                    // Already filtered out by the `all` check above.
                    GroupDeltaCommon::GroupConstruct(_)
                    | GroupDeltaCommon::GroupDestroy(_)
                    | GroupDeltaCommon::GroupMerge(_)
                    | GroupDeltaCommon::PruneTableIdsFromSsts(_) => {}
                }
            }

            // Resolve removed SST ids to object ids using the current (pre-delta) levels.
            let group = self.levels.get(group_id).unwrap();
            for l0_sub_level in &group.level0().sub_levels {
                for sst_info in &l0_sub_level.table_infos {
                    if removed_l0_ssts.remove(&sst_info.sst_id) {
                        info.delete_sst_object_ids.push(sst_info.object_id);
                    }
                }
            }
            for level in &group.levels {
                if let Some(mut removed_level_ssts) = removed_ssts.remove(&level.level_idx) {
                    for sst_info in &level.table_infos {
                        if removed_level_ssts.remove(&sst_info.sst_id) {
                            info.delete_sst_object_ids.push(sst_info.object_id);
                        }
                    }
                    if !removed_level_ssts.is_empty() {
                        tracing::error!(
                            "removed_level_ssts is not empty: {:?}",
                            removed_level_ssts,
                        );
                    }
                    debug_assert!(removed_level_ssts.is_empty());
                }
            }

            // Anything left over refers to SSTs (or levels) not present in `self`.
            if !removed_l0_ssts.is_empty() || !removed_ssts.is_empty() {
                tracing::error!(
                    "not empty removed_l0_ssts: {:?}, removed_ssts: {:?}",
                    removed_l0_ssts,
                    removed_ssts
                );
            }
            debug_assert!(removed_l0_ssts.is_empty());
            debug_assert!(removed_ssts.is_empty());

            infos.push(info);
        }

        infos
    }
502
    /// Applies `version_delta` to this version in place and returns the
    /// `changed_table_info` map produced by `self.state_table_info.apply_delta`.
    ///
    /// Steps, in order:
    /// 1. apply the state-table-info delta, which also decides whether this delta
    ///    commits an epoch (`is_commit_epoch`);
    /// 2. apply every group delta of every compaction group (construct, merge,
    ///    intra-level, new L0 sub-level, destroy, prune);
    /// 3. advance `id` and the deprecated `max_committed_epoch`;
    /// 4. merge new table watermarks and clear watermarks older than each table's
    ///    committed epoch;
    /// 5. apply the vector index delta.
    ///
    /// # Panics
    /// Panics if `version_delta.prev_id != self.id`, if a referenced compaction
    /// group is missing, or if commit-epoch deltas remove SSTs or target a level
    /// other than L0.
    pub fn apply_version_delta(
        &mut self,
        version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
    ) -> HashMap<TableId, Option<StateTableInfo>> {
        assert_eq!(self.id, version_delta.prev_id);

        let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
            &version_delta.state_table_info_delta,
            &version_delta.removed_table_ids,
        );

        // Also treat the delta as an epoch commit if only the deprecated
        // `max_committed_epoch` advanced.
        #[expect(deprecated)]
        {
            if !is_commit_epoch && self.max_committed_epoch < version_delta.max_committed_epoch {
                is_commit_epoch = true;
                tracing::trace!(
                    "max committed epoch bumped but no table committed epoch is changed"
                );
            }
        }

        for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
            // Set when an L0 compaction result is applied; L0 is normalized once afterwards.
            let mut is_applied_l0_compact = false;
            for group_delta in &group_deltas.group_deltas {
                match group_delta {
                    GroupDeltaCommon::GroupConstruct(group_construct) => {
                        let mut new_levels = build_initial_compaction_group_levels(
                            *compaction_group_id,
                            group_construct.get_group_config().unwrap(),
                        );
                        let parent_group_id = group_construct.parent_group_id;
                        new_levels.parent_group_id = parent_group_id;
                        #[expect(deprecated)]
                        new_levels
                            .member_table_ids
                            .clone_from(&group_construct.table_ids);
                        self.levels.insert(*compaction_group_id, new_levels);
                        // Newer deltas take member tables from state_table_info, older
                        // ones from the deprecated `table_ids` field.
                        let member_table_ids = if group_construct.version()
                            >= CompatibilityVersion::NoMemberTableIds
                        {
                            self.state_table_info
                                .compaction_group_member_table_ids(*compaction_group_id)
                                .iter()
                                .copied()
                                .collect()
                        } else {
                            #[expect(deprecated)]
                            BTreeSet::from_iter(
                                group_construct.table_ids.iter().copied().map(Into::into),
                            )
                        };

                        // Split by key (v2) for new deltas, by member table ids otherwise.
                        if group_construct.version() >= CompatibilityVersion::SplitGroupByTableId {
                            let split_key = if group_construct.split_key.is_some() {
                                Some(Bytes::from(group_construct.split_key.clone().unwrap()))
                            } else {
                                None
                            };
                            self.init_with_parent_group_v2(
                                parent_group_id,
                                *compaction_group_id,
                                group_construct.new_sst_start_id,
                                split_key.clone(),
                            );
                        } else {
                            self.init_with_parent_group(
                                parent_group_id,
                                *compaction_group_id,
                                member_table_ids,
                                group_construct.new_sst_start_id,
                            );
                        }
                    }
                    GroupDeltaCommon::GroupMerge(group_merge) => {
                        tracing::info!(
                            "group_merge left {:?} right {:?}",
                            group_merge.left_group_id,
                            group_merge.right_group_id
                        );
                        self.merge_compaction_group(
                            group_merge.left_group_id,
                            group_merge.right_group_id,
                        )
                    }
                    GroupDeltaCommon::IntraLevel(level_delta) => {
                        let levels =
                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
                                panic!("compaction group {} does not exist", compaction_group_id)
                            });
                        if is_commit_epoch {
                            // Epoch commit: only adds a new overlapping L0 sub-level.
                            assert!(
                                level_delta.removed_table_ids.is_empty(),
                                "no sst should be deleted when committing an epoch"
                            );

                            let IntraLevelDelta {
                                level_idx,
                                l0_sub_level_id,
                                inserted_table_infos,
                                ..
                            } = level_delta;
                            {
                                assert_eq!(
                                    *level_idx, 0,
                                    "we should only add to L0 when we commit an epoch."
                                );
                                if !inserted_table_infos.is_empty() {
                                    insert_new_sub_level(
                                        &mut levels.l0,
                                        *l0_sub_level_id,
                                        PbLevelType::Overlapping,
                                        inserted_table_infos.clone(),
                                        None,
                                    );
                                }
                            }
                        } else {
                            // Otherwise this is a compaction result.
                            levels.apply_compact_ssts(
                                level_delta,
                                self.state_table_info
                                    .compaction_group_member_table_ids(*compaction_group_id),
                            );
                            if level_delta.level_idx == 0 {
                                is_applied_l0_compact = true;
                            }
                        }
                    }
                    GroupDeltaCommon::NewL0SubLevel(inserted_table_infos) => {
                        let levels =
                            self.levels.get_mut(compaction_group_id).unwrap_or_else(|| {
                                panic!("compaction group {} does not exist", compaction_group_id)
                            });
                        assert!(is_commit_epoch);

                        if !inserted_table_infos.is_empty() {
                            // New sub-level id: one past the last sub-level's id, or 1 if L0 is empty.
                            let next_l0_sub_level_id = levels
                                .l0
                                .sub_levels
                                .last()
                                .map(|level| level.sub_level_id + 1)
                                .unwrap_or(1);

                            insert_new_sub_level(
                                &mut levels.l0,
                                next_l0_sub_level_id,
                                PbLevelType::Overlapping,
                                inserted_table_infos.clone(),
                                None,
                            );
                        }
                    }
                    GroupDeltaCommon::GroupDestroy(_) => {
                        self.levels.remove(compaction_group_id);
                    }

                    GroupDeltaCommon::PruneTableIdsFromSsts(table_ids) => {
                        self.levels
                            .get_mut(compaction_group_id)
                            .unwrap_or_else(|| {
                                panic!("compaction group {} does not exist", compaction_group_id)
                            })
                            .prune_table_ids_from_ssts(table_ids);
                    }
                }
            }

            // The group may have been destroyed by one of its deltas, hence the `if let`.
            if is_applied_l0_compact && let Some(levels) = self.levels.get_mut(compaction_group_id)
            {
                levels.l0.normalize();
            }
        }
        self.id = version_delta.id;
        #[expect(deprecated)]
        {
            self.max_committed_epoch = version_delta.max_committed_epoch;
        }

        // Pending watermark updates per table: Some = store this value, None = remove the table's entry.
        let mut modified_table_watermarks: HashMap<TableId, Option<TableWatermarks>> =
            HashMap::new();

        // Merge the delta's new watermarks into the existing ones.
        // NOTE(review): a table in `removed_table_ids` is only dropped here if it also
        // appears in `new_table_watermarks` and already had watermarks; if it had none,
        // the new watermarks are inserted. Confirm removed tables are cleaned up elsewhere.
        for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
            if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
                if version_delta.removed_table_ids.contains(table_id) {
                    modified_table_watermarks.insert(*table_id, None);
                } else {
                    let mut current_table_watermarks = (**current_table_watermarks).clone();
                    current_table_watermarks.apply_new_table_watermarks(table_watermarks);
                    modified_table_watermarks.insert(*table_id, Some(current_table_watermarks));
                }
            } else {
                modified_table_watermarks.insert(*table_id, Some(table_watermarks.clone()));
            }
        }
        // Clear stale epochs only for tables that still have state table info and whose
        // committed epoch has passed their oldest watermark epoch.
        for (table_id, table_watermarks) in &self.table_watermarks {
            let safe_epoch = if let Some(state_table_info) =
                self.state_table_info.info().get(table_id)
                && let Some((oldest_epoch, _)) = table_watermarks.watermarks.first()
                && state_table_info.committed_epoch > *oldest_epoch
            {
                state_table_info.committed_epoch
            } else {
                continue;
            };
            // Reuse the value already modified above, if any; entries marked None stay None.
            let table_watermarks = modified_table_watermarks
                .entry(*table_id)
                .or_insert_with(|| Some((**table_watermarks).clone()));
            if let Some(table_watermarks) = table_watermarks {
                table_watermarks.clear_stale_epoch_watermark(safe_epoch);
            }
        }
        // Write back the pending watermark changes.
        for (table_id, table_watermarks) in modified_table_watermarks {
            if let Some(table_watermarks) = table_watermarks {
                self.table_watermarks
                    .insert(table_id, Arc::new(table_watermarks));
            } else {
                self.table_watermarks.remove(&table_id);
            }
        }
        apply_vector_index_delta(
            &mut self.vector_indexes,
            &version_delta.vector_index_delta,
            &version_delta.removed_table_ids,
        );

        changed_table_info
    }
742
743 pub fn apply_change_log_delta<T: Clone>(
744 table_change_log: &mut HashMap<TableId, TableChangeLogCommon<T>>,
745 change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
746 ) {
747 for (table_id, change_log_delta) in change_log_delta {
748 let new_change_log = &change_log_delta.new_log;
749 match table_change_log.entry(*table_id) {
750 Entry::Occupied(entry) => {
751 let change_log = entry.into_mut();
752 change_log.add_change_log(new_change_log.clone());
753 }
754 Entry::Vacant(entry) => {
755 entry.insert(TableChangeLogCommon::new(once(new_change_log.clone())));
756 }
757 };
758 }
759
760 for (table_id, change_log_delta) in change_log_delta {
762 if let Some(change_log) = table_change_log.get_mut(table_id) {
763 change_log.truncate(change_log_delta.truncate_epoch);
764 }
765 }
766 }
767
768 pub fn collect_gc_change_log_delta<'a, T: Clone>(
770 current_change_log_table_ids: impl Iterator<Item = &'a TableId>,
771 change_log_delta: &HashMap<TableId, ChangeLogDeltaCommon<T>>,
772 removed_table_ids: &HashSet<TableId>,
773 state_table_info_delta: &HashMap<TableId, StateTableInfoDelta>,
774 changed_table_info: &HashMap<TableId, Option<StateTableInfo>>,
775 ) -> HashSet<TableId> {
776 let mut gc_change_log_delta = HashSet::new();
777 for table_id in current_change_log_table_ids {
781 if removed_table_ids.contains(table_id) {
782 gc_change_log_delta.insert(*table_id);
783 continue;
784 }
785 if let Some(table_info_delta) = state_table_info_delta.get(table_id)
786 && let Some(Some(prev_table_info)) = changed_table_info.get(table_id)
787 && table_info_delta.committed_epoch > prev_table_info.committed_epoch
788 {
789 } else {
791 continue;
793 }
794 let contains = change_log_delta.contains_key(table_id);
795 if !contains {
796 gc_change_log_delta.insert(*table_id);
797 static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
798 LazyLock::new(|| LogSuppressor::per_second(1));
799 if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
800 warn!(
801 suppressed_count,
802 %table_id,
803 "table change log dropped due to no further change log at newly committed epoch"
804 );
805 }
806 }
807 }
808 gc_change_log_delta
809 }
810
811 pub fn build_branched_sst_info(&self) -> BTreeMap<HummockSstableObjectId, BranchedSstInfo> {
812 let mut ret: BTreeMap<_, _> = BTreeMap::new();
813 for (compaction_group_id, group) in &self.levels {
814 let mut levels = vec![];
815 levels.extend(group.l0.sub_levels.iter());
816 levels.extend(group.levels.iter());
817 for level in levels {
818 for table_info in &level.table_infos {
819 if table_info.sst_id.as_raw_id() == table_info.object_id.as_raw_id() {
820 continue;
821 }
822 let object_id = table_info.object_id;
823 let entry: &mut BranchedSstInfo = ret.entry(object_id).or_default();
824 entry
825 .entry(*compaction_group_id)
826 .or_default()
827 .push(table_info.sst_id)
828 }
829 }
830 }
831 ret
832 }
833
834 pub fn merge_compaction_group(
835 &mut self,
836 left_group_id: CompactionGroupId,
837 right_group_id: CompactionGroupId,
838 ) {
839 let left_group_id_table_ids = self
841 .state_table_info
842 .compaction_group_member_table_ids(left_group_id)
843 .iter();
844 let right_group_id_table_ids = self
845 .state_table_info
846 .compaction_group_member_table_ids(right_group_id)
847 .iter();
848
849 assert!(
850 left_group_id_table_ids
851 .chain(right_group_id_table_ids)
852 .is_sorted()
853 );
854
855 let total_cg = self.levels.keys().cloned().collect::<Vec<_>>();
856 let right_levels = self.levels.remove(&right_group_id).unwrap_or_else(|| {
857 panic!(
858 "compaction group should exist right {} all {:?}",
859 right_group_id, total_cg
860 )
861 });
862
863 let left_levels = self.levels.get_mut(&left_group_id).unwrap_or_else(|| {
864 panic!(
865 "compaction group should exist left {} all {:?}",
866 left_group_id, total_cg
867 )
868 });
869
870 group_split::merge_levels(left_levels, right_levels);
871 }
872
    /// Initializes the newly constructed compaction group `group_id` from
    /// `parent_group_id` by splitting the parent's SSTs at `split_key`. This is the
    /// path `apply_version_delta` uses for `CompatibilityVersion::SplitGroupByTableId`
    /// and later.
    ///
    /// For every L0 sub-level and every L1..Ln level of the parent,
    /// `group_split::split_sst_info_for_level_v2` returns the SSTs to move. New SST
    /// ids are drawn starting at `new_sst_start_id`. The returned SSTs are placed
    /// into the matching sub-level / level of the new group. With `split_key ==
    /// None` nothing is moved, but both groups' `compaction_group_version_id` are
    /// still bumped.
    ///
    /// When the parent is `StaticCompactionGroupId::NewCompactionGroup` nothing is
    /// done. A non-zero `new_sst_start_id` then panics in debug builds and only
    /// logs a warning otherwise.
    ///
    /// # Panics
    /// - the parent group does not exist;
    /// - `group_id` is not in `self.levels`, or equals `parent_group_id`
    ///   (from `get_disjoint_mut` + `unwrap`);
    /// - a target level's SSTs cannot be concatenated after insertion;
    /// - any L0 sub-level of either group ends up empty.
    pub fn init_with_parent_group_v2(
        &mut self,
        parent_group_id: CompactionGroupId,
        group_id: CompactionGroupId,
        new_sst_start_id: HummockSstableId,
        split_key: Option<Bytes>,
    ) {
        let mut new_sst_id = new_sst_start_id;
        if parent_group_id == StaticCompactionGroupId::NewCompactionGroup {
            if new_sst_start_id != 0 {
                if cfg!(debug_assertions) {
                    panic!(
                        "non-zero sst start id {} for NewCompactionGroup",
                        new_sst_start_id
                    );
                } else {
                    warn!(
                        %new_sst_start_id,
                        "non-zero sst start id for NewCompactionGroup"
                    );
                }
            }
            return;
        } else if !self.levels.contains_key(&parent_group_id) {
            unreachable!(
                "non-existing parent group id {} to init from (V2)",
                parent_group_id
            );
        }

        // Borrow parent and child mutably at the same time; a missing child group panics here.
        let [parent_levels, cur_levels] = self
            .levels
            .get_disjoint_mut([&parent_group_id, &group_id])
            .map(|res| res.unwrap());
        // Bump both version ids. `apply_compact_ssts` drops deltas that
        // `is_compaction_task_expired` reports as stale against the current id.
        parent_levels.compaction_group_version_id += 1;
        cur_levels.compaction_group_version_id += 1;

        let l0 = &mut parent_levels.l0;
        {
            for sub_level in &mut l0.sub_levels {
                let target_l0 = &mut cur_levels.l0;
                let insert_table_infos = if let Some(split_key) = &split_key {
                    group_split::split_sst_info_for_level_v2(
                        sub_level,
                        &mut new_sst_id,
                        split_key.clone(),
                    )
                } else {
                    vec![]
                };

                if insert_table_infos.is_empty() {
                    continue;
                }

                // Unlike the legacy path, the parent sub-level is only normalized when
                // something was split off.
                sub_level.normalize();
                // Ok(idx): add into the existing target sub-level at `idx`.
                // Err(idx): create a sub-level with the same id/type at position `idx`.
                match group_split::get_sub_level_insert_hint(&target_l0.sub_levels, sub_level) {
                    Ok(idx) => {
                        add_ssts_to_sub_level(target_l0, idx, insert_table_infos);
                    }
                    Err(idx) => {
                        insert_new_sub_level(
                            target_l0,
                            sub_level.sub_level_id,
                            sub_level.level_type,
                            insert_table_infos,
                            Some(idx),
                        );
                    }
                }
            }
            l0.normalize();
        }

        // L1..Ln: moved SSTs go to the same level index of the new group, kept
        // sorted by key range and required to be concatenable.
        for (idx, level) in parent_levels.levels.iter_mut().enumerate() {
            let insert_table_infos = if let Some(split_key) = &split_key {
                group_split::split_sst_info_for_level_v2(level, &mut new_sst_id, split_key.clone())
            } else {
                vec![]
            };

            if insert_table_infos.is_empty() {
                continue;
            }

            cur_levels.levels[idx].total_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.sst_size)
                .sum::<u64>();
            cur_levels.levels[idx].uncompressed_file_size += insert_table_infos
                .iter()
                .map(|sst| sst.uncompressed_file_size)
                .sum::<u64>();
            cur_levels.levels[idx]
                .table_infos
                .extend(insert_table_infos);
            cur_levels.levels[idx]
                .table_infos
                .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
            assert!(can_concat(&cur_levels.levels[idx].table_infos));
            level.normalize();
        }

        // Invariant: no group may be left with an empty L0 sub-level.
        assert!(
            parent_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
        assert!(
            cur_levels
                .l0
                .sub_levels
                .iter()
                .all(|level| !level.table_infos.is_empty())
        );
    }
995}
996
997impl<T> HummockVersionCommon<T>
998where
999 T: SstableIdReader + ObjectIdReader,
1000{
1001 pub fn get_object_ids(&self) -> impl Iterator<Item = HummockObjectId> + '_ {
1002 match HummockObjectId::Sstable(0.into()) {
1006 HummockObjectId::Sstable(_) => {}
1007 HummockObjectId::VectorFile(_) => {}
1008 HummockObjectId::HnswGraphFile(_) => {}
1009 };
1010 self.get_sst_infos()
1011 .map(|s| HummockObjectId::Sstable(s.object_id()))
1012 .chain(
1013 self.vector_indexes
1014 .values()
1015 .flat_map(|index| index.get_objects().map(|(object_id, _)| object_id)),
1016 )
1017 }
1018
1019 pub fn get_sst_ids(&self) -> HashSet<HummockSstableId> {
1020 self.get_sst_infos().map(|s| s.sst_id()).collect()
1021 }
1022
1023 pub fn get_sst_infos(&self) -> impl Iterator<Item = &T> {
1024 self.get_combined_levels()
1025 .flat_map(|level| level.table_infos.iter())
1026 }
1027}
1028
1029impl Levels {
    /// Applies a compaction-generated intra-level delta to these levels.
    ///
    /// If the delta's `compaction_group_version_id` is reported as expired by
    /// `is_compaction_task_expired`, a warning is logged and nothing changes.
    /// Otherwise the removed SSTs are deleted (from every L0 sub-level when
    /// `level_idx == 0`, else from `levels[level_idx - 1]`), and the inserted SSTs
    /// are added to L0 sub-level `l0_sub_level_id` or to `levels[level_idx - 1]`.
    /// The target's `vnode_partition_count` may be updated along the way.
    ///
    /// # Panics
    /// Panics if the target L0 sub-level id does not exist, or if `level_idx - 1`
    /// is out of bounds for `levels`.
    pub(crate) fn apply_compact_ssts(
        &mut self,
        level_delta: &IntraLevelDeltaCommon<SstableInfo>,
        member_table_ids: &BTreeSet<TableId>,
    ) {
        let IntraLevelDeltaCommon {
            level_idx,
            l0_sub_level_id,
            inserted_table_infos: insert_table_infos,
            vnode_partition_count,
            removed_table_ids: delete_sst_ids_set,
            compaction_group_version_id,
        } = level_delta;
        let new_vnode_partition_count = *vnode_partition_count;

        // Reject deltas produced against an outdated group layout.
        if is_compaction_task_expired(
            self.compaction_group_version_id,
            *compaction_group_version_id,
        ) {
            warn!(
                current_compaction_group_version_id = self.compaction_group_version_id,
                delta_compaction_group_version_id = compaction_group_version_id,
                level_idx,
                l0_sub_level_id,
                insert_table_infos = ?insert_table_infos
                    .iter()
                    .map(|sst| (sst.sst_id, sst.object_id))
                    .collect_vec(),
                ?delete_sst_ids_set,
                "This VersionDelta may be committed by an expired compact task. Please check it."
            );
            return;
        }
        if !delete_sst_ids_set.is_empty() {
            if *level_idx == 0 {
                for level in &mut self.l0.sub_levels {
                    level.delete_ssts(delete_sst_ids_set);
                }
            } else {
                // L1..Ln are stored at index `level_idx - 1`.
                let idx = *level_idx as usize - 1;
                self.levels[idx].delete_ssts(delete_sst_ids_set);
            }
        }

        if !insert_table_infos.is_empty() {
            let insert_sst_level_id = *level_idx;
            let insert_sub_level_id = *l0_sub_level_id;
            if insert_sst_level_id == 0 {
                let l0 = &mut self.l0;
                // Binary search; relies on `sub_levels` being sorted by `sub_level_id`.
                let index = l0
                    .sub_levels
                    .partition_point(|level| level.sub_level_id < insert_sub_level_id);
                assert!(
                    index < l0.sub_levels.len()
                        && l0.sub_levels[index].sub_level_id == insert_sub_level_id,
                    "should find the level to insert into when applying compaction generated delta. sub level idx: {}, removed sst ids: {:?}, sub levels: {:?},",
                    insert_sub_level_id,
                    delete_sst_ids_set,
                    l0.sub_levels
                        .iter()
                        .map(|level| level.sub_level_id)
                        .collect_vec()
                );
                // Adopt the new partition count only if the sub-level is empty, the group
                // has a single member table, and every inserted SST holds exactly that table.
                if l0.sub_levels[index].table_infos.is_empty()
                    && member_table_ids.len() == 1
                    && insert_table_infos.iter().all(|sst| {
                        sst.table_ids.len() == 1
                            && sst.table_ids[0]
                                == *member_table_ids.iter().next().expect("non-empty")
                    })
                {
                    l0.sub_levels[index].vnode_partition_count = new_vnode_partition_count;
                }
                level_insert_ssts(&mut l0.sub_levels[index], insert_table_infos);
            } else {
                let idx = insert_sst_level_id as usize - 1;
                // NOTE(review): unlike the L0 branch, this only requires each inserted SST
                // to hold a single table id (not necessarily the group's only member).
                if self.levels[idx].table_infos.is_empty()
                    && insert_table_infos
                        .iter()
                        .all(|sst| sst.table_ids.len() == 1)
                {
                    self.levels[idx].vnode_partition_count = new_vnode_partition_count;
                } else if self.levels[idx].vnode_partition_count != 0
                    && new_vnode_partition_count == 0
                    && member_table_ids.len() > 1
                {
                    // Reset partitioning once a multi-table group inserts unpartitioned data.
                    self.levels[idx].vnode_partition_count = 0;
                }
                level_insert_ssts(&mut self.levels[idx], insert_table_infos);
            }
        }
    }
1124
1125 pub(crate) fn prune_table_ids_from_ssts(&mut self, table_ids: &HashSet<TableId>) {
1128 for level in self.l0.sub_levels.iter_mut().chain(self.levels.iter_mut()) {
1129 level.prune_table_ids_from_ssts(table_ids);
1130 }
1131 self.l0.normalize();
1132 self.compaction_group_version_id += 1;
1133 }
1134}
1135
1136impl<T, L> HummockVersionCommon<T, L> {
1137 pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
1138 self.levels
1139 .values()
1140 .flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
1141 }
1142}
1143
1144pub fn build_initial_compaction_group_levels(
1145 group_id: impl Into<CompactionGroupId>,
1146 compaction_config: &CompactionConfig,
1147) -> Levels {
1148 let mut levels = vec![];
1149 for l in 0..compaction_config.get_max_level() {
1150 levels.push(Level {
1151 level_idx: (l + 1) as u32,
1152 level_type: PbLevelType::Nonoverlapping,
1153 table_infos: vec![],
1154 total_file_size: 0,
1155 sub_level_id: 0,
1156 uncompressed_file_size: 0,
1157 vnode_partition_count: 0,
1158 });
1159 }
1160 #[expect(deprecated)] Levels {
1162 levels,
1163 l0: OverlappingLevel {
1164 sub_levels: vec![],
1165 total_file_size: 0,
1166 uncompressed_file_size: 0,
1167 },
1168 group_id: group_id.into(),
1169 parent_group_id: 0.into(),
1170 member_table_ids: vec![],
1171 compaction_group_version_id: 0,
1172 }
1173}
1174
1175fn split_sst_info_for_level(
1176 member_table_ids: &BTreeSet<TableId>,
1177 level: &mut Level,
1178 new_sst_id: &mut HummockSstableId,
1179) -> Vec<SstableInfo> {
1180 let mut insert_table_infos = vec![];
1183 for sst_info in &mut level.table_infos {
1184 let removed_table_ids = sst_info
1185 .table_ids
1186 .iter()
1187 .filter(|table_id| member_table_ids.contains(*table_id))
1188 .cloned()
1189 .collect_vec();
1190 let sst_size = sst_info.sst_size;
1191 if sst_size / 2 == 0 {
1192 tracing::warn!(
1193 id = %sst_info.sst_id,
1194 object_id = %sst_info.object_id,
1195 sst_size = sst_info.sst_size,
1196 file_size = sst_info.file_size,
1197 "Sstable sst_size is under expected",
1198 );
1199 };
1200 if !removed_table_ids.is_empty() {
1201 let (modified_sst, branch_sst) = split_sst_with_table_ids(
1202 sst_info,
1203 new_sst_id,
1204 sst_size / 2,
1205 sst_size / 2,
1206 member_table_ids.iter().cloned().collect_vec(),
1207 );
1208 *sst_info = modified_sst;
1209 insert_table_infos.push(branch_sst);
1210 }
1211 }
1212 insert_table_infos
1213}
1214
1215pub fn get_compaction_group_ids(
1217 version: &HummockVersion,
1218) -> impl Iterator<Item = CompactionGroupId> + '_ {
1219 version.levels.keys().cloned()
1220}
1221
1222pub fn get_table_compaction_group_id_mapping(
1223 version: &HummockVersion,
1224) -> HashMap<StateTableId, CompactionGroupId> {
1225 version
1226 .state_table_info
1227 .info()
1228 .iter()
1229 .map(|(table_id, info)| (*table_id, info.compaction_group_id))
1230 .collect()
1231}
1232
1233pub fn get_compaction_group_ssts(
1235 version: &HummockVersion,
1236 group_id: CompactionGroupId,
1237) -> impl Iterator<Item = (HummockSstableObjectId, HummockSstableId)> + '_ {
1238 let group_levels = version.get_compaction_group_levels(group_id);
1239 group_levels
1240 .l0
1241 .sub_levels
1242 .iter()
1243 .rev()
1244 .chain(group_levels.levels.iter())
1245 .flat_map(|level| {
1246 level
1247 .table_infos
1248 .iter()
1249 .map(|table_info| (table_info.object_id, table_info.sst_id))
1250 })
1251}
1252
1253pub fn new_sub_level(
1254 sub_level_id: u64,
1255 level_type: PbLevelType,
1256 table_infos: Vec<SstableInfo>,
1257) -> Level {
1258 if level_type == PbLevelType::Nonoverlapping {
1259 debug_assert!(
1260 can_concat(&table_infos),
1261 "sst of non-overlapping level is not concat-able: {:?}",
1262 table_infos
1263 );
1264 }
1265 let total_file_size = table_infos.iter().map(|table| table.sst_size).sum();
1266 let uncompressed_file_size = table_infos
1267 .iter()
1268 .map(|table| table.uncompressed_file_size)
1269 .sum();
1270 Level {
1271 level_idx: 0,
1272 level_type,
1273 table_infos,
1274 total_file_size,
1275 sub_level_id,
1276 uncompressed_file_size,
1277 vnode_partition_count: 0,
1278 }
1279}
1280
1281pub fn add_ssts_to_sub_level(
1282 l0: &mut OverlappingLevel,
1283 sub_level_idx: usize,
1284 insert_table_infos: Vec<SstableInfo>,
1285) {
1286 insert_table_infos.iter().for_each(|sst| {
1287 l0.sub_levels[sub_level_idx].total_file_size += sst.sst_size;
1288 l0.sub_levels[sub_level_idx].uncompressed_file_size += sst.uncompressed_file_size;
1289 l0.total_file_size += sst.sst_size;
1290 l0.uncompressed_file_size += sst.uncompressed_file_size;
1291 });
1292 l0.sub_levels[sub_level_idx]
1293 .table_infos
1294 .extend(insert_table_infos);
1295 if l0.sub_levels[sub_level_idx].level_type == PbLevelType::Nonoverlapping {
1296 l0.sub_levels[sub_level_idx]
1297 .table_infos
1298 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1299 assert!(
1300 can_concat(&l0.sub_levels[sub_level_idx].table_infos),
1301 "sstable ids: {:?}",
1302 l0.sub_levels[sub_level_idx]
1303 .table_infos
1304 .iter()
1305 .map(|sst| sst.sst_id)
1306 .collect_vec()
1307 );
1308 }
1309}
1310
1311pub fn insert_new_sub_level(
1313 l0: &mut OverlappingLevel,
1314 insert_sub_level_id: u64,
1315 level_type: PbLevelType,
1316 insert_table_infos: Vec<SstableInfo>,
1317 sub_level_insert_hint: Option<usize>,
1318) {
1319 if insert_sub_level_id == u64::MAX {
1320 return;
1321 }
1322 let insert_pos = if let Some(insert_pos) = sub_level_insert_hint {
1323 insert_pos
1324 } else {
1325 if let Some(newest_level) = l0.sub_levels.last() {
1326 assert!(
1327 newest_level.sub_level_id < insert_sub_level_id,
1328 "inserted new level is not the newest: prev newest: {}, insert: {}. L0: {:?}",
1329 newest_level.sub_level_id,
1330 insert_sub_level_id,
1331 l0,
1332 );
1333 }
1334 l0.sub_levels.len()
1335 };
1336 #[cfg(debug_assertions)]
1337 {
1338 if insert_pos > 0
1339 && let Some(smaller_level) = l0.sub_levels.get(insert_pos - 1)
1340 {
1341 debug_assert!(smaller_level.sub_level_id < insert_sub_level_id);
1342 }
1343 if let Some(larger_level) = l0.sub_levels.get(insert_pos) {
1344 debug_assert!(larger_level.sub_level_id > insert_sub_level_id);
1345 }
1346 }
1347 let level = new_sub_level(insert_sub_level_id, level_type, insert_table_infos);
1350 l0.total_file_size += level.total_file_size;
1351 l0.uncompressed_file_size += level.uncompressed_file_size;
1352 l0.sub_levels.insert(insert_pos, level);
1353}
1354
1355impl Level {
1356 fn recompute_size(&mut self) {
1357 self.total_file_size = self
1358 .table_infos
1359 .iter()
1360 .map(|table| table.sst_size)
1361 .sum::<u64>();
1362 self.uncompressed_file_size = self
1363 .table_infos
1364 .iter()
1365 .map(|table| table.uncompressed_file_size)
1366 .sum::<u64>();
1367 }
1368
1369 fn normalize(&mut self) {
1371 self.table_infos
1372 .retain(|sst_info| !sst_info.table_ids.is_empty());
1373 self.recompute_size();
1374 }
1375
1376 fn delete_ssts(&mut self, ids: &HashSet<HummockSstableId>) -> bool {
1378 let original_len = self.table_infos.len();
1379 self.table_infos
1380 .retain(|table| !ids.contains(&table.sst_id));
1381 self.recompute_size();
1382 original_len != self.table_infos.len()
1383 }
1384
1385 fn prune_table_ids_from_ssts(&mut self, table_ids: &HashSet<TableId>) {
1388 for sstable_info in &mut self.table_infos {
1389 if !sstable_info
1390 .table_ids
1391 .iter()
1392 .any(|table_id| table_ids.contains(table_id))
1393 {
1394 continue;
1395 }
1396
1397 let mut inner = sstable_info.get_inner();
1398 inner.table_ids.retain(|id| !table_ids.contains(id));
1399 sstable_info.set_inner(inner);
1400 }
1401 self.normalize();
1402 }
1403}
1404
1405impl OverlappingLevel {
1406 fn normalize(&mut self) {
1408 self.sub_levels
1409 .retain(|level| !level.table_infos.is_empty());
1410 self.total_file_size = self
1411 .sub_levels
1412 .iter()
1413 .map(|level| level.total_file_size)
1414 .sum::<u64>();
1415 self.uncompressed_file_size = self
1416 .sub_levels
1417 .iter()
1418 .map(|level| level.uncompressed_file_size)
1419 .sum::<u64>();
1420 }
1421}
1422
1423fn level_insert_ssts(operand: &mut Level, insert_table_infos: &Vec<SstableInfo>) {
1424 fn display_sstable_infos(ssts: &[impl Borrow<SstableInfo>]) -> String {
1425 format!(
1426 "sstable ids: {:?}",
1427 ssts.iter().map(|s| s.borrow().sst_id).collect_vec()
1428 )
1429 }
1430 operand.total_file_size += insert_table_infos
1431 .iter()
1432 .map(|sst| sst.sst_size)
1433 .sum::<u64>();
1434 operand.uncompressed_file_size += insert_table_infos
1435 .iter()
1436 .map(|sst| sst.uncompressed_file_size)
1437 .sum::<u64>();
1438 if operand.level_type == PbLevelType::Overlapping {
1439 operand.level_type = PbLevelType::Nonoverlapping;
1440 operand
1441 .table_infos
1442 .extend(insert_table_infos.iter().cloned());
1443 operand
1444 .table_infos
1445 .sort_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range));
1446 assert!(
1447 can_concat(&operand.table_infos),
1448 "{}",
1449 display_sstable_infos(&operand.table_infos)
1450 );
1451 } else if !insert_table_infos.is_empty() {
1452 let sorted_insert: Vec<_> = insert_table_infos
1453 .iter()
1454 .sorted_by(|sst1, sst2| sst1.key_range.cmp(&sst2.key_range))
1455 .cloned()
1456 .collect();
1457 let first = &sorted_insert[0];
1458 let last = &sorted_insert[sorted_insert.len() - 1];
1459 let pos = operand
1460 .table_infos
1461 .partition_point(|b| b.key_range.cmp(&first.key_range) == Ordering::Less);
1462 if pos >= operand.table_infos.len()
1463 || last.key_range.cmp(&operand.table_infos[pos].key_range) == Ordering::Less
1464 {
1465 operand.table_infos.splice(pos..pos, sorted_insert);
1466 let validate_range = operand
1468 .table_infos
1469 .iter()
1470 .skip(pos.saturating_sub(1))
1471 .take(insert_table_infos.len() + 2)
1472 .collect_vec();
1473 assert!(
1474 can_concat(&validate_range),
1475 "{}",
1476 display_sstable_infos(&validate_range),
1477 );
1478 } else {
1479 warn!(insert = ?insert_table_infos, level = ?operand.table_infos, "unexpected overlap");
1482 for i in insert_table_infos {
1483 let pos = operand
1484 .table_infos
1485 .partition_point(|b| b.key_range.cmp(&i.key_range) == Ordering::Less);
1486 operand.table_infos.insert(pos, i.clone());
1487 }
1488 assert!(
1489 can_concat(&operand.table_infos),
1490 "{}",
1491 display_sstable_infos(&operand.table_infos)
1492 );
1493 }
1494 }
1495}
1496
1497pub fn version_object_size_map(version: &HummockVersion) -> HashMap<HummockObjectId, u64> {
1498 match HummockObjectId::Sstable(0.into()) {
1502 HummockObjectId::Sstable(_) => {}
1503 HummockObjectId::VectorFile(_) => {}
1504 HummockObjectId::HnswGraphFile(_) => {}
1505 };
1506 version
1507 .levels
1508 .values()
1509 .flat_map(|cg| {
1510 cg.level0()
1511 .sub_levels
1512 .iter()
1513 .chain(cg.levels.iter())
1514 .flat_map(|level| level.table_infos.iter().map(|t| (t.object_id, t.file_size)))
1515 })
1516 .map(|(object_id, size)| (HummockObjectId::Sstable(object_id), size))
1517 .chain(
1518 version
1519 .vector_indexes
1520 .values()
1521 .flat_map(|index| index.get_objects()),
1522 )
1523 .collect()
1524}
1525
1526pub fn validate_version(version: &HummockVersion) -> Vec<String> {
1529 let mut res = Vec::new();
1530 for (group_id, levels) in &version.levels {
1532 if levels.group_id != *group_id {
1534 res.push(format!(
1535 "GROUP {}: inconsistent group id {} in Levels",
1536 group_id, levels.group_id
1537 ));
1538 }
1539
1540 let validate_level = |group: CompactionGroupId,
1541 expected_level_idx: u32,
1542 level: &Level,
1543 res: &mut Vec<String>| {
1544 let mut level_identifier = format!("GROUP {} LEVEL {}", group, level.level_idx);
1545 if level.level_idx == 0 {
1546 level_identifier.push_str(format!("SUBLEVEL {}", level.sub_level_id).as_str());
1547 if level.table_infos.is_empty() {
1549 res.push(format!("{}: empty level", level_identifier));
1550 }
1551 } else if level.level_type != PbLevelType::Nonoverlapping {
1552 res.push(format!(
1554 "{}: level type {:?} is not non-overlapping",
1555 level_identifier, level.level_type
1556 ));
1557 }
1558
1559 if level.level_idx != expected_level_idx {
1561 res.push(format!(
1562 "{}: mismatched level idx {}",
1563 level_identifier, expected_level_idx
1564 ));
1565 }
1566
1567 let mut prev_table_info: Option<&SstableInfo> = None;
1568 for table_info in &level.table_infos {
1569 if !table_info.table_ids.is_sorted_by(|a, b| a < b) {
1571 res.push(format!(
1572 "{} SST {}: table_ids not sorted",
1573 level_identifier, table_info.object_id
1574 ));
1575 }
1576
1577 if level.level_type == PbLevelType::Nonoverlapping {
1579 if let Some(prev) = prev_table_info.take()
1580 && prev
1581 .key_range
1582 .compare_right_with(&table_info.key_range.left)
1583 != Ordering::Less
1584 {
1585 res.push(format!(
1586 "{} SST {}: key range should not overlap. prev={:?}, cur={:?}",
1587 level_identifier, table_info.object_id, prev, table_info
1588 ));
1589 }
1590 let _ = prev_table_info.insert(table_info);
1591 }
1592 }
1593 };
1594
1595 let l0 = &levels.l0;
1596 let mut prev_sub_level_id = u64::MAX;
1597 for sub_level in &l0.sub_levels {
1598 if sub_level.sub_level_id >= prev_sub_level_id {
1600 res.push(format!(
1601 "GROUP {} LEVEL 0: sub_level_id {} >= prev_sub_level {}",
1602 group_id, sub_level.level_idx, prev_sub_level_id
1603 ));
1604 }
1605 prev_sub_level_id = sub_level.sub_level_id;
1606
1607 validate_level(*group_id, 0, sub_level, &mut res);
1608 }
1609
1610 for idx in 1..=levels.levels.len() {
1611 validate_level(*group_id, idx as u32, levels.get_level(idx), &mut res);
1612 }
1613 }
1614 res
1615}
1616
1617#[cfg(test)]
1618mod tests {
1619 use std::collections::{HashMap, HashSet};
1620
1621 use bytes::Bytes;
1622 use risingwave_common::catalog::TableId;
1623 use risingwave_common::hash::VirtualNode;
1624 use risingwave_common::util::epoch::test_epoch;
1625 use risingwave_pb::hummock::{
1626 CompactionConfig, GroupConstruct, GroupDestroy, LevelType, StateTableInfo,
1627 };
1628
1629 use super::group_split;
1630 use crate::HummockVersionId;
1631 use crate::compaction_group::group_split::*;
1632 use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
1633 use crate::key::{FullKey, gen_key_from_str};
1634 use crate::key_range::KeyRange;
1635 use crate::level::{Level, Levels, OverlappingLevel};
1636 use crate::sstable_info::{SstableInfo, SstableInfoInner};
1637 use crate::version::{
1638 GroupDelta, GroupDeltas, HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo,
1639 IntraLevelDelta,
1640 };
1641
1642 fn gen_sstable_info(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfo {
1643 gen_sstable_info_impl(sst_id, table_ids, epoch).into()
1644 }
1645
1646 fn gen_sstable_info_impl(sst_id: u64, table_ids: Vec<u32>, epoch: u64) -> SstableInfoInner {
1647 let table_key_l = gen_key_from_str(VirtualNode::ZERO, "1");
1648 let table_key_r = gen_key_from_str(VirtualNode::MAX_FOR_TEST, "1");
1649 let full_key_l = FullKey::for_test(
1650 TableId::new(*table_ids.first().unwrap()),
1651 table_key_l,
1652 epoch,
1653 )
1654 .encode();
1655 let full_key_r =
1656 FullKey::for_test(TableId::new(*table_ids.last().unwrap()), table_key_r, epoch)
1657 .encode();
1658
1659 SstableInfoInner {
1660 sst_id: sst_id.into(),
1661 key_range: KeyRange {
1662 left: full_key_l.into(),
1663 right: full_key_r.into(),
1664 right_exclusive: false,
1665 },
1666 table_ids: table_ids.into_iter().map(Into::into).collect(),
1667 object_id: sst_id.into(),
1668 min_epoch: 20,
1669 max_epoch: 20,
1670 file_size: 100,
1671 sst_size: 100,
1672 ..Default::default()
1673 }
1674 }
1675
1676 #[test]
1677 fn test_get_sst_object_ids() {
1678 let mut version = HummockVersion {
1679 id: HummockVersionId::new(0),
1680 levels: HashMap::from_iter([(
1681 0.into(),
1682 Levels {
1683 levels: vec![],
1684 l0: OverlappingLevel {
1685 sub_levels: vec![],
1686 total_file_size: 0,
1687 uncompressed_file_size: 0,
1688 },
1689 ..Default::default()
1690 },
1691 )]),
1692 ..Default::default()
1693 };
1694 assert_eq!(version.get_object_ids().count(), 0);
1695
1696 version
1698 .levels
1699 .get_mut(&0)
1700 .unwrap()
1701 .l0
1702 .sub_levels
1703 .push(Level {
1704 table_infos: vec![
1705 SstableInfoInner {
1706 object_id: 11.into(),
1707 sst_id: 11.into(),
1708 ..Default::default()
1709 }
1710 .into(),
1711 ],
1712 ..Default::default()
1713 });
1714 assert_eq!(version.get_object_ids().count(), 1);
1715
1716 version.levels.get_mut(&0).unwrap().levels.push(Level {
1718 table_infos: vec![
1719 SstableInfoInner {
1720 object_id: 22.into(),
1721 sst_id: 22.into(),
1722 ..Default::default()
1723 }
1724 .into(),
1725 ],
1726 ..Default::default()
1727 });
1728 assert_eq!(version.get_object_ids().count(), 2);
1729 }
1730
1731 #[test]
1732 fn test_apply_version_delta() {
1733 let mut version = HummockVersion {
1734 id: HummockVersionId::new(0),
1735 levels: HashMap::from_iter([
1736 (
1737 0.into(),
1738 build_initial_compaction_group_levels(
1739 0,
1740 &CompactionConfig {
1741 max_level: 6,
1742 ..Default::default()
1743 },
1744 ),
1745 ),
1746 (
1747 1.into(),
1748 build_initial_compaction_group_levels(
1749 1,
1750 &CompactionConfig {
1751 max_level: 6,
1752 ..Default::default()
1753 },
1754 ),
1755 ),
1756 ]),
1757 ..Default::default()
1758 };
1759 let version_delta = HummockVersionDelta {
1760 id: HummockVersionId::new(1),
1761 group_deltas: HashMap::from_iter([
1762 (
1763 2.into(),
1764 GroupDeltas {
1765 group_deltas: vec![GroupDelta::GroupConstruct(Box::new(GroupConstruct {
1766 group_config: Some(CompactionConfig {
1767 max_level: 6,
1768 ..Default::default()
1769 }),
1770 ..Default::default()
1771 }))],
1772 },
1773 ),
1774 (
1775 0.into(),
1776 GroupDeltas {
1777 group_deltas: vec![GroupDelta::GroupDestroy(GroupDestroy {})],
1778 },
1779 ),
1780 (
1781 1.into(),
1782 GroupDeltas {
1783 group_deltas: vec![GroupDelta::IntraLevel(IntraLevelDelta::new(
1784 1,
1785 0,
1786 HashSet::new(),
1787 vec![
1788 SstableInfoInner {
1789 object_id: 1.into(),
1790 sst_id: 1.into(),
1791 ..Default::default()
1792 }
1793 .into(),
1794 ],
1795 0,
1796 version
1797 .levels
1798 .get(&1)
1799 .as_ref()
1800 .unwrap()
1801 .compaction_group_version_id,
1802 ))],
1803 },
1804 ),
1805 ]),
1806 ..Default::default()
1807 };
1808 let version_delta = version_delta;
1809
1810 version.apply_version_delta(&version_delta);
1811 let mut cg1 = build_initial_compaction_group_levels(
1812 1,
1813 &CompactionConfig {
1814 max_level: 6,
1815 ..Default::default()
1816 },
1817 );
1818 cg1.levels[0] = Level {
1819 level_idx: 1,
1820 level_type: LevelType::Nonoverlapping,
1821 table_infos: vec![
1822 SstableInfoInner {
1823 object_id: 1.into(),
1824 sst_id: 1.into(),
1825 ..Default::default()
1826 }
1827 .into(),
1828 ],
1829 ..Default::default()
1830 };
1831 assert_eq!(
1832 version,
1833 HummockVersion {
1834 id: HummockVersionId::new(1),
1835 levels: HashMap::from_iter([
1836 (
1837 2.into(),
1838 build_initial_compaction_group_levels(
1839 2,
1840 &CompactionConfig {
1841 max_level: 6,
1842 ..Default::default()
1843 },
1844 ),
1845 ),
1846 (1.into(), cg1),
1847 ]),
1848 ..Default::default()
1849 }
1850 );
1851 }
1852
1853 fn gen_sst_info(object_id: u64, table_ids: Vec<u32>, left: Bytes, right: Bytes) -> SstableInfo {
1854 gen_sst_info_impl(object_id, table_ids, left, right).into()
1855 }
1856
1857 fn gen_sst_info_impl(
1858 object_id: u64,
1859 table_ids: Vec<u32>,
1860 left: Bytes,
1861 right: Bytes,
1862 ) -> SstableInfoInner {
1863 SstableInfoInner {
1864 object_id: object_id.into(),
1865 sst_id: object_id.into(),
1866 key_range: KeyRange {
1867 left,
1868 right,
1869 right_exclusive: false,
1870 },
1871 table_ids: table_ids.into_iter().map(Into::into).collect(),
1872 file_size: 100,
1873 sst_size: 100,
1874 uncompressed_file_size: 100,
1875 ..Default::default()
1876 }
1877 }
1878
1879 #[test]
1880 fn test_merge_levels() {
1881 let mut left_levels = build_initial_compaction_group_levels(
1882 1,
1883 &CompactionConfig {
1884 max_level: 6,
1885 ..Default::default()
1886 },
1887 );
1888
1889 let mut right_levels = build_initial_compaction_group_levels(
1890 2,
1891 &CompactionConfig {
1892 max_level: 6,
1893 ..Default::default()
1894 },
1895 );
1896
1897 left_levels.levels[0] = Level {
1898 level_idx: 1,
1899 level_type: LevelType::Nonoverlapping,
1900 table_infos: vec![
1901 gen_sst_info(
1902 1,
1903 vec![3],
1904 FullKey::for_test(
1905 TableId::new(3),
1906 gen_key_from_str(VirtualNode::from_index(1), "1"),
1907 0,
1908 )
1909 .encode()
1910 .into(),
1911 FullKey::for_test(
1912 TableId::new(3),
1913 gen_key_from_str(VirtualNode::from_index(200), "1"),
1914 0,
1915 )
1916 .encode()
1917 .into(),
1918 ),
1919 gen_sst_info(
1920 10,
1921 vec![3, 4],
1922 FullKey::for_test(
1923 TableId::new(3),
1924 gen_key_from_str(VirtualNode::from_index(201), "1"),
1925 0,
1926 )
1927 .encode()
1928 .into(),
1929 FullKey::for_test(
1930 TableId::new(4),
1931 gen_key_from_str(VirtualNode::from_index(10), "1"),
1932 0,
1933 )
1934 .encode()
1935 .into(),
1936 ),
1937 gen_sst_info(
1938 11,
1939 vec![4],
1940 FullKey::for_test(
1941 TableId::new(4),
1942 gen_key_from_str(VirtualNode::from_index(11), "1"),
1943 0,
1944 )
1945 .encode()
1946 .into(),
1947 FullKey::for_test(
1948 TableId::new(4),
1949 gen_key_from_str(VirtualNode::from_index(200), "1"),
1950 0,
1951 )
1952 .encode()
1953 .into(),
1954 ),
1955 ],
1956 total_file_size: 300,
1957 ..Default::default()
1958 };
1959
1960 left_levels.l0.sub_levels.push(Level {
1961 level_idx: 0,
1962 table_infos: vec![gen_sst_info(
1963 3,
1964 vec![3],
1965 FullKey::for_test(
1966 TableId::new(3),
1967 gen_key_from_str(VirtualNode::from_index(1), "1"),
1968 0,
1969 )
1970 .encode()
1971 .into(),
1972 FullKey::for_test(
1973 TableId::new(3),
1974 gen_key_from_str(VirtualNode::from_index(200), "1"),
1975 0,
1976 )
1977 .encode()
1978 .into(),
1979 )],
1980 sub_level_id: 101,
1981 level_type: LevelType::Overlapping,
1982 total_file_size: 100,
1983 ..Default::default()
1984 });
1985
1986 left_levels.l0.sub_levels.push(Level {
1987 level_idx: 0,
1988 table_infos: vec![gen_sst_info(
1989 3,
1990 vec![3],
1991 FullKey::for_test(
1992 TableId::new(3),
1993 gen_key_from_str(VirtualNode::from_index(1), "1"),
1994 0,
1995 )
1996 .encode()
1997 .into(),
1998 FullKey::for_test(
1999 TableId::new(3),
2000 gen_key_from_str(VirtualNode::from_index(200), "1"),
2001 0,
2002 )
2003 .encode()
2004 .into(),
2005 )],
2006 sub_level_id: 103,
2007 level_type: LevelType::Overlapping,
2008 total_file_size: 100,
2009 ..Default::default()
2010 });
2011
2012 left_levels.l0.sub_levels.push(Level {
2013 level_idx: 0,
2014 table_infos: vec![gen_sst_info(
2015 3,
2016 vec![3],
2017 FullKey::for_test(
2018 TableId::new(3),
2019 gen_key_from_str(VirtualNode::from_index(1), "1"),
2020 0,
2021 )
2022 .encode()
2023 .into(),
2024 FullKey::for_test(
2025 TableId::new(3),
2026 gen_key_from_str(VirtualNode::from_index(200), "1"),
2027 0,
2028 )
2029 .encode()
2030 .into(),
2031 )],
2032 sub_level_id: 105,
2033 level_type: LevelType::Nonoverlapping,
2034 total_file_size: 100,
2035 ..Default::default()
2036 });
2037
2038 right_levels.levels[0] = Level {
2039 level_idx: 1,
2040 level_type: LevelType::Nonoverlapping,
2041 table_infos: vec![
2042 gen_sst_info(
2043 1,
2044 vec![5],
2045 FullKey::for_test(
2046 TableId::new(5),
2047 gen_key_from_str(VirtualNode::from_index(1), "1"),
2048 0,
2049 )
2050 .encode()
2051 .into(),
2052 FullKey::for_test(
2053 TableId::new(5),
2054 gen_key_from_str(VirtualNode::from_index(200), "1"),
2055 0,
2056 )
2057 .encode()
2058 .into(),
2059 ),
2060 gen_sst_info(
2061 10,
2062 vec![5, 6],
2063 FullKey::for_test(
2064 TableId::new(5),
2065 gen_key_from_str(VirtualNode::from_index(201), "1"),
2066 0,
2067 )
2068 .encode()
2069 .into(),
2070 FullKey::for_test(
2071 TableId::new(6),
2072 gen_key_from_str(VirtualNode::from_index(10), "1"),
2073 0,
2074 )
2075 .encode()
2076 .into(),
2077 ),
2078 gen_sst_info(
2079 11,
2080 vec![6],
2081 FullKey::for_test(
2082 TableId::new(6),
2083 gen_key_from_str(VirtualNode::from_index(11), "1"),
2084 0,
2085 )
2086 .encode()
2087 .into(),
2088 FullKey::for_test(
2089 TableId::new(6),
2090 gen_key_from_str(VirtualNode::from_index(200), "1"),
2091 0,
2092 )
2093 .encode()
2094 .into(),
2095 ),
2096 ],
2097 total_file_size: 300,
2098 ..Default::default()
2099 };
2100
2101 right_levels.l0.sub_levels.push(Level {
2102 level_idx: 0,
2103 table_infos: vec![gen_sst_info(
2104 3,
2105 vec![5],
2106 FullKey::for_test(
2107 TableId::new(5),
2108 gen_key_from_str(VirtualNode::from_index(1), "1"),
2109 0,
2110 )
2111 .encode()
2112 .into(),
2113 FullKey::for_test(
2114 TableId::new(5),
2115 gen_key_from_str(VirtualNode::from_index(200), "1"),
2116 0,
2117 )
2118 .encode()
2119 .into(),
2120 )],
2121 sub_level_id: 101,
2122 level_type: LevelType::Overlapping,
2123 total_file_size: 100,
2124 ..Default::default()
2125 });
2126
2127 right_levels.l0.sub_levels.push(Level {
2128 level_idx: 0,
2129 table_infos: vec![gen_sst_info(
2130 5,
2131 vec![5],
2132 FullKey::for_test(
2133 TableId::new(5),
2134 gen_key_from_str(VirtualNode::from_index(1), "1"),
2135 0,
2136 )
2137 .encode()
2138 .into(),
2139 FullKey::for_test(
2140 TableId::new(5),
2141 gen_key_from_str(VirtualNode::from_index(200), "1"),
2142 0,
2143 )
2144 .encode()
2145 .into(),
2146 )],
2147 sub_level_id: 102,
2148 level_type: LevelType::Overlapping,
2149 total_file_size: 100,
2150 ..Default::default()
2151 });
2152
2153 right_levels.l0.sub_levels.push(Level {
2154 level_idx: 0,
2155 table_infos: vec![gen_sst_info(
2156 3,
2157 vec![5],
2158 FullKey::for_test(
2159 TableId::new(5),
2160 gen_key_from_str(VirtualNode::from_index(1), "1"),
2161 0,
2162 )
2163 .encode()
2164 .into(),
2165 FullKey::for_test(
2166 TableId::new(5),
2167 gen_key_from_str(VirtualNode::from_index(200), "1"),
2168 0,
2169 )
2170 .encode()
2171 .into(),
2172 )],
2173 sub_level_id: 103,
2174 level_type: LevelType::Nonoverlapping,
2175 total_file_size: 100,
2176 ..Default::default()
2177 });
2178
2179 {
2180 let mut left_levels = Levels::default();
2182 let right_levels = Levels::default();
2183
2184 group_split::merge_levels(&mut left_levels, right_levels);
2185 }
2186
2187 {
2188 let mut left_levels = build_initial_compaction_group_levels(
2190 1,
2191 &CompactionConfig {
2192 max_level: 6,
2193 ..Default::default()
2194 },
2195 );
2196 let right_levels = right_levels.clone();
2197
2198 group_split::merge_levels(&mut left_levels, right_levels);
2199
2200 assert!(left_levels.l0.sub_levels.len() == 3);
2201 assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2202 assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2203 assert!(left_levels.l0.sub_levels[1].sub_level_id == 102);
2204 assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2205 assert!(left_levels.l0.sub_levels[2].sub_level_id == 103);
2206 assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2207
2208 assert!(left_levels.levels[0].level_idx == 1);
2209 assert_eq!(300, left_levels.levels[0].total_file_size);
2210 }
2211
2212 {
2213 let mut left_levels = left_levels.clone();
2215 let right_levels = build_initial_compaction_group_levels(
2216 2,
2217 &CompactionConfig {
2218 max_level: 6,
2219 ..Default::default()
2220 },
2221 );
2222
2223 group_split::merge_levels(&mut left_levels, right_levels);
2224
2225 assert!(left_levels.l0.sub_levels.len() == 3);
2226 assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2227 assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2228 assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2229 assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2230 assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2231 assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2232
2233 assert!(left_levels.levels[0].level_idx == 1);
2234 assert_eq!(300, left_levels.levels[0].total_file_size);
2235 }
2236
2237 {
2238 let mut left_levels = left_levels.clone();
2239 let right_levels = right_levels.clone();
2240
2241 group_split::merge_levels(&mut left_levels, right_levels);
2242
2243 assert!(left_levels.l0.sub_levels.len() == 6);
2244 assert!(left_levels.l0.sub_levels[0].sub_level_id == 101);
2245 assert_eq!(100, left_levels.l0.sub_levels[0].total_file_size);
2246 assert!(left_levels.l0.sub_levels[1].sub_level_id == 103);
2247 assert_eq!(100, left_levels.l0.sub_levels[1].total_file_size);
2248 assert!(left_levels.l0.sub_levels[2].sub_level_id == 105);
2249 assert_eq!(100, left_levels.l0.sub_levels[2].total_file_size);
2250 assert!(left_levels.l0.sub_levels[3].sub_level_id == 106);
2251 assert_eq!(100, left_levels.l0.sub_levels[3].total_file_size);
2252 assert!(left_levels.l0.sub_levels[4].sub_level_id == 107);
2253 assert_eq!(100, left_levels.l0.sub_levels[4].total_file_size);
2254 assert!(left_levels.l0.sub_levels[5].sub_level_id == 108);
2255 assert_eq!(100, left_levels.l0.sub_levels[5].total_file_size);
2256
2257 assert!(left_levels.levels[0].level_idx == 1);
2258 assert_eq!(600, left_levels.levels[0].total_file_size);
2259 }
2260 }
2261
2262 #[test]
2263 fn test_get_split_pos() {
2264 let epoch = test_epoch(1);
2265 let s1 = gen_sstable_info(1, vec![1, 2], epoch);
2266 let s2 = gen_sstable_info(2, vec![3, 4, 5], epoch);
2267 let s3 = gen_sstable_info(3, vec![6, 7], epoch);
2268
2269 let ssts = vec![s1, s2, s3];
2270 let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2271
2272 let pos = group_split::get_split_pos(&ssts, split_key.clone());
2273 assert_eq!(1, pos);
2274
2275 let pos = group_split::get_split_pos(&vec![], split_key);
2276 assert_eq!(0, pos);
2277 }
2278
2279 #[test]
2280 fn test_split_sst() {
2281 let epoch = test_epoch(1);
2282 let sst = gen_sstable_info(1, vec![1, 2, 3, 5], epoch);
2283
2284 {
2285 let split_key = group_split::build_split_key(3.into(), VirtualNode::ZERO);
2286 let origin_sst = sst.clone();
2287 let sst_size = origin_sst.sst_size;
2288 let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2289 assert_eq!(SstSplitType::Both, split_type);
2290
2291 let mut new_sst_id = 10.into();
2292 let (origin_sst, branched_sst) = group_split::split_sst(
2293 origin_sst,
2294 &mut new_sst_id,
2295 split_key,
2296 sst_size / 2,
2297 sst_size / 2,
2298 );
2299
2300 let origin_sst = origin_sst.unwrap();
2301 let branched_sst = branched_sst.unwrap();
2302
2303 assert!(origin_sst.key_range.right_exclusive);
2304 assert!(
2305 origin_sst
2306 .key_range
2307 .right
2308 .cmp(&branched_sst.key_range.left)
2309 .is_le()
2310 );
2311 assert!(origin_sst.table_ids.is_sorted());
2312 assert!(branched_sst.table_ids.is_sorted());
2313 assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2314 assert!(branched_sst.sst_size < origin_sst.file_size);
2315 assert_eq!(10, branched_sst.sst_id);
2316 assert_eq!(11, origin_sst.sst_id);
2317 assert_eq!(3, branched_sst.table_ids.first().unwrap().as_raw_id()); }
2319
2320 {
2321 let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2323 let origin_sst = sst.clone();
2324 let sst_size = origin_sst.sst_size;
2325 let split_type = group_split::need_to_split(&origin_sst, split_key.clone());
2326 assert_eq!(SstSplitType::Both, split_type);
2327
2328 let mut new_sst_id = 10.into();
2329 let (origin_sst, branched_sst) = group_split::split_sst(
2330 origin_sst,
2331 &mut new_sst_id,
2332 split_key,
2333 sst_size / 2,
2334 sst_size / 2,
2335 );
2336
2337 let origin_sst = origin_sst.unwrap();
2338 let branched_sst = branched_sst.unwrap();
2339
2340 assert!(origin_sst.key_range.right_exclusive);
2341 assert!(origin_sst.key_range.right.le(&branched_sst.key_range.left));
2342 assert!(origin_sst.table_ids.is_sorted());
2343 assert!(branched_sst.table_ids.is_sorted());
2344 assert!(origin_sst.table_ids.last().unwrap() < branched_sst.table_ids.first().unwrap());
2345 assert!(branched_sst.sst_size < origin_sst.file_size);
2346 assert_eq!(10, branched_sst.sst_id);
2347 assert_eq!(11, origin_sst.sst_id);
2348 assert_eq!(5, branched_sst.table_ids.first().unwrap().as_raw_id()); }
2350
2351 {
2352 let split_key = group_split::build_split_key(6.into(), VirtualNode::ZERO);
2353 let split_type = group_split::need_to_split(&sst, split_key);
2354 assert_eq!(SstSplitType::Left, split_type);
2355 }
2356
2357 {
2358 let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);
2359 let origin_sst = sst.clone();
2360 let split_type = group_split::need_to_split(&origin_sst, split_key);
2361 assert_eq!(SstSplitType::Both, split_type);
2362
2363 let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2364 let origin_sst = sst;
2365 let split_type = group_split::need_to_split(&origin_sst, split_key);
2366 assert_eq!(SstSplitType::Right, split_type);
2367 }
2368
2369 {
2370 let mut sst = gen_sstable_info_impl(1, vec![1], epoch);
2372 sst.key_range.right = sst.key_range.left.clone();
2373 let sst: SstableInfo = sst.into();
2374 let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
2375 let origin_sst = sst;
2376 let sst_size = origin_sst.sst_size;
2377
2378 let mut new_sst_id = 10.into();
2379 let (origin_sst, branched_sst) = group_split::split_sst(
2380 origin_sst,
2381 &mut new_sst_id,
2382 split_key,
2383 sst_size / 2,
2384 sst_size / 2,
2385 );
2386
2387 assert!(origin_sst.is_none());
2388 assert!(branched_sst.is_some());
2389 }
2390 }
2391
    /// Exercises `group_split::split_sst_info_for_level_v2` on an L0 sub-level and on
    /// `levels[0]`, with split keys that fall before, inside, and after the SSTs' tables.
    #[test]
    fn test_split_sst_info_for_level() {
        // A version holding a single compaction group (id 1) built with `max_level: 6`.
        let mut version = HummockVersion {
            id: HummockVersionId::new(0),
            levels: HashMap::from_iter([(
                1.into(),
                build_initial_compaction_group_levels(
                    1,
                    &CompactionConfig {
                        max_level: 6,
                        ..Default::default()
                    },
                ),
            )]),
            ..Default::default()
        };

        let cg1 = version.levels.get_mut(&1).unwrap();

        // Replace `levels[0]` with a non-overlapping level of three SSTs:
        // sst 1 -> table 3, sst 10 -> tables 3 and 4, sst 11 -> table 4.
        cg1.levels[0] = Level {
            level_idx: 1,
            level_type: LevelType::Nonoverlapping,
            table_infos: vec![
                gen_sst_info(
                    1,
                    vec![3],
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    10,
                    vec![3, 4],
                    FullKey::for_test(
                        TableId::new(3),
                        gen_key_from_str(VirtualNode::from_index(201), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(10), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    11,
                    vec![4],
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(11), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(4),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
            ],
            total_file_size: 300,
            ..Default::default()
        };

        // Append an overlapping L0 sub-level (id 101) of five SSTs. Every one is tagged
        // with table 2, although most key ranges start at table 0.
        cg1.l0.sub_levels.push(Level {
            level_idx: 0,
            table_infos: vec![
                gen_sst_info(
                    2,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    22,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    23,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    24,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(2),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
                gen_sst_info(
                    25,
                    vec![2],
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(1), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                    FullKey::for_test(
                        TableId::new(0),
                        gen_key_from_str(VirtualNode::from_index(200), "1"),
                        0,
                    )
                    .encode()
                    .into(),
                ),
            ],
            sub_level_id: 101,
            level_type: LevelType::Overlapping,
            total_file_size: 300,
            ..Default::default()
        });

        // Split L0 sub-level 0 at table 1 (presumably the sub-level pushed above, assuming
        // the initial L0 starts empty — TODO confirm). The SSTs returned in `x` are wrapped
        // in a new right-hand `Levels` and merged back into `cg1`. This scenario has no
        // assertions; it only checks that split + `merge_levels` complete without panicking.
        {
            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.l0.sub_levels[0],
                &mut new_sst_id,
                split_key,
            );
            let mut right_l0 = OverlappingLevel {
                sub_levels: vec![],
                total_file_size: 0,
                uncompressed_file_size: 0,
            };

            // NOTE(review): `total_file_size` is hard-coded and not derived from `x`.
            right_l0.sub_levels.push(Level {
                level_idx: 0,
                table_infos: x,
                sub_level_id: 101,
                total_file_size: 100,
                level_type: LevelType::Overlapping,
                ..Default::default()
            });

            let right_levels = Levels {
                levels: vec![],
                l0: right_l0,
                ..Default::default()
            };

            merge_levels(cg1, right_levels);
        }

        // `levels[2]` is never populated by this test (presumably empty after
        // `build_initial_compaction_group_levels`), so splitting it yields nothing.
        {
            let mut new_sst_id = 100.into();
            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[2],
                &mut new_sst_id,
                split_key,
            );

            assert!(x.is_empty());
        }

        // Split key at table 1, on a clone of the group. All three SSTs hold only tables
        // 3 and 4, so they all move to `x` with their ids and sizes unchanged, and
        // `levels[0]` ends up empty.
        {
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(1.into(), VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(3, x.len());
            assert_eq!(1, x[0].sst_id);
            assert_eq!(100, x[0].sst_size);
            assert_eq!(10, x[1].sst_id);
            assert_eq!(100, x[1].sst_size);
            assert_eq!(11, x[2].sst_id);
            assert_eq!(100, x[2].sst_size);

            assert_eq!(0, cg1.levels[0].table_infos.len());
        }

        // Split key at table 5. Every SST's tables are below 5, so nothing moves.
        {
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(5.into(), VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(0, x.len());
            assert_eq!(3, cg1.levels[0].table_infos.len());
        }

        // Split key at table 4. sst 10 (tables 3 and 4) is cut in two:
        // - the table-4 half moves to `x` as new id 100 with half the size;
        // - the table-3 half stays in the level as id 101 with the other half.
        // sst 11 (table 4 only) moves whole, and sst 1 stays.
        {
            let mut cg1 = cg1.clone();
            let split_key = group_split::build_split_key(4.into(), VirtualNode::ZERO);

            let mut new_sst_id = 100.into();
            let x = group_split::split_sst_info_for_level_v2(
                &mut cg1.levels[0],
                &mut new_sst_id,
                split_key,
            );

            assert_eq!(2, x.len());
            assert_eq!(100, x[0].sst_id);
            assert_eq!(100 / 2, x[0].sst_size);
            assert_eq!(11, x[1].sst_id);
            assert_eq!(100, x[1].sst_size);
            assert_eq!(vec![TableId::new(4)], x[1].table_ids);

            assert_eq!(2, cg1.levels[0].table_infos.len());
            assert_eq!(101, cg1.levels[0].table_infos[1].sst_id);
            assert_eq!(100 / 2, cg1.levels[0].table_infos[1].sst_size);
            assert_eq!(
                vec![TableId::new(3)],
                cg1.levels[0].table_infos[1].table_ids
            );
        }
    }
2721
2722 fn make_sst(sst_id: u64, table_ids: Vec<u32>, sst_size: u64) -> SstableInfo {
2723 SstableInfoInner {
2724 sst_id: sst_id.into(),
2725 object_id: sst_id.into(),
2726 table_ids: table_ids.into_iter().map(TableId::new).collect(),
2727 file_size: sst_size,
2728 sst_size,
2729 uncompressed_file_size: sst_size * 2,
2730 ..Default::default()
2731 }
2732 .into()
2733 }
2734
2735 #[test]
2736 fn test_level_normalize() {
2737 let mut level = Level {
2739 level_idx: 1,
2740 level_type: LevelType::Nonoverlapping,
2741 table_infos: vec![
2742 make_sst(1, vec![1, 2], 100),
2743 make_sst(2, vec![], 200), make_sst(3, vec![3], 300),
2745 make_sst(4, vec![], 400), ],
2747 total_file_size: 9999, uncompressed_file_size: 9999,
2749 ..Default::default()
2750 };
2751
2752 level.normalize();
2753
2754 assert_eq!(level.table_infos.len(), 2);
2755 assert_eq!(1, level.table_infos[0].sst_id);
2756 assert_eq!(3, level.table_infos[1].sst_id);
2757 assert_eq!(level.total_file_size, 400);
2758 assert_eq!(level.uncompressed_file_size, 800);
2759
2760 level.total_file_size = 0;
2762 level.uncompressed_file_size = 0;
2763 level.normalize();
2764 assert_eq!(level.table_infos.len(), 2);
2765 assert_eq!(level.total_file_size, 400);
2766 assert_eq!(level.uncompressed_file_size, 800);
2767
2768 level.table_infos = vec![make_sst(10, vec![], 100), make_sst(11, vec![], 200)];
2770 level.normalize();
2771 assert!(level.table_infos.is_empty());
2772 assert_eq!(level.total_file_size, 0);
2773 assert_eq!(level.uncompressed_file_size, 0);
2774 }
2775
2776 #[test]
2777 fn test_level_delete_ssts() {
2778 let mut level = Level {
2779 level_idx: 1,
2780 level_type: LevelType::Nonoverlapping,
2781 table_infos: vec![
2782 make_sst(1, vec![1], 100),
2783 make_sst(2, vec![2], 200),
2784 make_sst(3, vec![3], 300),
2785 ],
2786 total_file_size: 600,
2787 uncompressed_file_size: 1200,
2788 ..Default::default()
2789 };
2790
2791 let delete_ids: HashSet<crate::HummockSstableId> = HashSet::from([2.into()]);
2792 let changed = level.delete_ssts(&delete_ids);
2793
2794 assert!(changed);
2795 assert_eq!(level.table_infos.len(), 2);
2796 assert_eq!(1, level.table_infos[0].sst_id);
2797 assert_eq!(3, level.table_infos[1].sst_id);
2798 assert_eq!(level.total_file_size, 400);
2799 assert_eq!(level.uncompressed_file_size, 800);
2800
2801 let delete_ids: HashSet<crate::HummockSstableId> = HashSet::from([999.into()]);
2803 let changed = level.delete_ssts(&delete_ids);
2804 assert!(!changed);
2805 assert_eq!(level.table_infos.len(), 2);
2806 }
2807
2808 #[test]
2809 fn test_level_prune_table_ids_from_ssts() {
2810 let mut level = Level {
2811 level_idx: 1,
2812 level_type: LevelType::Nonoverlapping,
2813 table_infos: vec![
2814 make_sst(1, vec![1, 2], 100),
2815 make_sst(2, vec![2, 3], 200),
2816 make_sst(3, vec![2], 300),
2817 ],
2818 total_file_size: 600,
2819 uncompressed_file_size: 1200,
2820 ..Default::default()
2821 };
2822
2823 let pruned_table_ids = HashSet::from([TableId::new(2)]);
2824 level.prune_table_ids_from_ssts(&pruned_table_ids);
2825
2826 assert_eq!(level.table_infos.len(), 2);
2828 assert_eq!(level.table_infos[0].table_ids, vec![TableId::new(1)]);
2829 assert_eq!(level.table_infos[1].table_ids, vec![TableId::new(3)]);
2830 assert_eq!(level.total_file_size, 100 + 200);
2831 assert_eq!(level.uncompressed_file_size, 200 + 400);
2832
2833 let pruned_table_ids = HashSet::from([TableId::new(1), TableId::new(3)]);
2835 level.prune_table_ids_from_ssts(&pruned_table_ids);
2836 assert!(level.table_infos.is_empty());
2837 assert_eq!(level.total_file_size, 0);
2838 assert_eq!(level.uncompressed_file_size, 0);
2839 }
2840
2841 #[test]
2842 fn test_overlapping_level_normalize() {
2843 let mut l0 = OverlappingLevel {
2844 sub_levels: vec![
2845 Level {
2846 level_idx: 0,
2847 table_infos: vec![make_sst(1, vec![1], 100)],
2848 total_file_size: 100,
2849 uncompressed_file_size: 200,
2850 sub_level_id: 1,
2851 ..Default::default()
2852 },
2853 Level {
2854 level_idx: 0,
2855 table_infos: vec![], total_file_size: 0,
2857 uncompressed_file_size: 0,
2858 sub_level_id: 2,
2859 ..Default::default()
2860 },
2861 Level {
2862 level_idx: 0,
2863 table_infos: vec![make_sst(3, vec![3], 300)],
2864 total_file_size: 300,
2865 uncompressed_file_size: 600,
2866 sub_level_id: 3,
2867 ..Default::default()
2868 },
2869 ],
2870 total_file_size: 9999, uncompressed_file_size: 9999,
2872 };
2873
2874 l0.normalize();
2875
2876 assert_eq!(l0.sub_levels.len(), 2);
2877 assert_eq!(l0.sub_levels[0].sub_level_id, 1);
2878 assert_eq!(l0.sub_levels[1].sub_level_id, 3);
2879 assert_eq!(l0.total_file_size, 100 + 300);
2880 assert_eq!(l0.uncompressed_file_size, 200 + 600);
2881
2882 l0.sub_levels = vec![Level {
2884 level_idx: 0,
2885 table_infos: vec![],
2886 ..Default::default()
2887 }];
2888 l0.normalize();
2889 assert!(l0.sub_levels.is_empty());
2890 assert_eq!(l0.total_file_size, 0);
2891 assert_eq!(l0.uncompressed_file_size, 0);
2892 }
2893
2894 #[test]
2895 fn test_levels_prune_table_ids_from_ssts() {
2896 #[expect(deprecated)]
2897 let mut levels = Levels {
2898 l0: OverlappingLevel {
2899 sub_levels: vec![
2900 Level {
2901 level_idx: 0,
2902 table_infos: vec![
2903 make_sst(1, vec![10], 100), make_sst(2, vec![20], 200), ],
2906 total_file_size: 300,
2907 uncompressed_file_size: 600,
2908 sub_level_id: 1,
2909 ..Default::default()
2910 },
2911 Level {
2912 level_idx: 0,
2913 table_infos: vec![
2914 make_sst(3, vec![10], 150), ],
2916 total_file_size: 150,
2917 uncompressed_file_size: 300,
2918 sub_level_id: 2,
2919 ..Default::default()
2920 },
2921 ],
2922 total_file_size: 450,
2923 uncompressed_file_size: 900,
2924 },
2925 levels: vec![Level {
2926 level_idx: 1,
2927 level_type: LevelType::Nonoverlapping,
2928 table_infos: vec![
2929 make_sst(4, vec![10, 20], 400), make_sst(5, vec![10], 500), ],
2932 total_file_size: 900,
2933 uncompressed_file_size: 1800,
2934 ..Default::default()
2935 }],
2936 group_id: 1.into(),
2937 parent_group_id: 0.into(),
2938 member_table_ids: vec![],
2939 compaction_group_version_id: 0,
2940 };
2941
2942 levels.prune_table_ids_from_ssts(&HashSet::from([TableId::new(10)]));
2944
2945 assert_eq!(levels.l0.sub_levels.len(), 1);
2946 assert_eq!(levels.l0.sub_levels[0].sub_level_id, 1);
2947 assert_eq!(levels.l0.sub_levels[0].table_infos.len(), 1);
2948 assert_eq!(2, levels.l0.sub_levels[0].table_infos[0].sst_id);
2949 assert_eq!(levels.l0.sub_levels[0].total_file_size, 200);
2950 assert_eq!(levels.l0.sub_levels[0].uncompressed_file_size, 400);
2951
2952 assert_eq!(levels.l0.total_file_size, 200);
2953 assert_eq!(levels.l0.uncompressed_file_size, 400);
2954
2955 assert_eq!(levels.levels[0].table_infos.len(), 1);
2956 assert_eq!(4, levels.levels[0].table_infos[0].sst_id);
2957 assert_eq!(
2958 levels.levels[0].table_infos[0].table_ids,
2959 vec![TableId::new(20)]
2960 );
2961 assert_eq!(levels.levels[0].total_file_size, 400);
2962 assert_eq!(levels.levels[0].uncompressed_file_size, 800);
2963
2964 assert_eq!(levels.compaction_group_version_id, 1);
2965 }
2966
2967 #[test]
2968 fn test_apply_version_delta_prune_table_ids_from_ssts() {
2969 let mut version = HummockVersion {
2970 id: HummockVersionId::new(0),
2971 levels: HashMap::from_iter([(1.into(), {
2972 #[expect(deprecated)]
2973 let levels = Levels {
2974 l0: OverlappingLevel {
2975 sub_levels: vec![
2976 Level {
2977 level_idx: 0,
2978 level_type: LevelType::Overlapping,
2979 table_infos: vec![
2980 make_sst(1, vec![100], 50), make_sst(2, vec![200], 60), ],
2983 total_file_size: 110,
2984 uncompressed_file_size: 220,
2985 sub_level_id: 1,
2986 ..Default::default()
2987 },
2988 Level {
2989 level_idx: 0,
2990 level_type: LevelType::Overlapping,
2991 table_infos: vec![
2992 make_sst(3, vec![100], 70), ],
2994 total_file_size: 70,
2995 uncompressed_file_size: 140,
2996 sub_level_id: 2,
2997 ..Default::default()
2998 },
2999 ],
3000 total_file_size: 180,
3001 uncompressed_file_size: 360,
3002 },
3003 levels: vec![Level {
3004 level_idx: 1,
3005 level_type: LevelType::Nonoverlapping,
3006 table_infos: vec![
3007 make_sst(4, vec![100, 200], 80), make_sst(5, vec![100], 90), ],
3010 total_file_size: 170,
3011 uncompressed_file_size: 340,
3012 ..Default::default()
3013 }],
3014 group_id: 1.into(),
3015 parent_group_id: 0.into(),
3016 member_table_ids: vec![],
3017 compaction_group_version_id: 0,
3018 };
3019 levels
3020 })]),
3021 ..Default::default()
3022 };
3023
3024 let version_delta = HummockVersionDelta {
3025 id: HummockVersionId::new(1),
3026 group_deltas: HashMap::from_iter([(
3027 1.into(),
3028 GroupDeltas {
3029 group_deltas: vec![GroupDelta::PruneTableIdsFromSsts(HashSet::from([
3030 TableId::new(100),
3031 ]))],
3032 },
3033 )]),
3034 ..Default::default()
3035 };
3036
3037 version.apply_version_delta(&version_delta);
3038
3039 let cg = version.get_compaction_group_levels(1.into());
3040
3041 assert_eq!(
3042 cg.l0.sub_levels.len(),
3043 1,
3044 "empty sub-level should be removed"
3045 );
3046 assert_eq!(cg.l0.sub_levels[0].sub_level_id, 1);
3047 assert_eq!(cg.l0.sub_levels[0].table_infos.len(), 1);
3048 assert_eq!(2, cg.l0.sub_levels[0].table_infos[0].sst_id);
3049 assert_eq!(
3050 cg.l0.sub_levels[0].table_infos[0].table_ids,
3051 vec![TableId::new(200)]
3052 );
3053
3054 assert_eq!(cg.l0.total_file_size, 60);
3055 assert_eq!(cg.l0.uncompressed_file_size, 120);
3056
3057 assert_eq!(cg.levels[0].table_infos.len(), 1);
3058 assert_eq!(4, cg.levels[0].table_infos[0].sst_id);
3059 assert_eq!(
3060 cg.levels[0].table_infos[0].table_ids,
3061 vec![TableId::new(200)]
3062 );
3063 assert_eq!(cg.levels[0].total_file_size, 80);
3064 assert_eq!(cg.levels[0].uncompressed_file_size, 160);
3065
3066 assert_eq!(cg.compaction_group_version_id, 1);
3067 }
3068
3069 #[test]
3070 fn test_prune_stale_table_ids_from_ssts() {
3071 let live_table_id = TableId::new(100);
3072 let stale_table_id = TableId::new(200);
3073 let mut version = HummockVersion {
3074 id: HummockVersionId::new(0),
3075 levels: HashMap::from_iter([(1.into(), {
3076 #[expect(deprecated)]
3077 let levels = Levels {
3078 l0: OverlappingLevel {
3079 sub_levels: vec![Level {
3080 level_idx: 0,
3081 level_type: LevelType::Overlapping,
3082 table_infos: vec![make_sst(
3083 1,
3084 vec![live_table_id.as_raw_id(), stale_table_id.as_raw_id()],
3085 50,
3086 )],
3087 total_file_size: 50,
3088 uncompressed_file_size: 100,
3089 sub_level_id: 1,
3090 ..Default::default()
3091 }],
3092 total_file_size: 50,
3093 uncompressed_file_size: 100,
3094 },
3095 levels: vec![Level {
3096 level_idx: 1,
3097 level_type: LevelType::Nonoverlapping,
3098 table_infos: vec![make_sst(2, vec![stale_table_id.as_raw_id()], 60)],
3099 total_file_size: 60,
3100 uncompressed_file_size: 120,
3101 ..Default::default()
3102 }],
3103 group_id: 1.into(),
3104 parent_group_id: 0.into(),
3105 member_table_ids: vec![],
3106 compaction_group_version_id: 0,
3107 };
3108 levels
3109 })]),
3110 state_table_info: HummockVersionStateTableInfo::from_protobuf_owned(
3111 HashMap::from_iter([(
3112 live_table_id,
3113 StateTableInfo {
3114 committed_epoch: 1,
3115 compaction_group_id: 1.into(),
3116 },
3117 )]),
3118 ),
3119 ..Default::default()
3120 };
3121
3122 assert_eq!(version.prune_stale_table_ids_from_ssts(), 1);
3123
3124 let cg = version.get_compaction_group_levels(1.into());
3125 assert_eq!(cg.l0.sub_levels.len(), 1);
3126 assert_eq!(cg.l0.sub_levels[0].table_infos.len(), 1);
3127 assert_eq!(cg.l0.sub_levels[0].table_infos[0].sst_id, 1);
3128 assert_eq!(
3129 cg.l0.sub_levels[0].table_infos[0].table_ids,
3130 vec![live_table_id]
3131 );
3132 assert!(cg.levels[0].table_infos.is_empty());
3133 assert_eq!(cg.compaction_group_version_id, 1);
3134 }
3135
3136 #[test]
3137 fn test_prune_stale_table_ids_from_ssts_skips_legacy_member_table_ids() {
3138 let mut version = HummockVersion {
3139 id: HummockVersionId::new(0),
3140 levels: HashMap::from_iter([(1.into(), {
3141 #[expect(deprecated)]
3142 let levels = Levels {
3143 l0: OverlappingLevel {
3144 sub_levels: vec![Level {
3145 level_idx: 0,
3146 level_type: LevelType::Overlapping,
3147 table_infos: vec![make_sst(1, vec![100, 200], 50)],
3148 total_file_size: 50,
3149 uncompressed_file_size: 100,
3150 sub_level_id: 1,
3151 ..Default::default()
3152 }],
3153 total_file_size: 50,
3154 uncompressed_file_size: 100,
3155 },
3156 group_id: 1.into(),
3157 parent_group_id: 0.into(),
3158 member_table_ids: vec![100, 200],
3159 compaction_group_version_id: 0,
3160 ..Default::default()
3161 };
3162 levels
3163 })]),
3164 ..Default::default()
3165 };
3166
3167 assert_eq!(version.prune_stale_table_ids_from_ssts(), 0);
3168
3169 let cg = version.get_compaction_group_levels(1.into());
3170 assert_eq!(
3171 cg.l0.sub_levels[0].table_infos[0].table_ids,
3172 vec![TableId::new(100), TableId::new(200)]
3173 );
3174 assert_eq!(cg.compaction_group_version_id, 0);
3175 }
3176
3177 #[test]
3178 fn test_apply_version_delta_compact_l0() {
3179 let mut version = HummockVersion {
3180 id: HummockVersionId::new(0),
3181 levels: HashMap::from_iter([(1.into(), {
3182 #[expect(deprecated)]
3183 let levels = Levels {
3184 l0: OverlappingLevel {
3185 sub_levels: vec![
3186 Level {
3187 level_idx: 0,
3188 level_type: LevelType::Nonoverlapping,
3189 table_infos: vec![
3190 make_sst(1, vec![1], 100),
3191 make_sst(2, vec![2], 200),
3192 ],
3193 total_file_size: 300,
3194 uncompressed_file_size: 600,
3195 sub_level_id: 1,
3196 ..Default::default()
3197 },
3198 Level {
3199 level_idx: 0,
3200 level_type: LevelType::Nonoverlapping,
3201 table_infos: vec![make_sst(3, vec![3], 300)],
3202 total_file_size: 300,
3203 uncompressed_file_size: 600,
3204 sub_level_id: 2,
3205 ..Default::default()
3206 },
3207 ],
3208 total_file_size: 600,
3209 uncompressed_file_size: 1200,
3210 },
3211 levels: vec![Level {
3212 level_idx: 1,
3213 level_type: LevelType::Nonoverlapping,
3214 table_infos: vec![],
3215 total_file_size: 0,
3216 uncompressed_file_size: 0,
3217 ..Default::default()
3218 }],
3219 group_id: 1.into(),
3220 parent_group_id: 0.into(),
3221 member_table_ids: vec![],
3222 compaction_group_version_id: 0,
3223 };
3224 levels
3225 })]),
3226 ..Default::default()
3227 };
3228
3229 let version_delta = HummockVersionDelta {
3230 id: HummockVersionId::new(1),
3231 group_deltas: HashMap::from_iter([(
3232 1.into(),
3233 GroupDeltas {
3234 group_deltas: vec![
3235 GroupDelta::IntraLevel(IntraLevelDelta::new(
3236 0, 0,
3238 HashSet::from([1.into(), 2.into(), 3.into()]),
3239 vec![],
3240 0,
3241 0,
3242 )),
3243 GroupDelta::IntraLevel(IntraLevelDelta::new(
3244 1, 0,
3246 HashSet::new(),
3247 vec![make_sst(10, vec![1, 2, 3], 500)],
3248 0,
3249 0,
3250 )),
3251 ],
3252 },
3253 )]),
3254 ..Default::default()
3255 };
3256
3257 version.apply_version_delta(&version_delta);
3258
3259 let cg = version.get_compaction_group_levels(1.into());
3260
3261 assert!(cg.l0.sub_levels.is_empty());
3262 assert_eq!(cg.l0.total_file_size, 0);
3263 assert_eq!(cg.l0.uncompressed_file_size, 0);
3264
3265 assert_eq!(cg.levels[0].table_infos.len(), 1);
3266 assert_eq!(10, cg.levels[0].table_infos[0].sst_id);
3267 assert_eq!(cg.levels[0].total_file_size, 500);
3268 }
3269}